This makes the Stream class a native citizen of the Java API.
It can be used with the NIO Channel API, as well as (In,Out)putStream's
using the java.nio.channels.Channels convenience wrappers.
Signed-off-by: Claudio Bley <cbley(a)av-test.de>
---
src/main/java/org/libvirt/Stream.java | 183 ++++++++++++++++++++++++++--
src/main/java/org/libvirt/jna/Libvirt.java | 4 +-
2 files changed, 178 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/libvirt/Stream.java
b/src/main/java/org/libvirt/Stream.java
index c36ed70..71a2a6e 100644
--- a/src/main/java/org/libvirt/Stream.java
+++ b/src/main/java/org/libvirt/Stream.java
@@ -1,12 +1,46 @@
package org.libvirt;
+import java.io.IOException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.NonWritableChannelException;
+
import org.libvirt.jna.Libvirt;
import org.libvirt.jna.SizeT;
import org.libvirt.jna.StreamPointer;
import static org.libvirt.Library.libvirt;
import static org.libvirt.ErrorHandler.processError;
-public class Stream {
+/**
+ * The Stream class is used to transfer data between a libvirt daemon
+ * and a client.
+ * <p>
+ * It implements the ByteChannel interface.
+ * <p>
+ * Basic usage:
+ *
+ * <pre>
+ * {@code
+ * ByteBuffer buf = ByteBuffer.allocate(1024);
+ * Stream str = conn.streamNew(0);
+ *
+ * ... // open the stream e.g. calling Domain.screenshot
+ *
+ * while (str.read(buf) != -1) {
+ * buf.flip();
+ * ... // do something with the data
+ * buf.compact();
+ * }}</pre>
+ * <p>
+ * If you want to use this class as an InputStream or OutputStream,
+ * convert it using the {@link java.nio.channels.Channels#newInputStream
+ * Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
+ * Channels.newOutputStream} respectively.
+ */
+public class Stream implements ByteChannel {
public static int VIR_STREAM_NONBLOCK = (1 << 0);
@@ -20,6 +54,56 @@ public class Stream {
*/
private Connect virConnect;
+ private final static int CLOSED = 0;
+ private final static int READABLE = 1;
+ private final static int WRITABLE = 2;
+ private final static int OPEN = READABLE | WRITABLE;
+ private final static int EOF = 4;
+
+ /* The status of the stream. A stream starts its live in the
+ * "CLOSED" state.
+ *
+ * It will be opened for input / output by another libvirt
+ * operation (e.g. virStorageVolDownload), which means it will
+ * be in state "READABLE" or "WRITABLE", exclusively.
+ *
+ * It will reach state "EOF", if {@link finish()} is called.
+ *
+ * It will be in the "CLOSED" state again, after calling abort()
+ * or close().
+ */
+ private int state = CLOSED;
+
+ void markReadable() {
+ assert !isWritable()
+ : "A Stream cannot be readable and writable at the same time";
+
+ state |= READABLE;
+ }
+
+ void markWritable() {
+ assert !isReadable()
+ : "A Stream cannot be readable and writable at the same time";
+
+ state |= WRITABLE;
+ }
+
+ boolean isReadable() {
+ return (state & READABLE) != 0;
+ }
+
+ boolean isWritable() {
+ return (state & WRITABLE) != 0;
+ }
+
+ protected boolean isEOF() {
+ return (state & EOF) != 0;
+ }
+
+ private void markEOF() {
+ state |= EOF;
+ }
+
Stream(Connect virConnect, StreamPointer VSP) {
this.virConnect = virConnect;
this.VSP = VSP;
@@ -32,7 +116,9 @@ public class Stream {
* @return <em>ignore</em> (always 0)
*/
public int abort() throws LibvirtException {
- return processError(libvirt.virStreamAbort(VSP));
+ int returnValue = processError(libvirt.virStreamAbort(VSP));
+ this.state = CLOSED;
+ return returnValue;
}
/**
@@ -66,7 +152,9 @@ public class Stream {
* @throws LibvirtException
*/
public int finish() throws LibvirtException {
- return processError(libvirt.virStreamFinish(VSP));
+ int returnValue = processError(libvirt.virStreamFinish(VSP));
+ markEOF();
+ return returnValue;
}
/**
@@ -79,7 +167,8 @@ public class Stream {
public int free() throws LibvirtException {
int success = 0;
if (VSP != null) {
- processError(libvirt.virStreamFree(VSP));
+ closeStream();
+ success = processError(libvirt.virStreamFree(VSP));
VSP = null;
}
@@ -95,7 +184,80 @@ public class Stream {
* @throws LibvirtException
*/
public int receive(byte[] data) throws LibvirtException {
- return processError(libvirt.virStreamRecv(VSP, data, new SizeT(data.length)));
+ return receive(ByteBuffer.wrap(data));
+ }
+
+ protected int receive(ByteBuffer buffer) throws LibvirtException {
+ int returnValue = processError(libvirt.virStreamRecv(VSP, buffer, new
SizeT(buffer.remaining())));
+ buffer.position(buffer.position() + returnValue);
+ return returnValue;
+ }
+
+ @Override
+ public int read(ByteBuffer buffer) throws IOException {
+ if (!isOpen()) throw new ClosedChannelException();
+ if (!isReadable()) throw new NonReadableChannelException();
+ if (isEOF()) return -1;
+
+ try {
+ int ret = receive(buffer);
+
+ switch (ret) {
+ case 0:
+ finish();
+ return -1;
+
+ case -2:
+ throw new UnsupportedOperationException("non-blocking I/O stream not
yet supported");
+
+ default:
+ return ret;
+ }
+ } catch (LibvirtException e) {
+ throw new IOException("could not read from stream", e);
+ }
+ }
+
+ @Override
+ public int write(ByteBuffer buffer) throws IOException {
+ if (!isOpen()) throw new ClosedChannelException();
+ if (!isWritable()) throw new NonWritableChannelException();
+
+ int pos = buffer.position();
+
+ try {
+ while (buffer.hasRemaining()) {
+ int ret = send(buffer);
+
+ if (ret == -2)
+ throw new UnsupportedOperationException("non-blocking I/O stream
not yet supported");
+ }
+ return buffer.position() - pos;
+ } catch (LibvirtException e) {
+ throw new IOException("could not write to stream", e);
+ }
+ }
+
+ protected void closeStream() throws LibvirtException {
+ if (isOpen() && !isEOF()) {
+ if (isWritable()) finish();
+ else if (isReadable()) abort();
+ }
+ this.state = CLOSED;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ closeStream();
+ } catch (LibvirtException e) {
+ throw new IOException("error while closing Stream", e);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return (this.state & OPEN) != 0;
}
/**
@@ -131,8 +293,15 @@ public class Stream {
* full
* @throws LibvirtException
*/
- public int send(String data) throws LibvirtException {
- return processError(libvirt.virStreamSend(VSP, data, new SizeT(data.length())));
+ public int send(byte[] data) throws LibvirtException {
+ return send(ByteBuffer.wrap(data));
+ }
+
+ protected int send(ByteBuffer buffer) throws LibvirtException {
+ SizeT size = new SizeT(buffer.remaining());
+ int returnValue = processError(libvirt.virStreamSend(VSP, buffer, size));
+ buffer.position(buffer.position() + returnValue);
+ return returnValue;
}
/**
diff --git a/src/main/java/org/libvirt/jna/Libvirt.java
b/src/main/java/org/libvirt/jna/Libvirt.java
index c8735d2..c383ba6 100644
--- a/src/main/java/org/libvirt/jna/Libvirt.java
+++ b/src/main/java/org/libvirt/jna/Libvirt.java
@@ -440,9 +440,9 @@ public interface Libvirt extends Library {
int virStreamFinish(StreamPointer virStreamPtr) ;
int virStreamFree(StreamPointer virStreamPtr) ;
StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ;
- int virStreamSend(StreamPointer virStreamPtr, String data, SizeT size);
+ int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, SizeT size);
int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler,
Pointer opaque);
- int virStreamRecv(StreamPointer virStreamPtr, byte[] data, SizeT length);
+ int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, SizeT length);
int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler,
Pointer opaque);
//DomainSnapshot Methods
--
1.7.9.5