This is just a wrapper over the newly introduced functions
virStreamRecvFlags() and virStreamHoleSize(). It is very similar
to virStreamRecvAll(), except that it handles sparse streams
well.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
include/libvirt/libvirt-stream.h | 28 ++++++++-
src/libvirt-stream.c | 119 +++++++++++++++++++++++++++++++++++++++
src/libvirt_public.syms | 1 +
3 files changed, 145 insertions(+), 3 deletions(-)
diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 23fcc26..e5f5126 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st,
* @nbytes: size of the data array
* @opaque: optional application provided data
*
- * The virStreamSinkFunc callback is used together
- * with the virStreamRecvAll function for libvirt to
- * provide the data that has been received.
+ * The virStreamSinkFunc callback is used together with the
+ * virStreamRecvAll or virStreamSparseRecvAll functions for
+ * libvirt to provide the data that has been received.
*
* The callback will be invoked multiple times,
* providing data in small chunks. The application
@@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st,
virStreamSinkFunc handler,
void *opaque);
+/**
+ * virStreamSinkHoleFunc:
+ * @st: the stream object
+ * @length: size of the hole, as obtained from virStreamHoleSize
+ * @opaque: optional application provided data
+ *
+ * This callback is used together with the virStreamSparseRecvAll
+ * function for libvirt to provide the size of a hole that
+ * occurred in the stream, so the application can skip over it.
+ *
+ * Returns 0 on success,
+ * -1 upon error, which makes virStreamSparseRecvAll abort the stream
+ */
+typedef int (*virStreamSinkHoleFunc)(virStreamPtr st,
+ unsigned long long length,
+ void *opaque);
+
+int virStreamSparseRecvAll(virStreamPtr stream,
+ virStreamSinkFunc handler,
+ virStreamSinkHoleFunc holeHandler,
+ void *opaque);
+
typedef enum {
VIR_STREAM_EVENT_READABLE = (1 << 0),
VIR_STREAM_EVENT_WRITABLE = (1 << 1),
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 1162d33..81190cc 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -660,6 +660,125 @@ virStreamRecvAll(virStreamPtr stream,
/**
+ * virStreamSparseRecvAll:
+ * @stream: pointer to the stream object
+ * @handler: sink callback for writing data to application
+ * @holeHandler: stream hole callback for skipping holes
+ * @opaque: application defined data
+ *
+ * Receive the entire data stream, passing data to @handler and the
+ * size of each hole to @holeHandler. This is simply a convenient
+ * alternative to virStreamRecvFlags, for apps that do blocking-I/O.
+ *
+ * An example using this with a hypothetical file download
+ * API looks like:
+ *
+ * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
+ * int *fd = opaque;
+ *
+ * return write(*fd, buf, nbytes);
+ * }
+ *
+ * int myskip(virStreamPtr st, unsigned long long offset, void *opaque) {
+ * int *fd = opaque;
+ *
+ * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
+ * }
+ *
+ * virStreamPtr st = virStreamNew(conn, 0);
+ * int fd = open("demo.iso", O_WRONLY);
+ *
+ * virConnectDownloadSparseFile(conn, st);
+ * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) {
+ * ...report an error ...
+ * goto done;
+ * }
+ * if (virStreamFinish(st) < 0)
+ * ...report an error...
+ * virStreamFree(st);
+ * close(fd);
+ *
+ * Note that @opaque data is shared between both @handler and
+ * @holeHandler callbacks.
+ *
+ * Returns 0 if all the data was successfully received. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree().
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called, so the caller need only call
+ * virStreamFree()
+ */
+int
+virStreamSparseRecvAll(virStreamPtr stream,
+                       virStreamSinkFunc handler,
+                       virStreamSinkHoleFunc holeHandler,
+                       void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE;
+    int ret = -1;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sinks cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    /* Loop until virStreamRecvFlags() returns 0, which ends the transfer. */
+    for (;;) {
+        int got, offset = 0;
+        unsigned long long holeLen;
+
+        got = virStreamRecvFlags(stream, bytes, want, flags);
+        if (got == -3) {
+            /* Receiving stopped at a hole: report its size, no data to sink. */
+            if (virStreamHoleSize(stream, &holeLen) < 0 ||
+                holeHandler(stream, holeLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            continue;
+        } else if (got < 0) {
+            goto cleanup;
+        } else if (got == 0) {
+            break;
+        }
+        /* The sink may consume only part of the buffer; hand it the rest. */
+        while (offset < got) {
+            int done = (handler)(stream, bytes + offset, got - offset, opaque);
+            if (done < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            offset += done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}
+
+/**
* virStreamEventAddCallback:
* @stream: pointer to the stream object
* @events: set of events to monitor
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 0e34eee..008dc59 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -764,6 +764,7 @@ LIBVIRT_3.3.0 {
virStreamHoleSize;
virStreamRecvFlags;
virStreamSkip;
+ virStreamSparseRecvAll;
} LIBVIRT_3.1.0;
# .... define new API here using predicted next version number ....
--
2.10.2