* include/libvirt/libvirt.h, include/libvirt/libvirt.h.in: Public
API contract for virStreamPtr object
* src/libvirt_public.syms: Export data stream APIs
* src/libvirt_private.syms: Export internal helper APIs
* src/libvirt.c: Data stream API driver dispatch
* src/datatypes.h, src/datatypes.c: Internal helpers for virStreamPtr
object
* src/driver.h: Define internal driver API for streams
* .x-sc_avoid_write: Ignore src/libvirt.c because it trips
up on comments including write()
---
.x-sc_avoid_write | 1 +
include/libvirt/libvirt.h | 93 ++++++
include/libvirt/libvirt.h.in | 93 ++++++
src/datatypes.c | 59 ++++
src/datatypes.h | 33 ++
src/driver.h | 34 ++
src/libvirt.c | 683 ++++++++++++++++++++++++++++++++++++++++++
src/libvirt_private.syms | 2 +
src/libvirt_public.syms | 15 +
9 files changed, 1013 insertions(+), 0 deletions(-)
diff --git a/.x-sc_avoid_write b/.x-sc_avoid_write
index 8ed87c5..c5a7535 100644
--- a/.x-sc_avoid_write
+++ b/.x-sc_avoid_write
@@ -1,5 +1,6 @@
^src/util\.c$
^src/xend_internal\.c$
^src/util-lib\.c$
+^src/libvirt\.c$
^qemud/qemud.c$
^gnulib/
diff --git a/include/libvirt/libvirt.h b/include/libvirt/libvirt.h
index 855f755..5dcecfd 100644
--- a/include/libvirt/libvirt.h
+++ b/include/libvirt/libvirt.h
@@ -110,6 +110,24 @@ typedef enum {
VIR_DOMAIN_NONE = 0
} virDomainCreateFlags;
+
+
+/**
+ * virStream:
+ *
+ * a virStream is a private structure representing a data stream.
+ */
+typedef struct _virStream virStream;
+
+/**
+ * virStreamPtr:
+ *
+ * a virStreamPtr is pointer to a virStream private structure, this is the
+ * type used to reference a data stream in the API.
+ */
+typedef virStream *virStreamPtr;
+
+
/**
* VIR_SECURITY_LABEL_BUFLEN:
*
@@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle,
virEventAddTimeoutFunc addTimeout,
virEventUpdateTimeoutFunc updateTimeout,
virEventRemoveTimeoutFunc removeTimeout);
+
+enum {
+ VIR_STREAM_NONBLOCK = (1 << 0),
+};
+
+virStreamPtr virStreamNew(virConnectPtr conn,
+ unsigned int flags);
+int virStreamRef(virStreamPtr st);
+
+int virStreamSend(virStreamPtr st,
+ const char *data,
+ size_t nbytes);
+
+int virStreamRecv(virStreamPtr st,
+ char *data,
+ size_t nbytes);
+
+
+typedef int (*virStreamSourceFunc)(virStreamPtr st,
+ char *data,
+ size_t nbytes,
+ void *opaque);
+
+int virStreamSendAll(virStreamPtr st,
+ virStreamSourceFunc handler,
+ void *opaque);
+
+typedef int (*virStreamSinkFunc)(virStreamPtr st,
+ const char *data,
+ size_t nbytes,
+ void *opaque);
+
+int virStreamRecvAll(virStreamPtr st,
+ virStreamSinkFunc handler,
+ void *opaque);
+
+typedef enum {
+ VIR_STREAM_EVENT_READABLE = (1 << 0),
+ VIR_STREAM_EVENT_WRITABLE = (1 << 1),
+ VIR_STREAM_EVENT_ERROR = (1 << 2),
+ VIR_STREAM_EVENT_HANGUP = (1 << 3),
+} virStreamEventType;
+
+
+/**
+ * virStreamEventCallback:
+ *
+ * @stream: stream on which the event occurred
+ * @events: bitset of events from virEventHandleType constants
+ * @opaque: user data registered with handle
+ *
+ * Callback for receiving stream events. The callback will
+ * be invoked once for each event which is pending.
+ */
+typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
+
+int virStreamEventAddCallback(virStreamPtr stream,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff);
+
+int virStreamEventUpdateCallback(virStreamPtr stream,
+ int events);
+
+int virStreamEventRemoveCallback(virStreamPtr stream);
+
+
+int virStreamFinish(virStreamPtr st);
+int virStreamAbort(virStreamPtr st);
+
+int virStreamFree(virStreamPtr st);
+
+
+
#ifdef __cplusplus
}
#endif
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in
index e6536c7..db091dc 100644
--- a/include/libvirt/libvirt.h.in
+++ b/include/libvirt/libvirt.h.in
@@ -110,6 +110,24 @@ typedef enum {
VIR_DOMAIN_NONE = 0
} virDomainCreateFlags;
+
+
+/**
+ * virStream:
+ *
+ * a virStream is a private structure representing a data stream.
+ */
+typedef struct _virStream virStream;
+
+/**
+ * virStreamPtr:
+ *
+ * a virStreamPtr is pointer to a virStream private structure, this is the
+ * type used to reference a data stream in the API.
+ */
+typedef virStream *virStreamPtr;
+
+
/**
* VIR_SECURITY_LABEL_BUFLEN:
*
@@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle,
virEventAddTimeoutFunc addTimeout,
virEventUpdateTimeoutFunc updateTimeout,
virEventRemoveTimeoutFunc removeTimeout);
+
+enum {
+ VIR_STREAM_NONBLOCK = (1 << 0),
+};
+
+virStreamPtr virStreamNew(virConnectPtr conn,
+ unsigned int flags);
+int virStreamRef(virStreamPtr st);
+
+int virStreamSend(virStreamPtr st,
+ const char *data,
+ size_t nbytes);
+
+int virStreamRecv(virStreamPtr st,
+ char *data,
+ size_t nbytes);
+
+
+typedef int (*virStreamSourceFunc)(virStreamPtr st,
+ char *data,
+ size_t nbytes,
+ void *opaque);
+
+int virStreamSendAll(virStreamPtr st,
+ virStreamSourceFunc handler,
+ void *opaque);
+
+typedef int (*virStreamSinkFunc)(virStreamPtr st,
+ const char *data,
+ size_t nbytes,
+ void *opaque);
+
+int virStreamRecvAll(virStreamPtr st,
+ virStreamSinkFunc handler,
+ void *opaque);
+
+typedef enum {
+ VIR_STREAM_EVENT_READABLE = (1 << 0),
+ VIR_STREAM_EVENT_WRITABLE = (1 << 1),
+ VIR_STREAM_EVENT_ERROR = (1 << 2),
+ VIR_STREAM_EVENT_HANGUP = (1 << 3),
+} virStreamEventType;
+
+
+/**
+ * virStreamEventCallback:
+ *
+ * @stream: stream on which the event occurred
+ * @events: bitset of events from virEventHandleType constants
+ * @opaque: user data registered with handle
+ *
+ * Callback for receiving stream events. The callback will
+ * be invoked once for each event which is pending.
+ */
+typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
+
+int virStreamEventAddCallback(virStreamPtr stream,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff);
+
+int virStreamEventUpdateCallback(virStreamPtr stream,
+ int events);
+
+int virStreamEventRemoveCallback(virStreamPtr stream);
+
+
+int virStreamFinish(virStreamPtr st);
+int virStreamAbort(virStreamPtr st);
+
+int virStreamFree(virStreamPtr st);
+
+
+
#ifdef __cplusplus
}
#endif
diff --git a/src/datatypes.c b/src/datatypes.c
index d03a679..3611b62 100644
--- a/src/datatypes.c
+++ b/src/datatypes.c
@@ -1129,3 +1129,62 @@ virUnrefNodeDevice(virNodeDevicePtr dev) {
virMutexUnlock(&dev->conn->lock);
return (refs);
}
+
+
+virStreamPtr virGetStream(virConnectPtr conn) {
+ virStreamPtr ret = NULL;
+
+ virMutexLock(&conn->lock);
+
+ if (VIR_ALLOC(ret) < 0) {
+ virReportOOMError(conn);
+ goto error;
+ }
+ ret->magic = VIR_STREAM_MAGIC;
+ ret->conn = conn;
+ conn->refs++;
+ ret->refs++;
+ virMutexUnlock(&conn->lock);
+ return(ret);
+
+error:
+ virMutexUnlock(&conn->lock);
+ VIR_FREE(ret);
+ return(NULL);
+}
+
+static void
+virReleaseStream(virStreamPtr st) {
+ virConnectPtr conn = st->conn;
+ DEBUG("release dev %p", st);
+
+ st->magic = -1;
+ VIR_FREE(st);
+
+ DEBUG("unref connection %p %d", conn, conn->refs);
+ conn->refs--;
+ if (conn->refs == 0) {
+ virReleaseConnect(conn);
+ /* Already unlocked mutex */
+ return;
+ }
+
+ virMutexUnlock(&conn->lock);
+}
+
+int virUnrefStream(virStreamPtr st) {
+ int refs;
+
+ virMutexLock(&st->conn->lock);
+ DEBUG("unref stream %p %d", st, st->refs);
+ st->refs--;
+ refs = st->refs;
+ if (refs == 0) {
+ virReleaseStream(st);
+ /* Already unlocked mutex */
+ return (0);
+ }
+
+ virMutexUnlock(&st->conn->lock);
+ return (refs);
+}
diff --git a/src/datatypes.h b/src/datatypes.h
index da83e02..0fed07f 100644
--- a/src/datatypes.h
+++ b/src/datatypes.h
@@ -100,6 +100,17 @@
/**
+ * VIR_STREAM_MAGIC:
+ *
+ * magic value used to protect the API when pointers to stream structures
+ * are passed down by the users.
+ */
+#define VIR_STREAM_MAGIC 0x1DEAD666
+#define VIR_IS_STREAM(obj) ((obj) &&
(obj)->magic==VIR_STREAM_MAGIC)
+#define VIR_IS_CONNECTED_STREAM(obj) (VIR_IS_STREAM(obj) &&
VIR_IS_CONNECT((obj)->conn))
+
+
+/**
* _virConnect:
*
* Internal structure associated to a connection
@@ -234,6 +245,25 @@ struct _virNodeDevice {
};
+typedef int (*virStreamAbortFunc)(virStreamPtr, void *opaque);
+typedef int (*virStreamFinishFunc)(virStreamPtr, void *opaque);
+
+/**
+ * _virStream:
+ *
+ * Internal structure associated with an input stream
+ */
+struct _virStream {
+ unsigned int magic;
+ virConnectPtr conn;
+ int refs;
+ int flags;
+
+ virStreamDriverPtr driver;
+ void *privateData;
+};
+
+
/************************************************************************
* *
* API for domain/connections (de)allocations and lookups *
@@ -270,4 +300,7 @@ virNodeDevicePtr virGetNodeDevice(virConnectPtr conn,
const char *name);
int virUnrefNodeDevice(virNodeDevicePtr dev);
+virStreamPtr virGetStream(virConnectPtr conn);
+int virUnrefStream(virStreamPtr st);
+
#endif
diff --git a/src/driver.h b/src/driver.h
index 79d46ff..25d34b6 100644
--- a/src/driver.h
+++ b/src/driver.h
@@ -799,6 +799,40 @@ struct _virDeviceMonitor {
virDrvNodeDeviceDestroy deviceDestroy;
};
+typedef struct _virStreamDriver virStreamDriver;
+typedef virStreamDriver *virStreamDriverPtr;
+
+typedef int (*virDrvStreamSend)(virStreamPtr st,
+ const char *data,
+ size_t nbytes);
+typedef int (*virDrvStreamRecv)(virStreamPtr st,
+ char *data,
+ size_t nbytes);
+
+typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff);
+
+typedef int (*virDrvStreamEventUpdateCallback)(virStreamPtr stream,
+ int events);
+typedef int (*virDrvStreamEventRemoveCallback)(virStreamPtr stream);
+typedef int (*virDrvStreamFinish)(virStreamPtr st);
+typedef int (*virDrvStreamAbort)(virStreamPtr st);
+
+
+struct _virStreamDriver {
+ virDrvStreamSend streamSend;
+ virDrvStreamRecv streamRecv;
+ virDrvStreamEventAddCallback streamAddCallback;
+ virDrvStreamEventUpdateCallback streamUpdateCallback;
+ virDrvStreamEventRemoveCallback streamRemoveCallback;
+ virDrvStreamFinish streamFinish;
+ virDrvStreamAbort streamAbort;
+};
+
+
/*
* Registration
* TODO: also need ways to (des)activate a given driver
diff --git a/src/libvirt.c b/src/libvirt.c
index ca8e003..d6536f4 100644
--- a/src/libvirt.c
+++ b/src/libvirt.c
@@ -559,6 +559,10 @@ virLibNodeDeviceError(virNodeDevicePtr dev, virErrorNumber error,
errmsg, info, NULL, 0, 0, errmsg, info);
}
+#define virLibStreamError(conn, code, fmt...) \
+ virReportErrorHelper(conn, VIR_FROM_NONE, code, __FILE__, \
+ __FUNCTION__, __LINE__, fmt)
+
/**
* virRegisterNetworkDriver:
* @driver: pointer to a network driver block
@@ -8626,3 +8630,682 @@ error:
virSetConnError(conn);
return -1;
}
+
+
+/**
+ * virStreamNew:
+ * @conn: pointer to the connection
+ * @flags: control features of the stream
+ *
+ * Creates a new stream object which can be used to perform
+ * streamed I/O with other public API function.
+ *
+ * When no longer needed, a stream object must be released
+ * with virStreamFree. If a data stream has been used,
+ * then the application must call virStreamFinish or
+ * virStreamAbort before free'ing to, in order to notify
+ * the driver of termination.
+ *
+ * If a non-blocking data stream is required passed
+ * VIR_STREAM_NONBLOCK for flags, otherwise pass 0.
+ *
+ * Returns the new stream, or NULL upon error
+ */
+virStreamPtr
+virStreamNew(virConnectPtr conn,
+ unsigned int flags)
+{
+ virStreamPtr st;
+
+ DEBUG("conn=%p, flags=%u", conn, flags);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECT(conn)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (NULL);
+ }
+
+ st = virGetStream(conn);
+ if (st)
+ st->flags = flags;
+
+ return st;
+}
+
+
+/**
+ * virStreamRef:
+ * @stream: pointer to the stream
+ *
+ * Increment the reference count on the stream. For each
+ * additional call to this method, there shall be a corresponding
+ * call to virStreamFree to release the reference count, once
+ * the caller no longer needs the reference to this object.
+ *
+ * Returns 0 in case of success, -1 in case of failure
+ */
+int
+virStreamRef(virStreamPtr stream)
+{
+ if ((!VIR_IS_CONNECTED_STREAM(stream))) {
+ virLibConnError(NULL, VIR_ERR_INVALID_ARG, __FUNCTION__);
+ return(-1);
+ }
+ virMutexLock(&stream->conn->lock);
+ DEBUG("stream=%p refs=%d", stream, stream->refs);
+ stream->refs++;
+ virMutexUnlock(&stream->conn->lock);
+ return 0;
+}
+
+
+/**
+ * virStreamSend:
+ * @stream: pointer to the stream object
+ * @data: buffer to write to stream
+ * @nbytes: size of @data buffer
+ *
+ * Write a series of bytes to the stream. This method may
+ * block the calling application for an arbitrary amount
+ * of time. Once an application has finished sending data
+ * it should call virStreamFinish to wait for succesful
+ * confirmation from the driver, or detect any error
+ *
+ * This method may not be used if a stream source has been
+ * registered
+ *
+ * Errors are not guaranteed to be reported synchronously
+ * with the call, but may instead be delayed until a
+ * subsequent call.
+ *
+ * A example using this with a hypothetical file upload
+ * API looks like
+ *
+ * virStreamPtr st = virStreamNew(conn, 0);
+ * int fd = open("demo.iso", O_RDONLY)
+ *
+ * virConnectUploadFile(conn, "demo.iso", st);
+ *
+ * while (1) {
+ * char buf[1024];
+ * int got = read(fd, buf, 1024);
+ * if (got < 0) {
+ * virStreamAbort(st);
+ * break;
+ * }
+ * if (got == 0) {
+ * virStreamFinish(st);
+ * break;
+ * }
+ * int offset = 0;
+ * while (offset < got) {
+ * int sent = virStreamSend(st, buf+offset, got-offset)
+ * if (sent < 0) {
+ * virStreamAbort(st);
+ * goto done;
+ * }
+ * offset += sent;
+ * }
+ * }
+ * done:
+ * virStreamFree(st);
+ * close(fd);
+ *
+ * Returns the number of bytes written, which may be less
+ * than requested.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if the outgoing transmit buffers are full &
+ * the stream is marked as non-blocking.
+ */
+int virStreamSend(virStreamPtr stream,
+ const char *data,
+ size_t nbytes)
+{
+ DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamSend) {
+ int ret;
+ ret = (stream->driver->streamSend)(stream, data, nbytes);
+ if (ret == -2)
+ return -2;
+
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+/**
+ * virStreamRecv:
+ * @stream: pointer to the stream object
+ * @data: buffer to write to stream
+ * @nbytes: size of @data buffer
+ *
+ * Write a series of bytes to the stream. This method may
+ * block the calling application for an arbitrary amount
+ * of time.
+ *
+ * Errors are not guaranteed to be reported synchronously
+ * with the call, but may instead be delayed until a
+ * subsequent call.
+ *
+ * A example using this with a hypothetical file download
+ * API looks like
+ *
+ * virStreamPtr st = virStreamNew(conn, 0);
+ * int fd = open("demo.iso", O_WRONLY, 0600)
+ *
+ * virConnectDownloadFile(conn, "demo.iso", st);
+ *
+ * while (1) {
+ * char buf[1024];
+ * int got = virStreamRecv(st, buf, 1024);
+ * if (got < 0)
+ * break;
+ * if (got == 0) {
+ * virStreamFinish(st);
+ * break;
+ * }
+ * int offset = 0;
+ * while (offset < got) {
+ * int sent = write(fd, buf+offset, got-offset)
+ * if (sent < 0) {
+ * virStreamAbort(st);
+ * goto done;
+ * }
+ * offset += sent;
+ * }
+ * }
+ * done:
+ * virStreamFree(st);
+ * close(fd);
+ *
+ *
+ * Returns the number of bytes read, which may be less
+ * than requested.
+ *
+ * Returns 0 when the end of the stream is reached, at
+ * which time the caller should invoke virStreamFinish()
+ * to get confirmation of stream completion.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if there is no data pending to be read & the
+ * stream is marked as non-blocking.
+ */
+int virStreamRecv(virStreamPtr stream,
+ char *data,
+ size_t nbytes)
+{
+ DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamRecv) {
+ int ret;
+ ret = (stream->driver->streamRecv)(stream, data, nbytes);
+ if (ret == -2)
+ return -2;
+
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamSendAll:
+ * @stream: pointer to the stream object
+ * @handler: source callback for reading data from application
+ * @opaque: application defined data
+ *
+ * Send the entire data stream, reading the data from the
+ * requested data source. This is simply a convenient alternative
+ * to virStreamSend, for apps that do blocking-I/o.
+ *
+ * A example using this with a hypothetical file upload
+ * API looks like
+ *
+ * int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
+ * int *fd = opaque;
+ *
+ * return read(*fd, buf, nbytes);
+ * }
+ *
+ * virStreamPtr st = virStreamNew(conn, 0);
+ * int fd = open("demo.iso", O_RDONLY)
+ *
+ * virConnectUploadFile(conn, st);
+ * virStreamSendAll(st, mysource, &fd);
+ * virStreamFree(st);
+ * close(fd);
+ *
+ * Returns 0 if all the data was succesfully sent. The stream
+ * will be marked as finished on success, so the caller need
+ * only call virStreamFree().
+ *
+ * Returns -1 upon any error, with the stream being marked as
+ * aborted, so the caller need only call virStreamFree()
+ */
+int virStreamSendAll(virStreamPtr stream,
+ virStreamSourceFunc handler,
+ void *opaque)
+{
+ char *bytes = NULL;
+ int want = 1024*64;
+ int ret = -1;
+ DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->flags & VIR_STREAM_NONBLOCK) {
+ virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
+ _("data sources cannot be used for non-blocking
streams"));
+ goto cleanup;
+ }
+
+ if (VIR_ALLOC_N(bytes, want) < 0) {
+ virReportOOMError(stream->conn);
+ goto cleanup;
+ }
+
+ for (;;) {
+ int got, offset = 0;
+ got = (handler)(stream, bytes, want, opaque);
+ if (got < 0) {
+ virStreamAbort(stream);
+ goto cleanup;
+ }
+ if (got == 0)
+ break;
+ while (offset < got) {
+ int done;
+ done = virStreamSend(stream, bytes + offset, got - offset);
+ if (done < 0)
+ goto cleanup;
+ offset += done;
+ }
+ }
+ ret = 0;
+
+cleanup:
+ VIR_FREE(bytes);
+
+ /* Copy to connection error object for back compatability */
+ if (ret != 0)
+ virSetConnError(stream->conn);
+
+ return ret;
+}
+
+
+/**
+ * virStreamRecvAll:
+ * @stream: pointer to the stream object
+ * @handler: sink callback for writing data to application
+ * @opaque: application defined data
+ *
+ * Receive the entire data stream, sending the data to the
+ * requested data sink. This is simply a convenient alternative
+ * to virStreamRecv, for apps that do blocking-I/o.
+ *
+ * A example using this with a hypothetical file download
+ * API looks like
+ *
+ * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
+ * int *fd = opaque;
+ *
+ * return write(*fd, buf, nbytes);
+ * }
+ *
+ * virStreamPtr st = virStreamNew(conn, 0);
+ * int fd = open("demo.iso", O_WRONLY)
+ *
+ * virConnectUploadFile(conn, st);
+ * virStreamRecvAll(st, mysink, &fd);
+ * virStreamFree(st);
+ * close(fd);
+ *
+ * Returns 0 if all the data was succesfully received. The stream
+ * will be marked as finished on success, so the caller need
+ * only call virStreamFree().
+ *
+ * Returns -1 upon any error, with the stream being marked as
+ * aborted, so the caller need only call virStreamFree()
+ */
+int virStreamRecvAll(virStreamPtr stream,
+ virStreamSinkFunc handler,
+ void *opaque)
+{
+ char *bytes = NULL;
+ int want = 1024*64;
+ int ret = -1;
+ DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->flags & VIR_STREAM_NONBLOCK) {
+ virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
+ _("data sinks cannot be used for non-blocking
streams"));
+ goto cleanup;
+ }
+
+
+ if (VIR_ALLOC_N(bytes, want) < 0) {
+ virReportOOMError(stream->conn);
+ goto cleanup;
+ }
+
+ for (;;) {
+ int got, offset = 0;
+ got = virStreamRecv(stream, bytes, want);
+ if (got < 0)
+ goto cleanup;
+ if (got == 0)
+ break;
+ while (offset < got) {
+ int done;
+ done = (handler)(stream, bytes + offset, got - offset, opaque);
+ if (done < 0) {
+ virStreamAbort(stream);
+ goto cleanup;
+ }
+ offset += done;
+ }
+ }
+ ret = 0;
+
+cleanup:
+ VIR_FREE(bytes);
+
+ /* Copy to connection error object for back compatability */
+ if (ret != 0)
+ virSetConnError(stream->conn);
+
+ return ret;
+}
+
+
+/**
+ * virStreamEventAddCallback
+ * @stream: pointer to the stream object
+ * @events: set of events to monitor
+ * @cb: callback to invoke when an event occurs
+ * @opaque: application defined data
+ * @ff: callback to free @opaque data
+ *
+ * Register a callback to be notified when a stream
+ * becomes writable, or readable. This is most commonly
+ * used in conjunction with non-blocking data streams
+ * to integrate into an event loop
+ *
+ * Return 0 on success, -1 upon error
+ */
+int virStreamEventAddCallback(virStreamPtr stream,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff)
+{
+ DEBUG("stream=%p, events=%d, cb=%p, opaque=%p, ff=%p", stream, events, cb,
opaque, ff);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamAddCallback) {
+ int ret;
+ ret = (stream->driver->streamAddCallback)(stream, events, cb, opaque, ff);
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamEventUpdateCallback
+ * @stream: pointer to the stream object
+ * @events: set of events to monitor
+ *
+ * Changes the set of events to monitor for a stream. This allows
+ * for event notification to be changed without having to
+ * unregister & register the callback completely. This method
+ * is guarenteed to succeed if a callback is already registered
+ *
+ * Returns 0 on success, -1 if no callback is registered
+ */
+int virStreamEventUpdateCallback(virStreamPtr stream,
+ int events)
+{
+ DEBUG("stream=%p, events=%d", stream, events);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamUpdateCallback) {
+ int ret;
+ ret = (stream->driver->streamUpdateCallback)(stream, events);
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamEventRemoveCallback
+ * @stream: pointer to the stream object
+ *
+ * Remove a event callback from the stream
+ *
+ * Return 0 on success, -1 on error
+ */
+int virStreamEventRemoveCallback(virStreamPtr stream)
+{
+ DEBUG("stream=%p", stream);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamRemoveCallback) {
+ int ret;
+ ret = (stream->driver->streamRemoveCallback)(stream);
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamFinish:
+ * @stream: pointer to the stream object
+ *
+ * Indicate that there is no further data is to be transmitted
+ * on the stream. For output streams this should be called once
+ * all data has been written. For input streams this should be
+ * called once virStreamRecv returns end-of-file.
+ *
+ * This method is a synchronization point for all asynchronous
+ * errors, so if this returns a success code the application can
+ * be sure that all data has been successfully processed.
+ *
+ * Returns 0 on success, -1 upon error
+ */
+int virStreamFinish(virStreamPtr stream)
+{
+ DEBUG("stream=%p", stream);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamFinish) {
+ int ret;
+ ret = (stream->driver->streamFinish)(stream);
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamAbort:
+ * @stream: pointer to the stream object
+ *
+ * Request that the in progress data transfer be cancelled
+ * abnormally before the end of the stream has been reached.
+ * For output streams this can be used to inform the driver
+ * that the stream is being terminated early. For input
+ * streams this can be used to inform the driver that it
+ * should stop sending data.
+ *
+ * Returns 0 on success, -1 upon error
+ */
+int virStreamAbort(virStreamPtr stream)
+{
+ DEBUG("stream=%p", stream);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ if (stream->driver &&
+ stream->driver->streamAbort) {
+ int ret;
+ ret = (stream->driver->streamAbort)(stream);
+ if (ret < 0)
+ goto error;
+ return ret;
+ }
+
+ virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+ /* Copy to connection error object for back compatability */
+ virSetConnError(stream->conn);
+ return -1;
+}
+
+
+/**
+ * virStreamFree:
+ * @stream: pointer to the stream object
+ *
+ * Decrement the reference count on a stream, releasing
+ * the stream object if the reference count has hit zero.
+ *
+ * There must not be a active data transfer in progress
+ * when releasing the stream. If a stream needs to be
+ * disposed of prior to end of stream being reached, then
+ * the virStreamAbort function should be called first.
+ *
+ * Returns 0 upon success, or -1 on error
+ */
+int virStreamFree(virStreamPtr stream)
+{
+ DEBUG("stream=%p", stream);
+
+ virResetLastError();
+
+ if (!VIR_IS_CONNECTED_STREAM(stream)) {
+ virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+ return (-1);
+ }
+
+ /* XXX Enforce shutdown before free'ing resources ? */
+
+ if (virUnrefStream(stream) < 0)
+ return (-1);
+ return (0);
+}
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 2bf4e15..12d552e 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -56,6 +56,8 @@ virUnrefStorageVol;
virGetNodeDevice;
virUnrefDomain;
virUnrefConnect;
+virGetStream;
+virUnrefStream;
# domain_conf.h
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index c06f51e..f48b8c5 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -291,4 +291,19 @@ LIBVIRT_0.7.0 {
virConnectListDefinedInterfaces;
} LIBVIRT_0.6.4;
+LIBVIRT_0.7.1 {
+ virStreamNew;
+ virStreamRef;
+ virStreamSend;
+ virStreamRecv;
+ virStreamSendAll;
+ virStreamRecvAll;
+ virStreamEventAddCallback;
+ virStreamEventUpdateCallback;
+ virStreamEventRemoveCallback;
+ virStreamFinish;
+ virStreamAbort;
+ virStreamFree;
+} LIBVIRT_0.7.0;
+
# .... define new API here using predicted next version number ....
--
1.6.2.5