This makes the Stream class a native citizen of the Java API.
It can be used with the NIO Channel API, as well as with InputStream and
OutputStream via the java.nio.channels.Channels convenience wrappers.
---
src/main/java/org/libvirt/Stream.java | 171 ++++++++++++++++++++++++++++-
src/main/java/org/libvirt/jna/Libvirt.java | 6 +-
2 files changed, 172 insertions(+), 5 deletions(-)
diff --git a/src/main/java/org/libvirt/Stream.java b/src/main/java/org/libvirt/Stream.java
index 404c9a0..975e1b6 100644
--- a/src/main/java/org/libvirt/Stream.java
+++ b/src/main/java/org/libvirt/Stream.java
@@ -1,12 +1,48 @@
package org.libvirt;
+import java.io.IOException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.NonWritableChannelException;
+
import org.libvirt.jna.Libvirt;
import org.libvirt.jna.StreamPointer;
import static org.libvirt.Library.libvirt;
import com.sun.jna.NativeLong;
-public class Stream {
+/**
+ * The Stream class is used to transfer data between a libvirt daemon
+ * and a client.
+ * <p>
+ * It implements the ByteChannel interface.
+ * <p>
+ * Basic usage:
+ *
+ * <pre>
+ * {@code
+ * ByteBuffer buf = ByteBuffer.allocate(1024);
+ * Stream str = conn.streamNew(0);
+ *
+ * ... // open the stream e.g. calling Domain.screenshot
+ *
+ * while (str.read(buf) != -1) {
+ * buf.flip();
+ * ... // do something with the data
+ * buf.compact();
+ * }
+ * }
+ * </pre>
+ * <p>
+ * If you want to use this class as an InputStream or OutputStream,
+ * convert it using the {@link java.nio.channels.Channels#newInputStream
+ * Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
+ * Channels.newOutputStream} respectively.
+ */
+public class Stream implements ByteChannel {
public static int VIR_STREAM_NONBLOCK = (1 << 0);
@@ -20,6 +56,56 @@ public class Stream {
*/
private Connect virConnect;
+    /* Bit flags combined in the state field; CLOSED means no flag is set. */
+    private static final int CLOSED = 0;
+    private static final int READABLE = 1 << 0;
+    private static final int WRITABLE = 1 << 1;
+    private static final int OPEN = READABLE | WRITABLE;
+    private static final int EOF = 1 << 2;
+
+ /* The status of the stream. A stream starts its life in the
+ * "CLOSED" state.
+ *
+ * It will be opened for input / output by another libvirt
+ * operation (e.g. virStorageVolDownload), which means it will
+ * be in state "READABLE" or "WRITABLE", exclusively.
+ *
+ * It will reach state "EOF", if {@link #finish()} is called.
+ *
+ * It will be in the "CLOSED" state again, after calling abort()
+ * or close().
+ */
+ private int state = CLOSED;
+
+    /**
+     * Sets the READABLE flag on this stream.
+     * <p>
+     * With assertions enabled, fails if the stream is already writable.
+     */
+    void markReadable() {
+        assert !isWritable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state = state | READABLE;
+    }
+
+    /**
+     * Sets the WRITABLE flag on this stream.
+     * <p>
+     * With assertions enabled, fails if the stream is already readable.
+     */
+    void markWritable() {
+        assert !isReadable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state = state | WRITABLE;
+    }
+
+    /**
+     * @return true if the READABLE flag is set in the stream state
+     */
+    boolean isReadable() {
+        // READABLE is a single bit, so "bit set" and "masked value equals the flag" coincide.
+        return (state & READABLE) == READABLE;
+    }
+
+    /**
+     * @return true if the WRITABLE flag is set in the stream state
+     */
+    boolean isWritable() {
+        // WRITABLE is a single bit, so "bit set" and "masked value equals the flag" coincide.
+        return (state & WRITABLE) == WRITABLE;
+    }
+
+    /**
+     * @return true if the EOF flag is set in the stream state
+     */
+    protected boolean isEOF() {
+        // EOF is a single bit, so "bit set" and "masked value equals the flag" coincide.
+        return (state & EOF) == EOF;
+    }
+
+    /**
+     * Sets the EOF flag, leaving the READABLE / WRITABLE flags untouched.
+     */
+    private void markEOF() {
+        state = state | EOF;
+    }
+
Stream(Connect virConnect, StreamPointer VSP) {
this.virConnect = virConnect;
this.VSP = VSP;
@@ -32,6 +118,7 @@ public class Stream {
public int abort() throws LibvirtException {
int returnValue = libvirt.virStreamAbort(VSP);
processError();
+ // Clear all state flags: after an abort isOpen(), isReadable() and isWritable() all report false.
+ this.state = CLOSED;
return returnValue;
}
@@ -70,6 +157,7 @@ public class Stream {
public int finish() throws LibvirtException {
int returnValue = libvirt.virStreamFinish(VSP);
processError();
+ // Remember end-of-stream: an open, readable stream then answers read() with -1, and closeStream() skips abort().
+ markEOF();
return returnValue;
}
@@ -83,6 +171,7 @@ public class Stream {
public int free() throws LibvirtException {
int success = 0;
if (VSP != null) {
+ closeStream();
success = libvirt.virStreamFree(VSP);
processError();
VSP = null;
@@ -108,11 +197,82 @@ public class Stream {
* @throws LibvirtException
*/
public int receive(byte[] data) throws LibvirtException {
- int returnValue = libvirt.virStreamRecv(VSP, data, new NativeLong(data.length));
+ // Delegate to the ByteBuffer overload. ByteBuffer.wrap() returns a buffer backed by data,
+ // so the received bytes are expected to land in the caller's array.
+ return receive(ByteBuffer.wrap(data));
+ }
+
+    /**
+     * Receives up to {@code buffer.remaining()} bytes from the stream into
+     * the given buffer and advances the buffer's position by the number of
+     * bytes received.
+     * <p>
+     * NOTE(review): this relies on JNA handing the native call the memory
+     * starting at the buffer's current position - confirm against the JNA
+     * Buffer marshalling rules.
+     *
+     * @param buffer the buffer that takes the received bytes
+     * @return the value returned by virStreamRecv; {@link #read} treats 0
+     *         as end of stream and -2 as "no data available on a
+     *         non-blocking stream"
+     * @throws LibvirtException if processError() reports a libvirt error
+     */
+    protected int receive(ByteBuffer buffer) throws LibvirtException {
+        int returnValue = libvirt.virStreamRecv(VSP, buffer, new NativeLong(buffer.remaining()));
processError();
+        // Only a positive count is a number of bytes. A negative status
+        // (e.g. -2) must neither rewind the position nor make
+        // Buffer.position() throw IllegalArgumentException.
+        if (returnValue > 0) {
+            buffer.position(buffer.position() + returnValue);
+        }
return returnValue;
}
+    /**
+     * Reads bytes from this stream into the given buffer.
+     *
+     * @param buffer the buffer that takes the bytes read
+     * @return the number of bytes read, 0 if the buffer has no space left,
+     *         or -1 once the end of the stream has been reached
+     * @throws ClosedChannelException if the stream is not open
+     * @throws NonReadableChannelException if the stream is not readable
+     * @throws UnsupportedOperationException if the stream reports that no
+     *         data is available in non-blocking mode
+     * @throws IOException if libvirt reports an error while receiving
+     */
+    @Override
+    public int read(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+        if (!isReadable()) {
+            throw new NonReadableChannelException();
+        }
+        if (isEOF()) {
+            return -1;
+        }
+        // Asking the stream for zero bytes would make a 0 result
+        // indistinguishable from end of stream (case 0 below) and would
+        // finish the stream prematurely, so report "nothing read" instead.
+        if (!buffer.hasRemaining()) {
+            return 0;
+        }
+
+        try {
+            int ret = receive(buffer);
+
+            switch (ret) {
+            case 0:
+                // End of stream: finish the stream, which also sets the EOF flag.
+                finish();
+                return -1;
+
+            case -2:
+                throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+
+            default:
+                return ret;
+            }
+        } catch (LibvirtException e) {
+            throw new IOException("could not read from stream", e);
+        }
+    }
+
+    /**
+     * Writes all remaining bytes of the given buffer to this stream.
+     * <p>
+     * Keeps calling {@code send} until the buffer has no bytes remaining.
+     *
+     * @param buffer the buffer holding the bytes to write
+     * @return the number of bytes written, i.e. the distance the buffer's
+     *         position has moved
+     * @throws ClosedChannelException if the stream is not open
+     * @throws NonWritableChannelException if the stream is not writable
+     * @throws UnsupportedOperationException if the stream reports that it
+     *         would block in non-blocking mode
+     * @throws IOException if libvirt reports an error while sending
+     */
+    @Override
+    public int write(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+        if (!isWritable()) {
+            throw new NonWritableChannelException();
+        }
+
+        int pos = buffer.position();
+
+        try {
+            while (buffer.hasRemaining()) {
+                int ret = send(buffer);
+
+                if (ret == -2) {
+                    throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+                }
+            }
+            return buffer.position() - pos;
+        } catch (LibvirtException e) {
+            throw new IOException("could not write to stream", e);
+        }
+    }
+
+    /**
+     * Puts the stream into the CLOSED state, aborting it first when it is
+     * open and has not reached EOF.
+     *
+     * @throws LibvirtException if aborting the stream fails; the state is
+     *         left unchanged by this method in that case
+     */
+    protected void closeStream() throws LibvirtException {
+        final boolean needsAbort = isOpen() && !isEOF();
+        if (needsAbort) {
+            abort();
+        }
+        state = CLOSED;
+    }
+
+    /**
+     * Closes this channel by delegating to {@code closeStream()}.
+     *
+     * @throws IOException wrapping the LibvirtException if closing fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            closeStream();
+        } catch (LibvirtException cause) {
+            // Translate into the exception type required by the Channel interface.
+            throw new IOException("error while closing Stream", cause);
+        }
+    }
+
+    /**
+     * @return true if the stream is marked readable or writable
+     */
+    @Override
+    public boolean isOpen() {
+        // OPEN covers both direction flags; the EOF flag alone does not count as open.
+        final int directionBits = state & OPEN;
+        return directionBits != 0;
+    }
+
/**
* Batch receive method
*
@@ -174,8 +334,13 @@ public class Stream {
* @since 1.5.2
*/
public int send(byte[] data) throws LibvirtException {
- int returnValue = libvirt.virStreamSend(VSP, data, new NativeLong(data.length));
+ // Delegate to the ByteBuffer overload. ByteBuffer.wrap() returns a buffer backed by data,
+ // so the bytes offered to the stream are the contents of the caller's array.
+ return send(ByteBuffer.wrap(data));
+ }
+
+    /**
+     * Sends up to {@code buffer.remaining()} bytes from the given buffer to
+     * the stream and advances the buffer's position by the number of bytes
+     * sent.
+     * <p>
+     * NOTE(review): this relies on JNA handing the native call the memory
+     * starting at the buffer's current position - confirm against the JNA
+     * Buffer marshalling rules.
+     *
+     * @param buffer the buffer holding the bytes to send
+     * @return the value returned by virStreamSend; {@link #write} treats -2
+     *         as "would block on a non-blocking stream"
+     * @throws LibvirtException if processError() reports a libvirt error
+     */
+    protected int send(ByteBuffer buffer) throws LibvirtException {
+        int returnValue = libvirt.virStreamSend(VSP, buffer, new NativeLong(buffer.remaining()));
processError();
+        // Only a positive count is a number of bytes. A negative status
+        // (e.g. -2) must neither rewind the position nor make
+        // Buffer.position() throw IllegalArgumentException.
+        if (returnValue > 0) {
+            buffer.position(buffer.position() + returnValue);
+        }
return returnValue;
}
diff --git a/src/main/java/org/libvirt/jna/Libvirt.java b/src/main/java/org/libvirt/jna/Libvirt.java
index 98f2125..fe74087 100644
--- a/src/main/java/org/libvirt/jna/Libvirt.java
+++ b/src/main/java/org/libvirt/jna/Libvirt.java
@@ -8,6 +8,8 @@ import com.sun.jna.Pointer;
import com.sun.jna.ptr.IntByReference;
import com.sun.jna.ptr.LongByReference;
+import java.nio.ByteBuffer;
+
/**
* The libvirt interface which is exposed via JNA. The complete API is
* documented at
http://www.libvirt.org/html/libvirt-libvirt.html.
@@ -368,9 +370,9 @@ public interface Libvirt extends Library {
int virStreamFinish(StreamPointer virStreamPtr) ;
int virStreamFree(StreamPointer virStreamPtr) ;
StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ;
- int virStreamSend(StreamPointer virStreamPtr, byte[] data, NativeLong size);
+ int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, NativeLong size);
int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler,
Pointer opaque);
- int virStreamRecv(StreamPointer virStreamPtr, byte[] data, NativeLong length);
+ int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, NativeLong length);
int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler,
Pointer opaque);
//DomainSnapshot Methods
--
1.8.5.2.msysgit.0