[libvirt] [PATCH v3 00/31] Implement sparse streams for libvirt

v3 of:

https://www.redhat.com/archives/libvir-list/2017-April/msg00671.html

All the patches can be found on my github:

https://github.com/zippy2/libvirt/tree/sparse_streams2

diff to v2:
- renamed APIs from Skip & GetHoleSize to SendHole & RecvHole
- switched from 'unsigned long long len' to 'long long len' (where len is size of a hole)
- introduced @flags to public APIs for future extensibility
- couple of coding style fixes
- couple of fixes suggested by John in review of v2

As expressed earlier, a lot of these patches should have Reviewed-by tag as John reviewed majority of them. But we don't have a clear agreement when to use the tag, so I'm not putting it in just yet. However, will do before pushing.

Some patches were ACKed. However, changes described above changed them, so I'm not sure ACK still stands.

Michal Privoznik (31):
  virfdstream: Use messages instead of pipe
  util: Introduce virFileInData
  Introduce virStreamRecvFlags
  Implement virStreamRecvFlags to some drivers
  Introduce virStreamSendHole
  Introduce virStreamRecvHole
  Introduce VIR_STREAM_RECV_STOP_AT_HOLE flag
  Introduce virStreamSparseRecvAll
  Introduce virStreamSparseSendAll
  Introduce virStreamInData
  virNetClientStreamNew: Track origin stream
  Add new flag to daemonCreateClientStream and virNetClientStreamNew
  RPC: Introduce virNetStreamHole
  Introduce VIR_NET_STREAM_HOLE message type
  Teach wireshark plugin about VIR_NET_STREAM_HOLE
  daemon: Introduce virNetServerProgramSendStreamHole
  virnetclientstream: Introduce virNetClientStreamSendHole
  daemon: Implement VIR_NET_STREAM_HOLE handling
  virnetclientstream: Introduce virNetClientStreamHandleHole
  remote_driver: Implement virStreamSendHole
  virNetClientStreamRecvPacket: Introduce @flags argument
  Introduce virNetClientStreamRecvHole
  remote: Implement virStreamRecvHole
  virNetClientStream: Wire up VIR_NET_STREAM_HOLE
  remote_driver: Implement VIR_STREAM_RECV_STOP_AT_HOLE
  daemonStreamHandleRead: Wire up seekable stream
  fdstream: Implement sparse stream
  gendispatch: Introduce @sparseflag for our calls
  Introduce virStorageVol{Download,Upload}Flags
  virsh: Implement sparse stream to vol-download
  virsh: Implement sparse stream to vol-upload

 daemon/remote.c                      |   2 +-
 daemon/stream.c                      | 148 ++++++++-
 daemon/stream.h                      |   3 +-
 include/libvirt/libvirt-storage.h    |   9 +
 include/libvirt/libvirt-stream.h     | 115 ++++++-
 src/driver-stream.h                  |  25 ++
 src/esx/esx_stream.c                 |  16 +-
 src/libvirt-storage.c                |   4 +-
 src/libvirt-stream.c                 | 526 ++++++++++++++++++++++++++++++
 src/libvirt_internal.h               |   4 +
 src/libvirt_private.syms             |   2 +
 src/libvirt_public.syms              |   9 +
 src/libvirt_remote.syms              |   3 +
 src/remote/remote_driver.c           |  99 +++++-
 src/remote/remote_protocol.x         |   2 +
 src/rpc/gendispatch.pl               |  21 +-
 src/rpc/virnetclient.c               |   1 +
 src/rpc/virnetclientstream.c         | 238 +++++++++++++-
 src/rpc/virnetclientstream.h         |  18 +-
 src/rpc/virnetprotocol.x             |  17 +-
 src/rpc/virnetserverprogram.c        |  35 ++
 src/rpc/virnetserverprogram.h        |   8 +
 src/storage/storage_driver.c         |   4 +-
 src/storage/storage_util.c           |  10 +-
 src/util/virfdstream.c               | 609 +++++++++++++++++++++++++++++++----
 src/util/virfdstream.h               |   1 +
 src/util/virfile.c                   |  82 +++++
 src/util/virfile.h                   |   4 +
 src/virnetprotocol-structs           |   5 +
 tests/virfiletest.c                  | 203 ++++++++++++
 tools/virsh-util.c                   |  65 ++++
 tools/virsh-util.h                   |  29 ++
 tools/virsh-volume.c                 |  50 ++-
 tools/virsh.pod                      |   6 +-
 tools/wireshark/src/packet-libvirt.c |  52 +++
 tools/wireshark/src/packet-libvirt.h |   2 +
 36 files changed, 2301 insertions(+), 126 deletions(-)

--
2.13.0

One big downside of using the pipe to transfer the data is that we can really transfer just bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve so let's use a message queue. It's fairly easy to exchange info between threads now that iohelper is no longer used. The reason why we cannot use the FD for plain files directly is that despite us setting noblock flag on the FD, any read()/write() blocks regardless (which is a show stopper since those parts of the code are run from the event loop) and poll() reports such FD as always readable/writable - even though the subsequent operation might block. The pipe is still not gone though. It is used to signal to even loop that an event occurred (e.g. data are available for reading in the queue, or vice versa). Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 350 insertions(+), 52 deletions(-) diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 5ce78fe58..4b42939e7 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -49,6 +49,27 @@ VIR_LOG_INIT("fdstream"); +typedef enum { + VIR_FDSTREAM_MSG_TYPE_DATA, +} virFDStreamMsgType; + +typedef struct _virFDStreamMsg virFDStreamMsg; +typedef virFDStreamMsg *virFDStreamMsgPtr; +struct _virFDStreamMsg { + virFDStreamMsgPtr next; + + virFDStreamMsgType type; + + union { + struct { + char *buf; + size_t len; + size_t offset; + } data; + } stream; +}; + + /* Tunnelled migration stream support */ typedef struct virFDStreamData virFDStreamData; typedef virFDStreamData *virFDStreamDataPtr; @@ -80,18 +101,25 @@ struct virFDStreamData { /* Thread data */ virThreadPtr thread; + virCond threadCond; int threadErr; bool threadQuit; + bool threadAbort; + bool threadDoRead; + virFDStreamMsgPtr msg; }; static virClassPtr virFDStreamDataClass; +static void 
virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue); + static void virFDStreamDataDispose(void *obj) { virFDStreamDataPtr fdst = obj; VIR_DEBUG("obj=%p", fdst); + virFDStreamMsgQueueFree(&fdst->msg); } static int virFDStreamDataOnceInit(void) @@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void) VIR_ONCE_GLOBAL_INIT(virFDStreamData) +static int +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst, + virFDStreamMsgPtr msg, + int fd, + const char *fdname) +{ + virFDStreamMsgPtr *tmp = &fdst->msg; + char c = '1'; + + while (*tmp) + tmp = &(*tmp)->next; + + *tmp = msg; + virCondSignal(&fdst->threadCond); + + if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) { + virReportSystemError(errno, + _("Unable to write to %s"), + fdname); + return -1; + } + + return 0; +} + + +static virFDStreamMsgPtr +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst, + int fd, + const char *fdname) +{ + virFDStreamMsgPtr tmp = fdst->msg; + char c; + + if (tmp) { + fdst->msg = tmp->next; + tmp->next = NULL; + } + + virCondSignal(&fdst->threadCond); + + if (saferead(fd, &c, sizeof(c)) != sizeof(c)) { + virReportSystemError(errno, + _("Unable to read from %s"), + fdname); + return NULL; + } + + return tmp; +} + + +static void +virFDStreamMsgFree(virFDStreamMsgPtr msg) +{ + if (!msg) + return; + + switch (msg->type) { + case VIR_FDSTREAM_MSG_TYPE_DATA: + VIR_FREE(msg->stream.data.buf); + break; + } + + VIR_FREE(msg); +} + + +static void +virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue) +{ + virFDStreamMsgPtr tmp = *queue; + + while (tmp) { + virFDStreamMsgPtr next = tmp->next; + virFDStreamMsgFree(tmp); + tmp = next; + } + + *queue = NULL; +} + + static int virFDStreamRemoveCallback(virStreamPtr stream) { virFDStreamDataPtr fdst = stream->privateData; @@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr; struct _virFDStreamThreadData { virStreamPtr st; size_t length; + bool doRead; int fdin; char *fdinname; int fdout; @@ -293,6 +405,86 @@ 
virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) } +static ssize_t +virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + const int fdin, + const int fdout, + const char *fdinname, + const char *fdoutname, + size_t buflen) +{ + virFDStreamMsgPtr msg = NULL; + char *buf = NULL; + ssize_t got; + + if (VIR_ALLOC(msg) < 0) + goto error; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = got; + buf = NULL; + + virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname); + msg = NULL; + + return got; + + error: + VIR_FREE(buf); + virFDStreamMsgFree(msg); + return -1; +} + + +static ssize_t +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + const int fdin, + const int fdout, + const char *fdinname, + const char *fdoutname) +{ + ssize_t got; + virFDStreamMsgPtr msg = fdst->msg; + bool pop = false; + + switch (msg->type) { + case VIR_FDSTREAM_MSG_TYPE_DATA: + got = safewrite(fdout, + msg->stream.data.buf + msg->stream.data.offset, + msg->stream.data.len - msg->stream.data.offset); + if (got < 0) { + virReportSystemError(errno, + _("Unable to write %s"), + fdoutname); + return -1; + } + + msg->stream.data.offset += got; + + pop = msg->stream.data.offset == msg->stream.data.len; + break; + } + + if (pop) { + virFDStreamMsgQueuePop(fdst, fdin, fdinname); + virFDStreamMsgFree(msg); + } + + return got; +} + + static void virFDStreamThread(void *opaque) { @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque) int fdout = data->fdout; char *fdoutname = data->fdoutname; virFDStreamDataPtr fdst = st->privateData; - char *buf = NULL; + bool doRead = fdst->threadDoRead; size_t buflen = 256 * 1024; size_t total = 0; virObjectRef(fdst); - - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; + virObjectLock(fdst); while (1) { ssize_t got; @@ -323,39 +513,56 
@@ virFDStreamThread(void *opaque) if (buflen == 0) break; /* End of requested data from client */ - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); + while (doRead == (fdst->msg != NULL) && + !fdst->threadQuit) { + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) { + virReportSystemError(errno, "%s", + _("failed to wait on condition")); + goto error; + } + } + + if (fdst->threadQuit) { + /* If stream abort was requested, quit early. */ + if (fdst->threadAbort) + goto cleanup; + + /* Otherwise flush buffers and quit gracefully. */ + if (doRead == (fdst->msg != NULL)) + break; + } + + if (doRead) + got = virFDStreamThreadDoRead(fdst, + fdin, fdout, + fdinname, fdoutname, + buflen); + else + got = virFDStreamThreadDoWrite(fdst, + fdin, fdout, + fdinname, fdoutname); + + if (got < 0) goto error; - } if (got == 0) break; total += got; - - if (safewrite(fdout, buf, got) < 0) { - virReportSystemError(errno, - _("Unable to write %s"), - fdoutname); - goto error; - } } cleanup: + fdst->threadQuit = true; + virObjectUnlock(fdst); if (!virObjectUnref(fdst)) st->privateData = NULL; VIR_FORCE_CLOSE(fdin); VIR_FORCE_CLOSE(fdout); virFDStreamThreadDataFree(data); - VIR_FREE(buf); return; error: - virObjectLock(fdst); fdst->threadErr = errno; - virObjectUnlock(fdst); goto cleanup; } @@ -368,6 +575,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, if (!fdst->thread) return 0; + fdst->threadAbort = streamAbort; + fdst->threadQuit = true; + virCondSignal(&fdst->threadCond); + /* Give the thread a chance to lock the FD stream object. 
*/ virObjectUnlock(fdst); virThreadJoin(fdst->thread); @@ -381,6 +592,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, ret = 0; cleanup: VIR_FREE(fdst->thread); + virCondDestroy(&fdst->threadCond); return ret; } @@ -427,11 +639,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) fdst->abortCallbackDispatching = false; } - /* mutex locked */ - ret = VIR_CLOSE(fdst->fd); if (virFDStreamJoinWorker(fdst, streamAbort) < 0) ret = -1; + /* mutex locked */ + if ((ret = VIR_CLOSE(fdst->fd)) < 0) + virReportSystemError(errno, "%s", + _("Unable to close")); + st->privateData = NULL; /* call the internal stream closing callback */ @@ -468,7 +683,8 @@ virFDStreamAbort(virStreamPtr st) static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) { virFDStreamDataPtr fdst = st->privateData; - int ret; + virFDStreamMsgPtr msg = NULL; + int ret = -1; if (nbytes > INT_MAX) { virReportSystemError(ERANGE, "%s", @@ -496,25 +712,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) nbytes = fdst->length - fdst->offset; } - retry: - ret = write(fdst->fd, bytes, nbytes); - if (ret < 0) { - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR - if (errno == EAGAIN || errno == EWOULDBLOCK) { - VIR_WARNINGS_RESET - ret = -2; - } else if (errno == EINTR) { - goto retry; - } else { - ret = -1; - virReportSystemError(errno, "%s", + if (fdst->thread) { + char *buf; + + if (fdst->threadQuit) { + virReportSystemError(EBADF, "%s", _("cannot write to stream")); + return -1; + } + + if (VIR_ALLOC(msg) < 0 || + VIR_ALLOC_N(buf, nbytes) < 0) + goto cleanup; + + memcpy(buf, bytes, nbytes); + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = nbytes; + + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); + msg = NULL; + ret = nbytes; + } else { + retry: + ret = write(fdst->fd, bytes, nbytes); + if (ret < 0) { + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR + if (errno == EAGAIN || errno == EWOULDBLOCK) { + 
VIR_WARNINGS_RESET + ret = -2; + } else if (errno == EINTR) { + goto retry; + } else { + ret = -1; + virReportSystemError(errno, "%s", + _("cannot write to stream")); + } } - } else if (fdst->length) { - fdst->offset += ret; } + if (fdst->length) + fdst->offset += ret; + + cleanup: virObjectUnlock(fdst); + virFDStreamMsgFree(msg); return ret; } @@ -522,7 +764,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) { virFDStreamDataPtr fdst = st->privateData; - int ret; + int ret = -1; if (nbytes > INT_MAX) { virReportSystemError(ERANGE, "%s", @@ -548,24 +790,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) nbytes = fdst->length - fdst->offset; } - retry: - ret = read(fdst->fd, bytes, nbytes); - if (ret < 0) { - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR - if (errno == EAGAIN || errno == EWOULDBLOCK) { - VIR_WARNINGS_RESET - ret = -2; - } else if (errno == EINTR) { - goto retry; - } else { - ret = -1; - virReportSystemError(errno, "%s", - _("cannot read from stream")); + if (fdst->thread) { + virFDStreamMsgPtr msg = NULL; + + while (!(msg = fdst->msg)) { + if (fdst->threadQuit) { + if (nbytes) { + virReportSystemError(EBADF, "%s", + _("stream is not open")); + } else { + ret = 0; + } + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { + /* Nope, nope, I'm outta here */ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected message type")); + goto cleanup; + } + + if (nbytes > msg->stream.data.len - msg->stream.data.offset) + nbytes = msg->stream.data.len - msg->stream.data.offset; + + memcpy(bytes, + msg->stream.data.buf + msg->stream.data.offset, + nbytes); + + msg->stream.data.offset += nbytes; + if (msg->stream.data.offset == msg->stream.data.len) { + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); + 
virFDStreamMsgFree(msg); + } + + ret = nbytes; + + } else { + retry: + ret = read(fdst->fd, bytes, nbytes); + if (ret < 0) { + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR + if (errno == EAGAIN || errno == EWOULDBLOCK) { + VIR_WARNINGS_RESET + ret = -2; + } else if (errno == EINTR) { + goto retry; + } else { + ret = -1; + virReportSystemError(errno, "%s", + _("cannot read from stream")); + } + goto cleanup; } - } else if (fdst->length) { - fdst->offset += ret; } + if (fdst->length) + fdst->offset += ret; + + cleanup: virObjectUnlock(fdst); return ret; } @@ -610,11 +898,19 @@ static int virFDStreamOpenInternal(virStreamPtr st, st->privateData = fdst; if (threadData) { + fdst->threadDoRead = threadData->doRead; + /* Create the thread after fdst and st were initialized. * The thread worker expects them to be that way. */ if (VIR_ALLOC(fdst->thread) < 0) goto error; + if (virCondInit(&fdst->threadCond) < 0) { + virReportSystemError(errno, "%s", + _("cannot initialize condition variable")); + goto error; + } + if (virThreadCreate(fdst->thread, true, virFDStreamThread, @@ -783,6 +1079,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, VIR_STRDUP(threadData->fdoutname, "pipe") < 0) goto error; tmpfd = pipefds[0]; + threadData->doRead = true; } else { threadData->fdin = pipefds[0]; threadData->fdout = fd; @@ -790,6 +1087,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, VIR_STRDUP(threadData->fdoutname, path) < 0) goto error; tmpfd = pipefds[1]; + threadData->doRead = false; } } -- 2.13.0
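For readers skimming the patch, the queue-plus-pipe idea can be reduced to the following minimal sketch. It is not the libvirt code: the names msg, msg_queue, queue_push, queue_pop and msg_free are hypothetical, locking and the condition variable are left out, and, unlike virFDStreamMsgQueuePop() in the patch, the sketch only consumes a wake-up byte when a message was actually dequeued. One byte is written to the pipe per queued message, so an event loop polling the read end presumably sees it as readable exactly while messages are pending.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical, simplified stand-in for virFDStreamMsg. */
typedef struct msg {
    struct msg *next;
    char *buf;
    size_t len;
} msg;

typedef struct {
    msg *head;     /* oldest message first */
    int wake_rd;   /* read end of the wake-up pipe */
    int wake_wr;   /* write end of the wake-up pipe */
} msg_queue;

static void
msg_free(msg *m)
{
    if (!m)
        return;
    free(m->buf);
    free(m);
}

/* Append a copy of @data to the queue, then write one byte to the
 * pipe so that whoever polls wake_rd learns a message is pending. */
static int
queue_push(msg_queue *q, const char *data, size_t len)
{
    msg **tail;
    msg *m;
    char c = '1';

    assert(q != NULL);

    if (!(m = calloc(1, sizeof(*m))))
        return -1;
    if (!(m->buf = malloc(len ? len : 1))) {
        free(m);
        return -1;
    }
    memcpy(m->buf, data, len);
    m->len = len;

    /* Walk to the end of the singly linked list. */
    for (tail = &q->head; *tail; tail = &(*tail)->next)
        ;
    *tail = m;

    return write(q->wake_wr, &c, 1) == 1 ? 0 : -1;
}

/* Detach the oldest message and consume its wake-up byte.
 * The caller owns the returned message and must msg_free() it. */
static msg *
queue_pop(msg_queue *q)
{
    msg *m;
    char c;

    assert(q != NULL);

    if (!(m = q->head))
        return NULL;
    q->head = m->next;
    m->next = NULL;

    if (read(q->wake_rd, &c, 1) != 1) {
        msg_free(m);
        return NULL;
    }
    return m;
}
```

In a threaded setup both functions would have to be called with the owning object's lock held, as the patch does with virObjectLock(fdst).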

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
One big downside of using the pipe to transfer the data is that we can really transfer just bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve so let's use a message queue. It's fairly easy to exchange info between threads now that iohelper is no longer used.
The reason why we cannot use the FD for plain files directly is that despite us setting noblock flag on the FD, any read()/write() blocks regardless (which is a show stopper since those parts of the code are run from the event loop) and poll() reports such FD as always readable/writable - even though the subsequent operation might block.
The pipe is still not gone though. It is used to signal to even
s/to even/the event/
loop that an event occurred (e.g. data are available for reading
s/are/is (yes, an oddity of the language)
in the queue, or vice versa).
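The poll() behaviour the commit message complains about can be observed with a small sketch like the one below. The helper names poll_now and open_nonblock_tmpfile are hypothetical and the /tmp path is an assumption. The point is only that a regular file is reported readable and writable immediately, even when empty and with O_NONBLOCK set, whereas an empty pipe is not reported readable.

```c
#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

/* Return the revents poll() reports right away (timeout 0) for @fd,
 * or -1 if poll() itself fails. */
static int
poll_now(int fd, short events)
{
    struct pollfd p = { .fd = fd, .events = events, .revents = 0 };

    assert(fd >= 0);

    if (poll(&p, 1, 0) < 0)
        return -1;
    return p.revents;
}

/* Create an empty, already unlinked temporary file with O_NONBLOCK
 * set on its descriptor. Returns the FD, or -1 on error. */
static int
open_nonblock_tmpfile(void)
{
    char path[] = "/tmp/pollfile.XXXXXX";
    int fd = mkstemp(path);
    int flags;

    if (fd < 0)
        return -1;
    unlink(path);

    if ((flags = fcntl(fd, F_GETFL)) < 0 ||
        fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Calling poll_now(fd, POLLIN | POLLOUT) on the temporary file should report both events at once, which is why the event loop cannot rely on poll() to tell it when file I/O will not block.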
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 350 insertions(+), 52 deletions(-)

I'm still getting a compilation error on this patch...

util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in this function [-Werror=maybe-uninitialized]
         total += got;
              ^~

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 5ce78fe58..4b42939e7 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@
[...]
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         const int fdin,
+                         const int fdout,
+                         const char *fdinname,
+                         const char *fdoutname)
+{
+    ssize_t got;
got = 0;

Fixes the compilation issue since got is only set for MSG_TYPE_DATA and even though there is only that type, the compiler seems to somehow believe it could be set ambiguously.
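The warning can be illustrated with a reduced sketch; msg_type and consume are hypothetical names, not libvirt ones. An enum object in C may hold a value that is not one of its enumerators, so the compiler presumably has to account for the path where no case label matches and the variable is returned without ever being assigned. Initializing it closes that path.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

/* Hypothetical single-value enum, mirroring virFDStreamMsgType. */
typedef enum {
    MSG_TYPE_DATA,
} msg_type;

static long
consume(msg_type type, size_t len)
{
    /* Without the "= 0" there is a path (no case matches) on which
     * @got is returned uninitialized; that is the kind of path
     * -Wmaybe-uninitialized reports. */
    long got = 0;

    assert(len <= (size_t) LONG_MAX);

    switch (type) {
    case MSG_TYPE_DATA:
        got = (long) len;
        break;
    }

    return got;
}
```

An alternative that keeps the variable uninitialized would be a default: branch that returns an error, at the cost of losing the compiler's missing-enumerator check for the switch.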
+    virFDStreamMsgPtr msg = fdst->msg;
+    bool pop = false;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        got = safewrite(fdout,
+                        msg->stream.data.buf + msg->stream.data.offset,
+                        msg->stream.data.len - msg->stream.data.offset);
+        if (got < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        msg->stream.data.offset += got;
+
+        pop = msg->stream.data.offset == msg->stream.data.len;
+        break;
+    }
+
+    if (pop) {
+        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
+        virFDStreamMsgFree(msg);
+    }
+
+    return got;
+}
+
+
 static void
 virFDStreamThread(void *opaque)
 {
@@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
     int fdout = data->fdout;
     char *fdoutname = data->fdoutname;
     virFDStreamDataPtr fdst = st->privateData;
-    char *buf = NULL;
+    bool doRead = fdst->threadDoRead;
Should the fdst ref come after the ObjectLock(fdst) below? [1]
     size_t buflen = 256 * 1024;
     size_t total = 0;

     virObjectRef(fdst);
-
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
+    virObjectLock(fdst);
^^^ [1]

Reviewed-by: John Ferlan <jferlan@redhat.com>

John

[...]

On 05/16/2017 10:26 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
One big downside of using the pipe to transfer the data is that we can really transfer just bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve so let's use a message queue. It's fairly easy to exchange info between threads now that iohelper is no longer used.
The reason why we cannot use the FD for plain files directly is that despite us setting noblock flag on the FD, any read()/write() blocks regardless (which is a show stopper since those parts of the code are run from the event loop) and poll() reports such FD as always readable/writable - even though the subsequent operation might block.
The pipe is still not gone though. It is used to signal to even
s/to even/the event/
loop that an event occurred (e.g. data are available for reading
s/are/is (yes, an oddity of the language)
in the queue, or vice versa).
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 350 insertions(+), 52 deletions(-)

I'm still getting a compilation error on this patch...

util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in this function [-Werror=maybe-uninitialized]
         total += got;
              ^~

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 5ce78fe58..4b42939e7 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@
[...]
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         const int fdin,
+                         const int fdout,
+                         const char *fdinname,
+                         const char *fdoutname)
+{
+    ssize_t got;
got = 0;
Fixes the compilation issue since got is only set for MSG_TYPE_DATA and even though there is only that type, the compiler seems to somehow believe it could be set ambiguously.
A-ha! So this function might return an uninitialized value, which is propagated to virFDStreamThread where it hits the error. Well, on one hand the compiler tries to be smart (none of those checks involving @got in the parent function cause the compilation error, since in that case @got is initialized); but on the other hand the compiler fails to see that with the current code there's no way for msg->type to be anything other than MSG_TYPE_DATA, in which case @got is set to a value. Sigh.
+    virFDStreamMsgPtr msg = fdst->msg;
+    bool pop = false;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        got = safewrite(fdout,
+                        msg->stream.data.buf + msg->stream.data.offset,
+                        msg->stream.data.len - msg->stream.data.offset);
+        if (got < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        msg->stream.data.offset += got;
+
+        pop = msg->stream.data.offset == msg->stream.data.len;
+        break;
+    }
+
+    if (pop) {
+        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
+        virFDStreamMsgFree(msg);
+    }
+
+    return got;
+}
+
+
 static void
 virFDStreamThread(void *opaque)
 {
@@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
     int fdout = data->fdout;
     char *fdoutname = data->fdoutname;
     virFDStreamDataPtr fdst = st->privateData;
-    char *buf = NULL;
+    bool doRead = fdst->threadDoRead;
Should the fdst ref come after the ObjectLock(fdst) below? [1]
Actually, it doesn't matter. At this point, @fdst should have at least one reference held by the parent (I mean the other thread that spawned this thread). That holds even if unfair thread scheduling occurs. Imagine streamOpen() & streamClose() being called immediately one after another. streamOpen() spawns the thread, but let's assume that the scheduler is unfair and does not schedule the thread for a while. Well, streamClose() waits for the thread to join anyway, therefore the ordering of the lock() & ref() operations doesn't really matter.

Michal

This function takes a FD and determines whether the current position is in data section or in a hole. In addition to that, it also determines how much bytes are there remaining till the current section ends. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 82 +++++++++++++++++++ src/util/virfile.h | 4 + tests/virfiletest.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 290 insertions(+) diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index bbe283529..4102a002b 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1628,6 +1628,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index ea44a647c..5b10f9489 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3793,6 +3793,88 @@ virFileComparePaths(const char *p1, const char *p2) cleanup: VIR_FREE(res1); VIR_FREE(res2); + + return ret; +} + + +int +virFileInData(int fd, + int *inData, + long long *length) +{ + int ret = -1; + off_t cur, data, hole, end; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to get current position in file")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF. + * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + virReportSystemError(errno, "%s", + _("Unable to seek to data")); + goto cleanup; + } + + *inData = 0; + /* There are two situations now. 
There is always an + * implicit hole at EOF. However, there might be a + * trailing hole just before EOF too. If that's the case + * report it. */ + if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to seek to EOF")); + goto cleanup; + } + *length = end - cur; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *length = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* Wait a second. The reason why we are here is + * because we are in data. But at the same time we + * are in a trailing hole? Wut!? Do the best what we + * can do here. */ + virReportSystemError(errno, "%s", + _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *length = (hole - data); + } + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. */ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); return ret; } diff --git a/src/util/virfile.h b/src/util/virfile.h index 38e938f87..57ceb8072 100644 --- a/src/util/virfile.h +++ b/src/util/virfile.h @@ -348,4 +348,8 @@ int virFileReadValueString(char **value, const char *format, ...) ATTRIBUTE_FMT_PRINTF(2, 3); +int virFileInData(int fd, + int *inData, + long long *length); + #endif /* __VIR_FILE_H */ diff --git a/tests/virfiletest.c b/tests/virfiletest.c index 702a76a50..a93bee01a 100644 --- a/tests/virfiletest.c +++ b/tests/virfiletest.c @@ -21,6 +21,7 @@ #include <config.h> #include <stdlib.h> +#include <fcntl.h> #include "testutils.h" #include "virfile.h" @@ -118,6 +119,190 @@ testFileSanitizePath(const void *opaque) } +static int +makeSparseFile(const off_t offsets[], + const bool startData); + +#ifdef __linux__ +/* Create a sparse file. @offsets in KiB. 
*/ +static int +makeSparseFile(const off_t offsets[], + const bool startData) +{ + int fd = -1; + char path[] = abs_builddir "fileInData.XXXXXX"; + off_t len = 0; + size_t i; + + if ((fd = mkostemp(path, O_CLOEXEC|O_RDWR)) < 0) + goto error; + + if (unlink(path) < 0) + goto error; + + for (i = 0; offsets[i] != (off_t) -1; i++) + len += offsets[i] * 1024; + + while (len) { + const char buf[] = "abcdefghijklmnopqrstuvwxyz"; + off_t toWrite = sizeof(buf); + + if (toWrite > len) + toWrite = len; + + if (safewrite(fd, buf, toWrite) < 0) { + fprintf(stderr, "unable to write to %s (errno=%d)\n", path, errno); + goto error; + } + + len -= toWrite; + } + + len = 0; + for (i = 0; offsets[i] != (off_t) -1; i++) { + bool inData = startData; + + if (i % 2) + inData = !inData; + + if (!inData && + fallocate(fd, + FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + len, offsets[i] * 1024) < 0) { + fprintf(stderr, "unable to punch a hole at offset %lld length %lld\n", + (long long) len, (long long) offsets[i]); + goto error; + } + + len += offsets[i] * 1024; + } + + if (lseek(fd, 0, SEEK_SET) == (off_t) -1) { + fprintf(stderr, "unable to lseek (errno=%d)\n", errno); + goto error; + } + + return fd; + error: + VIR_FORCE_CLOSE(fd); + return -1; +} + +#else /* !__linux__ */ + +static int +makeSparseFile(const off_t offsets[] ATTRIBUTE_UNUSED, + const bool startData ATTRIBUTE_UNUSED) +{ + return -1; +} + +#endif /* !__linux__ */ + + +#define EXTENT 4 +static bool +holesSupported(void) +{ + off_t offsets[] = {EXTENT, EXTENT, EXTENT, -1}; + off_t tmp; + int fd; + bool ret = false; + + if ((fd = makeSparseFile(offsets, true)) < 0) + goto cleanup; + + /* The way this works is: there are 4K of data followed by 4K hole followed + * by 4K hole again. Check if the filesystem we are running the test suite + * on supports holes. 
*/ + if ((tmp = lseek(fd, 0, SEEK_DATA)) == (off_t) -1) + goto cleanup; + + if (tmp != 0) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_HOLE)) == (off_t) -1) + goto cleanup; + + if (tmp != EXTENT * 1024) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_DATA)) == (off_t) -1) + goto cleanup; + + if (tmp != 2 * EXTENT * 1024) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_HOLE)) == (off_t) -1) + goto cleanup; + + if (tmp != 3 * EXTENT * 1024) + goto cleanup; + + ret = true; + cleanup: + VIR_FORCE_CLOSE(fd); + return ret; +} + + +struct testFileInData { + bool startData; /* whether the list of offsets starts with data section */ + off_t *offsets; +}; + + +static int +testFileInData(const void *opaque) +{ + const struct testFileInData *data = opaque; + int fd = -1; + int ret = -1; + size_t i; + + if ((fd = makeSparseFile(data->offsets, data->startData)) < 0) + goto cleanup; + + for (i = 0; data->offsets[i] != (off_t) -1; i++) { + bool shouldInData = data->startData; + int realInData; + long long shouldLen; + long long realLen; + + if (i % 2) + shouldInData = !shouldInData; + + if (virFileInData(fd, &realInData, &realLen) < 0) + goto cleanup; + + if (realInData != shouldInData) { + fprintf(stderr, "Unexpected data/hole. Expected %s got %s\n", + shouldInData ? "data" : "hole", + realInData ? "data" : "hole"); + goto cleanup; + } + + shouldLen = data->offsets[i] * 1024; + if (realLen != shouldLen) { + fprintf(stderr, "Unexpected section length. Expected %lld got %lld\n", + shouldLen, realLen); + goto cleanup; + } + + if (lseek(fd, shouldLen, SEEK_CUR) < 0) { + fprintf(stderr, "Unable to seek\n"); + goto cleanup; + } + } + + ret = 0; + + cleanup: + VIR_FORCE_CLOSE(fd); + return ret; +} + + static int mymain(void) { @@ -186,6 +371,24 @@ mymain(void) DO_TEST_SANITIZE_PATH_SAME("gluster://bar.baz/fooo//hoo"); DO_TEST_SANITIZE_PATH_SAME("gluster://bar.baz/fooo///////hoo"); +#define DO_TEST_IN_DATA(inData, ...) 
\ + do { \ + off_t offsets[] = {__VA_ARGS__, -1}; \ + struct testFileInData data = { \ + .startData = inData, .offsets = offsets, \ + }; \ + if (virTestRun(virTestCounterNext(), testFileInData, &data) < 0) \ + ret = -1; \ + } while (0) + + if (holesSupported()) { + DO_TEST_IN_DATA(true, 4, 4, 4); + DO_TEST_IN_DATA(false, 4, 4, 4); + DO_TEST_IN_DATA(true, 8, 8, 8); + DO_TEST_IN_DATA(false, 8, 8, 8); + DO_TEST_IN_DATA(true, 8, 16, 32, 64, 128, 256, 512); + DO_TEST_IN_DATA(false, 8, 16, 32, 64, 128, 256, 512); + } return ret != 0 ? EXIT_FAILURE : EXIT_SUCCESS; } -- 2.13.0
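The data/hole classification done by virFileInData() can be condensed into the following standalone sketch. It follows the same four-case reasoning as the patch but is not the libvirt function: section_at_offset is a hypothetical name, error reporting is dropped, and the fallback values 3 and 4 for SEEK_DATA and SEEK_HOLE are the Linux ones, so the sketch assumes Linux.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE   /* glibc exposes SEEK_DATA/SEEK_HOLE under this */
#endif
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#ifndef SEEK_DATA      /* Linux values; an assumption of this sketch */
# define SEEK_DATA 3
# define SEEK_HOLE 4
#endif

/* Report whether the current offset of @fd lies in data (*in_data = 1)
 * or in a hole (*in_data = 0), and how many bytes remain until that
 * section ends. The file offset is restored before returning.
 * Returns 0 on success, -1 on error. */
static int
section_at_offset(int fd, int *in_data, long long *length)
{
    int ret = -1;
    off_t cur, data, hole, end;

    assert(in_data != NULL && length != NULL);

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        if (errno != ENXIO)
            goto cleanup;            /* real error, learned nothing */

        /* No data at or after @cur: trailing hole, possibly empty. */
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto cleanup;
        *in_data = 0;
        *length = end - cur;
    } else if (data > cur) {
        /* @cur is in a hole; the next data starts at @data. */
        *in_data = 0;
        *length = data - cur;
    } else {
        /* @cur is in data; find where the next hole starts. */
        hole = lseek(fd, data, SEEK_HOLE);
        if (hole == (off_t) -1 || hole == data)
            goto cleanup;
        *in_data = 1;
        *length = hole - data;
    }

    ret = 0;
 cleanup:
    /* Reposition back to where we started, whatever happened. */
    (void) lseek(fd, cur, SEEK_SET);
    return ret;
}
```

On a filesystem without hole support, lseek() is expected to treat the whole file as one data section followed by the implicit hole at EOF, so the sketch would then report a single data section up to the file size.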

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This function takes a FD and determines whether the current position is in data section or in a hole. In addition to that, it also determines how much bytes are there remaining till the current section ends.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 82 +++++++++++++++++++ src/util/virfile.h | 4 + tests/virfiletest.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 290 insertions(+)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index bbe283529..4102a002b 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1628,6 +1628,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index ea44a647c..5b10f9489 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3793,6 +3793,88 @@ virFileComparePaths(const char *p1, const char *p2) cleanup: VIR_FREE(res1); VIR_FREE(res2); + + return ret; +} + +
Still undocumented.

I know it got discussed a few times... I guess the odd thing I find about "long long" is that all the stream mgmt lengths/sizes, etc. are size_t. You also only test smaller values. Just saying...

Please just add the function description *and* why a long long is being used here. Also a brief summary of what's being done and, of course, that lseek at cleanup.

Reviewed-by: John Ferlan <jferlan@redhat.com>

John
+int +virFileInData(int fd, + int *inData, + long long *length) +{
[...]

On 05/16/2017 10:55 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This function takes an FD and determines whether the current position is in a data section or in a hole. In addition, it determines how many bytes remain until the current section ends.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 82 +++++++++++++++++++ src/util/virfile.h | 4 + tests/virfiletest.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 290 insertions(+)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index bbe283529..4102a002b 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1628,6 +1628,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index ea44a647c..5b10f9489 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3793,6 +3793,88 @@ virFileComparePaths(const char *p1, const char *p2) cleanup: VIR_FREE(res1); VIR_FREE(res2); + + return ret; +} + +
Still undocumented.
I know it got discussed a few times... I guess the odd thing I find about "long long" is that all the stream mgmt lengths/sizes, etc. are size_t. You also only test smaller values. Just saying...
That's true for areas where we need to address individual bytes in buffers, e.g. virStreamRecv & virStreamSend. When it comes to file offsets like in virStorageVolDownload and virStorageVolUpload we use ULL.
Please just add the function description *and* why a long long is being used here. Also a brief summary of what's being done and of course that lseek @cleanup:
How about this:

+/**
+ * virFileInData:
+ * @fd: file to check
+ * @inData: true if current position in the @fd is in data section
+ * @length: amount of bytes until the end of the current section
+ *
+ * With sparse files not every extent has to be physically stored on
+ * the disk. This results in so called data or hole sections. This
+ * functions checks whether the current position in the file @fd is
+ * in a data section (@inData = 1) or in a hole (@inData = 0). Also,
+ * it sets @length to match the number of bytes remaining until the
+ * end of the current section.
+ *
+ * As a special case, there is an implicit hole at the end of any
+ * file. In this case, the function sets @inData = 0, @length = 0.
+ *
+ * Upon its return, the position in the @fd is left unchanged, i.e.
+ * despite this function lseek()-ing back and forth it always
+ * restores the original position in the file.
+ *
+ * NB, @length is type of long long because it corresponds to off_t
+ * the best.
+ *
+ * Returns 0 on success,
+ * -1 otherwise.
+ */

Michal
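For readers without the patch body at hand, the behaviour described in the proposed comment can be sketched with lseek() and SEEK_DATA/SEEK_HOLE. This is only an illustration under stated assumptions: the names sketch_file_in_data() and sketch_print_section() are hypothetical, the code is not taken from the patch, and treating ENXIO as "trailing hole or EOF" is a choice made for this sketch.

```c
#define _GNU_SOURCE             /* exposes SEEK_DATA / SEEK_HOLE on glibc */
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for virFileInData(); NOT the code from the patch.
 * Sets *in_data to 1 (data) or 0 (hole) for the current offset of fd and
 * *length to the bytes remaining in that section. Returns 0 or -1. */
int
sketch_file_in_data(int fd, int *in_data, long long *length)
{
    off_t cur, data, hole, end;
    int ret = -1;

    assert(in_data && length);

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        /* ENXIO: no data at or after cur, i.e. a trailing hole or EOF. */
        if (errno != ENXIO)
            goto cleanup;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto cleanup;
        *in_data = 0;
        *length = end - cur;
    } else if (data > cur) {
        /* The next data starts later, so we are inside a hole. */
        *in_data = 0;
        *length = data - cur;
    } else {
        /* We are in data; it ends at the next hole (EOF counts as one). */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            goto cleanup;
        *in_data = 1;
        *length = hole - cur;
    }
    ret = 0;

 cleanup:
    /* Always restore the caller's position, as the comment promises. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        ret = -1;
    return ret;
}

/* Small usage helper: report the section at the current offset. */
int
sketch_print_section(int fd)
{
    int in_data;
    long long length;

    if (sketch_file_in_data(fd, &in_data, &length) < 0)
        return -1;
    printf("%s, %lld bytes remaining\n", in_data ? "data" : "hole", length);
    return 0;
}
```

On a filesystem without hole support the kernel reports the whole file as data, so the sketch degrades to "data until EOF" rather than failing.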

On 05/17/2017 08:41 AM, Michal Privoznik wrote:
On 05/16/2017 10:55 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This function takes an FD and determines whether the current position is in a data section or in a hole. In addition, it determines how many bytes remain until the current section ends.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 82 +++++++++++++++++++ src/util/virfile.h | 4 + tests/virfiletest.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 290 insertions(+)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index bbe283529..4102a002b 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1628,6 +1628,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index ea44a647c..5b10f9489 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3793,6 +3793,88 @@ virFileComparePaths(const char *p1, const char *p2) cleanup: VIR_FREE(res1); VIR_FREE(res2); + + return ret; +} + +
Still undocumented.
I know it got discussed a few times... I guess the odd thing I find about "long long" is that all the stream mgmt lengths/sizes, etc. are size_t. You also only test smaller values. Just saying...
That's true for areas where we need to address individual bytes in buffers, e.g. virStreamRecv & virStreamSend. When it comes to file offsets like in virStorageVolDownload and virStorageVolUpload we use ULL.
Please just add the function description *and* why a long long is being used here. Also a brief summary of what's being done and of course that lseek @cleanup:
How about this:
Works for me with the one adjustment below. Thanks.

Reviewed-by: John Ferlan <jferlan@redhat.com>

John
+/** + * virFileInData: + * @fd: file to check + * @inData: true if current position in the @fd is in data section + * @length: amount of bytes until the end of the current section + * + * With sparse files not every extent has to be physically stored on + * the disk. This results in so called data or hole sections. This + * functions checks whether the current position in the file @fd is
s/functions/function/
+ * in a data section (@inData = 1) or in a hole (@inData = 0). Also, + * it sets @length to match the number of bytes remaining until the + * end of the current section. + * + * As a special case, there is an implicit hole at the end of any + * file. In this case, the function sets @inData = 0, @length = 0. + * + * Upon its return, the position in the @fd is left unchanged, i.e. + * despite this function lseek()-ing back and forth it always + * restores the original position in the file. + * + * NB, @length is type of long long because it corresponds to off_t + * the best. + * + * Returns 0 on success, + * -1 otherwise. + */
Michal

This patch is adding the virStreamRecvFlags as a variant to the virStreamRecv function in order to allow for future expansion of functionality for processing sparse streams using a @flags argument. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 59 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 5 ++++ 4 files changed, 76 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 831640d56..bee25168b 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,11 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +int virStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 85b4e3bc7..d4b048018 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -35,6 +35,12 @@ typedef int char *data, size_t nbytes); +typedef int +(*virDrvStreamRecvFlags)(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, @@ -61,6 +67,7 @@ typedef virStreamDriver *virStreamDriverPtr; struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; + virDrvStreamRecvFlags streamRecvFlags; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 8384b3720..7535deb3c 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -285,6 +285,65 @@ virStreamRecv(virStreamPtr stream, } +/** + * virStreamRecvFlags: + * @stream: pointer to the stream object + * @data: buffer to read into from stream + * @nbytes: size of @data buffer + * @flags: extra flags; not used yet, 
so callers should always pass 0 + * + * Reads a series of bytes from the stream. This method may + * block the calling application for an arbitrary amount + * of time. + * + * This is just like virStreamRecv except this one has extra + * @flags. Calling this function with no @flags set (equal to + * zero) is equivalent to calling virStreamRecv(stream, data, nbytes). + * + * Returns 0 when the end of the stream is reached, at + * which time the caller should invoke virStreamFinish() + * to get confirmation of stream completion. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if there is no data pending to be read & the + * stream is marked as non-blocking. + */ +int +virStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, data=%p, nbytes=%zu flags=%x", + stream, data, nbytes, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(data, error); + + if (stream->driver && + stream->driver->streamRecvFlags) { + int ret; + ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); + if (ret == -2) + return -2; + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + /** * virStreamSendAll: * @stream: pointer to the stream object diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 428cf2e19..d50b36a24 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -759,4 +759,9 @@ LIBVIRT_3.1.0 { virDomainSetVcpu; } LIBVIRT_3.0.0; +LIBVIRT_3.4.0 { + global: + virStreamRecvFlags; +} LIBVIRT_3.1.0; + # .... define new API here using predicted next version number .... -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This patch is adding the virStreamRecvFlags as a variant to the virStreamRecv function in order to allow for future expansion of functionality for processing sparse streams using a @flags argument.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 59 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 5 ++++ 4 files changed, 76 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com>

John

There are three virStreamDriver's currently supported: * virFDStream * remote driver * ESX driver For now, backend virStreamRecvFlags support for only remote driver and ESX driver is sufficient. Future patches will update virFDStream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/esx/esx_stream.c | 16 +++++++++++++++- src/remote/remote_driver.c | 21 +++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c index fb9abbca6..b820b38ee 100644 --- a/src/esx/esx_stream.c +++ b/src/esx/esx_stream.c @@ -252,12 +252,17 @@ esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes) } static int -esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) +esxStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) { int result = -1; esxStreamPrivate *priv = stream->privateData; int status; + virCheckFlags(0, -1); + if (nbytes == 0) return 0; @@ -317,6 +322,14 @@ esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) return result; } +static int +esxStreamRecv(virStreamPtr stream, + char *data, + size_t nbytes) +{ + return esxStreamRecvFlags(stream, data, nbytes, 0); +} + static void esxFreeStreamPrivate(esxStreamPrivate **priv) { @@ -369,6 +382,7 @@ esxStreamAbort(virStreamPtr stream) virStreamDriver esxStreamDriver = { .streamSend = esxStreamSend, .streamRecv = esxStreamRecv, + .streamRecvFlags = esxStreamRecvFlags, /* FIXME: streamAddCallback missing */ /* FIXME: streamUpdateCallback missing */ /* FIXME: streamRemoveCallback missing */ diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 77250ea56..e79e796f2 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5641,15 +5641,19 @@ remoteStreamSend(virStreamPtr st, static int -remoteStreamRecv(virStreamPtr st, - char *data, - size_t nbytes) +remoteStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags) { - 
VIR_DEBUG("st=%p data=%p nbytes=%zu", st, data, nbytes); + VIR_DEBUG("st=%p data=%p nbytes=%zu flags=%x", + st, data, nbytes, flags); struct private_data *priv = st->conn->privateData; virNetClientStreamPtr privst = st->privateData; int rv; + virCheckFlags(0, -1); + if (virNetClientStreamRaiseError(privst)) return -1; @@ -5671,6 +5675,14 @@ remoteStreamRecv(virStreamPtr st, return rv; } +static int +remoteStreamRecv(virStreamPtr st, + char *data, + size_t nbytes) +{ + return remoteStreamRecvFlags(st, data, nbytes, 0); +} + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5843,6 +5855,7 @@ remoteStreamAbort(virStreamPtr st) static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, + .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, -- 2.13.0
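The shape both drivers follow in this patch (a flags-aware function that rejects unknown bits, plus the old entry point reduced to a wrapper passing 0) can be shown in isolation. The sketch below is self-contained and uses hypothetical names throughout (struct sketch_stream, sketch_recv_flags(), SKETCH_RECV_STOP_AT_HOLE); the explicit mask test only approximates what virCheckFlags() does in the real code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* All names here are hypothetical; they only mirror the shape of
 * esxStreamRecv()/esxStreamRecvFlags() in the patch. */
enum { SKETCH_RECV_STOP_AT_HOLE = 1 << 0 };

struct sketch_stream {
    const char *src;   /* bytes still to be delivered */
    size_t left;       /* number of bytes left in src */
};

/* Flags-aware entry point. Unknown bits are rejected up front, which is
 * the job virCheckFlags(0, -1) does in the real drivers. */
int
sketch_recv_flags(struct sketch_stream *st, char *data, size_t nbytes,
                  unsigned int flags)
{
    const unsigned int supported = 0;   /* no flags implemented yet */
    size_t n;

    assert(st && data);

    if (flags & ~supported)
        return -1;

    n = nbytes < st->left ? nbytes : st->left;
    memcpy(data, st->src, n);
    st->src += n;
    st->left -= n;
    return (int) n;                     /* 0 signals end of stream */
}

/* Legacy entry point: a thin wrapper that always passes flags = 0. */
int
sketch_recv(struct sketch_stream *st, char *data, size_t nbytes)
{
    return sketch_recv_flags(st, data, nbytes, 0);
}
```

Keeping the old function as a wrapper means there is a single code path to extend once a flag such as the stop-at-hole one gains backend support.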

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
There are three virStreamDriver's currently supported:
* virFDStream * remote driver * ESX driver
For now, backend virStreamRecvFlags support for only remote driver and ESX driver is sufficient. Future patches will update virFDStream.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/esx/esx_stream.c | 16 +++++++++++++++- src/remote/remote_driver.c | 21 +++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com>

John

This API is used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream. It takes @length argument, which says how big the hole is. This skipping is done from the current point of stream. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 +++ src/driver-stream.h | 6 ++++ src/libvirt-stream.c | 61 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 72 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index bee25168b..14c9af142 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -50,6 +50,10 @@ int virStreamRecvFlags(virStreamPtr st, size_t nbytes, unsigned int flags); +int virStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index d4b048018..0a5201431 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -41,6 +41,11 @@ typedef int size_t nbytes, unsigned int flags); +typedef int +(*virDrvStreamSendHole)(virStreamPtr st, + long long length, + unsigned int flags); + typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, @@ -68,6 +73,7 @@ struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; + virDrvStreamSendHole streamSendHole; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 7535deb3c..a09896dcd 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -344,6 +344,67 @@ virStreamRecvFlags(virStreamPtr stream, } +/** + * 
virStreamSendHole: + * @stream: pointer to the stream object + * @length: number of bytes to skip + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Rather than transmitting empty file space, this API directs + * the @stream target to create @length bytes of empty space. + * This API would be used when uploading or downloading sparsely + * populated files to avoid the needless copy of empty file + * space. + * + * An example using this with a hypothetical file upload API + * looks like: + * + * virStream st; + * + * while (1) { + * char buf[4096]; + * size_t len; + * if (..in hole...) { + * ..get hole size... + * virStreamSendHole(st, len, 0); + * } else { + * ...read len bytes... + * virStreamSend(st, buf, len); + * } + * } + * + * Returns 0 on success, + * -1 error + */ +int +virStreamSendHole(virStreamPtr stream, + long long length, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, length=%lld flags=%x", + stream, length, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + + if (stream->driver && + stream->driver->streamSendHole) { + int ret; + ret = (stream->driver->streamSendHole)(stream, length, flags); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + /** * virStreamSendAll: * @stream: pointer to the stream object diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index d50b36a24..3be7cc6a0 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -762,6 +762,7 @@ LIBVIRT_3.1.0 { LIBVIRT_3.4.0 { global: virStreamRecvFlags; + virStreamSendHole; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.13.0
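The pseudocode in the doc comment leaves "in hole" and "get hole size" open. One way to fill them in on Linux is to walk the file with SEEK_DATA/SEEK_HOLE. The sketch below assumes that approach and is not the implementation from this series: struct sketch_sink and its callbacks are hypothetical stand-ins for virStreamSend() and virStreamSendHole(), so the loop can be tried without a libvirt connection.

```c
#define _GNU_SOURCE             /* exposes SEEK_DATA / SEEK_HOLE on glibc */
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callbacks standing in for virStreamSend() and
 * virStreamSendHole(); each returns < 0 on error. */
struct sketch_sink {
    int (*send_data)(void *opaque, const char *buf, size_t len);
    int (*send_hole)(void *opaque, long long len, unsigned int flags);
    void *opaque;
};

/* Walk fd from offset 0 to EOF, emitting data and hole sections in order. */
int
sketch_sparse_send(int fd, const struct sketch_sink *sink)
{
    char buf[4096];
    off_t pos = 0, end, data, hole;

    assert(sink && sink->send_data && sink->send_hole);

    if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
        return -1;

    while (pos < end) {
        if ((data = lseek(fd, pos, SEEK_DATA)) == (off_t) -1) {
            if (errno != ENXIO)
                return -1;
            data = end;         /* only a trailing hole is left */
        }
        if (data > pos) {
            /* Announce the hole instead of transmitting zeroes. */
            if (sink->send_hole(sink->opaque, (long long) (data - pos), 0) < 0)
                return -1;
            pos = data;
            continue;
        }
        if ((hole = lseek(fd, pos, SEEK_HOLE)) == (off_t) -1 ||
            lseek(fd, pos, SEEK_SET) == (off_t) -1)
            return -1;
        while (pos < hole) {
            size_t want = sizeof(buf);
            ssize_t got;

            if ((off_t) want > hole - pos)
                want = (size_t) (hole - pos);
            if ((got = read(fd, buf, want)) <= 0)
                return -1;
            if (sink->send_data(sink->opaque, buf, (size_t) got) < 0)
                return -1;
            pos += got;
        }
    }
    return 0;
}

/* Example sink: logs each section and totals the bytes per kind. */
struct sketch_totals { long long data, hole; };

int
sketch_log_data(void *opaque, const char *buf, size_t len)
{
    struct sketch_totals *t = opaque;

    (void) buf;
    printf("data: %zu bytes\n", len);
    t->data += (long long) len;
    return 0;
}

int
sketch_log_hole(void *opaque, long long len, unsigned int flags)
{
    struct sketch_totals *t = opaque;

    (void) flags;
    printf("hole: %lld bytes\n", len);
    t->hole += len;
    return 0;
}
```

With a real stream, the two callbacks would forward to virStreamSend() and virStreamSendHole(st, len, 0) respectively; data and hole byte counts always add up to the file size, whether or not the filesystem reports holes.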

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This API is used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.
It takes @length argument, which says how big the hole is. This skipping is done from the current point of stream. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 +++ src/driver-stream.h | 6 ++++ src/libvirt-stream.c | 61 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 72 insertions(+)
[...]
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 7535deb3c..a09896dcd 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -344,6 +344,67 @@ virStreamRecvFlags(virStreamPtr stream, }
+/** + * virStreamSendHole: + * @stream: pointer to the stream object + * @length: number of bytes to skip + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Rather than transmitting empty file space, this API directs + * the @stream target to create @length bytes of empty space. + * This API would be used when uploading or downloading sparsely + * populated files to avoid the needless copy of empty file + * space. + * + * An example using this with a hypothetical file upload API + * looks like: + * + * virStream st; + * + * while (1) { + * char buf[4096]; + * size_t len; + * if (..in hole...) { + * ..get hole size... + * virStreamSendHole(st, len, 0); + * } else { + * ...read len bytes... + * virStreamSend(st, buf, len); + * } + * } + * + * Returns 0 on success, + * -1 error + */ +int +virStreamSendHole(virStreamPtr stream, + long long length, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, length=%lld flags=%x", + stream, length, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1);
Perhaps some preventative programming:

    virCheckNonNegativeArgGoto(length, error);

Although that would mean calling virDispatchError, unless there was some sort of virCheck*ArgReturn(length, -1).

Reviewed-by: John Ferlan <jferlan@redhat.com>

John

[...]

On 05/16/2017 11:13 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This API is used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.
It takes @length argument, which says how big the hole is. This skipping is done from the current point of stream. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 +++ src/driver-stream.h | 6 ++++ src/libvirt-stream.c | 61 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 72 insertions(+)
[...]
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 7535deb3c..a09896dcd 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -344,6 +344,67 @@ virStreamRecvFlags(virStreamPtr stream, }
+/** + * virStreamSendHole: + * @stream: pointer to the stream object + * @length: number of bytes to skip + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Rather than transmitting empty file space, this API directs + * the @stream target to create @length bytes of empty space. + * This API would be used when uploading or downloading sparsely + * populated files to avoid the needless copy of empty file + * space. + * + * An example using this with a hypothetical file upload API + * looks like: + * + * virStream st; + * + * while (1) { + * char buf[4096]; + * size_t len; + * if (..in hole...) { + * ..get hole size... + * virStreamSendHole(st, len, 0); + * } else { + * ...read len bytes... + * virStreamSend(st, buf, len); + * } + * } + * + * Returns 0 on success, + * -1 error + */ +int +virStreamSendHole(virStreamPtr stream, + long long length, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, length=%lld flags=%x", + stream, length, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1);
Perhaps some preventative programming:
virCheckNonNegativeArgGoto(length, error);
Although that would mean calling virDispatchError unless there was some sort of virCheck*ArgReturn(length, -1)
Just like flags are checked on the receiving side, I think this should be checked on the receiving side too. From the argument-sanity point of view, this is just like any other API call: the receiving side has to do the sanity check, while the sender merely grabs whatever has been passed and sends it to the receiving side. Yes, there are a few exceptions to this rule, especially obvious misuse of APIs, like passing NULL as a pointer to a libvirt object, a negative size of a passed array, and so on.

This approach has one great advantage: an older client can talk to a newer server and still achieve the same functionality as in the new-new case. For instance, imagine we make some sense of negative seeks, @length < 0. If we check it just in the daemon, then yay, you can still use an older client to send a negative seek.

Michal

This function is basically a counterpart for virStreamSendHole(). If one side of a stream called virStreamSendHole() the other should call virStreamRecvHole() to get the size of the hole. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 44 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 55 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 14c9af142..feaa8ad64 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -54,6 +54,10 @@ int virStreamSendHole(virStreamPtr st, long long length, unsigned int flags); +int virStreamRecvHole(virStreamPtr, + long long *length, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 0a5201431..0fb56ebd2 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -46,6 +46,11 @@ typedef int long long length, unsigned int flags); +typedef int +(*virDrvStreamRecvHole)(virStreamPtr st, + long long *length, + unsigned int flags); + typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, @@ -74,6 +79,7 @@ struct _virStreamDriver { virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSendHole streamSendHole; + virDrvStreamRecvHole streamRecvHole; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index a09896dcd..dc0dc9ea3 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -405,6 +405,50 @@ virStreamSendHole(virStreamPtr stream, } +/** + * virStreamRecvHole: + * @stream: pointer to the stream object + * @length: number of bytes to skip + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * This API 
is used to determine the @length in bytes of the + * empty space to be created in a @stream's target file when + * uploading or downloading sparsely populated files. This is the + * counterpart to virStreamSendHole(). + * + * Returns 0 on success, + * -1 on error or when there's currently no hole in the stream + */ +int +virStreamRecvHole(virStreamPtr stream, + long long *length, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, length=%p flags=%x", + stream, length, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(length, -1); + + if (stream->driver && + stream->driver->streamRecvHole) { + int ret; + ret = (stream->driver->streamRecvHole)(stream, length, flags); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + /** * virStreamSendAll: * @stream: pointer to the stream object diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 3be7cc6a0..b73cc8af1 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -762,6 +762,7 @@ LIBVIRT_3.1.0 { LIBVIRT_3.4.0 { global: virStreamRecvFlags; + virStreamRecvHole; virStreamSendHole; } LIBVIRT_3.1.0; -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This function is basically a counterpart for virStreamSendHole(). If one side of a stream called virStreamSendHole() the other should call virStreamRecvHole() to get the size of the hole.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 44 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 55 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com>

John

Add a new flag to virStreamRecvFlags in order to handle being able to stop reading from the stream so that the consumer can generate a "hole" in stream target. Generation of a hole replaces the need to receive and handle a sequence of zero bytes for sparse stream targets. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/libvirt-stream.c | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index feaa8ad64..c4baaf7a3 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,10 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +typedef enum { + VIR_STREAM_RECV_STOP_AT_HOLE = (1 << 0), +} virStreamRecvFlagsValues; + int virStreamRecvFlags(virStreamPtr st, char *data, size_t nbytes, diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index dc0dc9ea3..bedb6159a 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -290,7 +290,7 @@ virStreamRecv(virStreamPtr stream, * @stream: pointer to the stream object * @data: buffer to read into from stream * @nbytes: size of @data buffer - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStreamRecvFlagsValues * * Reads a series of bytes from the stream. This method may * block the calling application for an arbitrary amount @@ -300,6 +300,33 @@ virStreamRecv(virStreamPtr stream, * @flags. Calling this function with no @flags set (equal to * zero) is equivalent to calling virStreamRecv(stream, data, nbytes). * + * If flag VIR_STREAM_RECV_STOP_AT_HOLE is set, this function + * will stop reading from stream if it has reached a hole. In + * that case, -3 is returned and virStreamRecvHole() should be + * called to get the hole size. 
An example using this flag might + * look like this: + * + * while (1) { + * char buf[4096]; + * + * int ret = virStreamRecvFlags(st, buf, len, VIR_STREAM_STOP_AT_HOLE); + * if (ret < 0) { + * if (ret == -3) { + * long long len; + * ret = virStreamRecvHole(st, &len, 0); + * if (ret < 0) { + * ...error.. + * } else { + * ...seek len bytes in target... + * } + * } else { + * return -1; + * } + * } else { + * ...write buf to target... + * } + * } + * * Returns 0 when the end of the stream is reached, at * which time the caller should invoke virStreamFinish() * to get confirmation of stream completion. @@ -310,6 +337,9 @@ virStreamRecv(virStreamPtr stream, * * Returns -2 if there is no data pending to be read & the * stream is marked as non-blocking. + * + * Returns -3 if there is a hole in stream and caller requested + * to stop at a hole. */ int virStreamRecvFlags(virStreamPtr stream, @@ -331,6 +361,8 @@ virStreamRecvFlags(virStreamPtr stream, ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); if (ret == -2) return -2; + if (ret == -3) + return -3; if (ret < 0) goto error; return ret; -- 2.13.0
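Two details of the example in the doc comment are worth noting: it passes VIR_STREAM_STOP_AT_HOLE although the enum in this patch names the flag VIR_STREAM_RECV_STOP_AT_HOLE, and it uses len as the buffer size before len is declared. A receive loop with those points tidied up might look like the sketch below. To stay runnable without a libvirt connection it talks to a hypothetical struct sketch_source whose callbacks follow the documented return values (0 end of stream, -1 error, -3 hole); the mock source and all sketch_ names are assumptions made for illustration.

```c
#define _POSIX_C_SOURCE 200809L /* ftruncate(), ssize_t */
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define SKETCH_STOP_AT_HOLE (1u << 0)  /* plays VIR_STREAM_RECV_STOP_AT_HOLE */

/* Hypothetical source. The callbacks follow the return conventions quoted
 * above: >0 bytes read, 0 end of stream, -1 error, -3 stopped at a hole. */
struct sketch_source {
    int (*recv_flags)(void *opaque, char *buf, size_t n, unsigned int flags);
    int (*recv_hole)(void *opaque, long long *length, unsigned int flags);
    void *opaque;
};

/* Copy the stream into fd, seeking over holes instead of writing zeroes. */
int
sketch_sparse_recv(const struct sketch_source *src, int fd)
{
    char buf[4096];
    off_t end;

    assert(src && src->recv_flags && src->recv_hole);
    for (;;) {
        int got = src->recv_flags(src->opaque, buf, sizeof(buf),
                                  SKETCH_STOP_AT_HOLE);
        if (got == 0)
            break;                              /* end of stream */
        if (got == -3) {
            long long len;                      /* hole size, not buffer size */

            if (src->recv_hole(src->opaque, &len, 0) < 0 || len < 0 ||
                lseek(fd, (off_t) len, SEEK_CUR) == (off_t) -1)
                return -1;
            continue;
        }
        if (got < 0)
            return -1;
        for (int done = 0; done < got; ) {      /* handle short writes */
            ssize_t n = write(fd, buf + done, (size_t) (got - done));
            if (n <= 0) {
                perror("write");
                return -1;
            }
            done += (int) n;
        }
    }
    /* Seeking alone does not grow a file, so fix up a trailing hole. */
    if ((end = lseek(fd, 0, SEEK_CUR)) == (off_t) -1 || ftruncate(fd, end) < 0)
        return -1;
    return 0;
}

/* Scripted mock source for trying the loop; data == NULL marks a hole. */
struct sketch_section { const char *data; long long len; };
struct sketch_script {
    const struct sketch_section *sec;
    size_t nsec, cur;
    long long off;
};

int
sketch_mock_recv_flags(void *opaque, char *buf, size_t n, unsigned int flags)
{
    struct sketch_script *s = opaque;
    const struct sketch_section *sec;
    size_t avail;

    if (s->cur == s->nsec)
        return 0;
    sec = &s->sec[s->cur];
    if (!sec->data)
        return (flags & SKETCH_STOP_AT_HOLE) ? -3 : -1;
    avail = (size_t) (sec->len - s->off);
    if (n > avail)
        n = avail;
    memcpy(buf, sec->data + s->off, n);
    s->off += (long long) n;
    if (s->off == sec->len) {
        s->cur++;
        s->off = 0;
    }
    return (int) n;
}

int
sketch_mock_recv_hole(void *opaque, long long *length, unsigned int flags)
{
    struct sketch_script *s = opaque;

    if (flags || s->cur == s->nsec || s->sec[s->cur].data)
        return -1;
    *length = s->sec[s->cur++].len;
    return 0;
}
```

Against a real stream, the two callbacks would wrap virStreamRecvFlags(st, buf, n, VIR_STREAM_RECV_STOP_AT_HOLE) and virStreamRecvHole(st, &len, 0). The final ftruncate() is a design choice of this sketch so that a hole at the very end of the stream still shows up in the target's size.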

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Add a new flag to virStreamRecvFlags in order to handle being able to stop reading from the stream so that the consumer can generate a "hole" in stream target. Generation of a hole replaces the need to receive and handle a sequence of zero bytes for sparse stream targets.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/libvirt-stream.c | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index feaa8ad64..c4baaf7a3 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,10 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes);
+typedef enum { + VIR_STREAM_RECV_STOP_AT_HOLE = (1 << 0), +} virStreamRecvFlagsValues; + int virStreamRecvFlags(virStreamPtr st, char *data, size_t nbytes, diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index dc0dc9ea3..bedb6159a 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -290,7 +290,7 @@ virStreamRecv(virStreamPtr stream, * @stream: pointer to the stream object * @data: buffer to read into from stream * @nbytes: size of @data buffer - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStreamRecvFlagsValues * * Reads a series of bytes from the stream. This method may * block the calling application for an arbitrary amount @@ -300,6 +300,33 @@ virStreamRecv(virStreamPtr stream, * @flags. Calling this function with no @flags set (equal to * zero) is equivalent to calling virStreamRecv(stream, data, nbytes). * + * If flag VIR_STREAM_RECV_STOP_AT_HOLE is set, this function + * will stop reading from stream if it has reached a hole. In + * that case, -3 is returned and virStreamRecvHole() should be + * called to get the hole size. An example using this flag might + * look like this: + * + * while (1) { + * char buf[4096]; + * + * int ret = virStreamRecvFlags(st, buf, len, VIR_STREAM_STOP_AT_HOLE); + * if (ret < 0) { + * if (ret == -3) { + * long long len; + * ret = virStreamRecvHole(st, &len, 0); + * if (ret < 0) { + * ...error.. + * } else { + * ...seek len bytes in target...
Do/should we care to point out that @len is a "long long" and seeking is done in chunks of "off_t"? Reviewed-by: John Ferlan <jferlan@redhat.com> John [...]

On 05/16/2017 11:22 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Do/should we care to point out that @len is a "long long" and seeking is done in chunks of "off_t"?
Not really. This is just an example - anybody can understand that. Michal
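On the "long long" vs. off_t point raised above, here is a minimal sketch of how an application might seek over a hole whose length does not fit into off_t (for instance on a 32-bit build without large file support). The names skip_hole() and off_t_max() are hypothetical helpers, not libvirt API; only POSIX lseek() is assumed.

```c
#ifndef _POSIX_C_SOURCE
# define _POSIX_C_SOURCE 200809L
#endif
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <unistd.h>

/* Largest value representable in off_t. Assumes off_t is a signed
 * integer type of at most 64 bits without padding bits. */
static off_t off_t_max(void)
{
    return (off_t)((1ULL << (sizeof(off_t) * CHAR_BIT - 1)) - 1);
}

/* Advance the file offset of fd by len bytes, splitting the seek into
 * several lseek() calls if len is larger than off_t can express.
 * Returns 0 on success, -1 on error with errno set. */
static int skip_hole(int fd, long long len)
{
    const off_t max = off_t_max();

    if (len < 0) {
        errno = EINVAL;
        return -1;
    }

    while (len > 0) {
        off_t step = len > (long long)max ? max : (off_t)len;

        if (lseek(fd, step, SEEK_CUR) == (off_t)-1)
            return -1;
        len -= step;
    }
    return 0;
}
```

With a 64-bit off_t the loop runs at most once, so the helper costs nothing extra in the common case.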

This is just a wrapper over new functions that have been just introduced: virStreamRecvFlags(), virStreamRecvHole(). It's very similar to virStreamRecvAll() except it handles sparse streams well. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 33 ++++++++++- src/libvirt-stream.c | 123 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 154 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index c4baaf7a3..a5e69a1c1 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -104,9 +104,9 @@ int virStreamSendAll(virStreamPtr st, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSinkFunc callback is used together - * with the virStreamRecvAll function for libvirt to - * provide the data that has been received. + * The virStreamSinkFunc callback is used together with the + * virStreamRecvAll or virStreamSparseRecvAll functions for + * libvirt to provide the data that has been received. * * The callback will be invoked multiple times, * providing data in small chunks. The application @@ -129,6 +129,33 @@ int virStreamRecvAll(virStreamPtr st, virStreamSinkFunc handler, void *opaque); +/** + * virStreamSinkHoleFunc: + * @st: the stream object + * @length: stream hole size + * @opaque: optional application provided data + * + * This callback is used together with the virStreamSparseRecvAll + * function for libvirt to provide the size of a hole that + * occurred in the stream. + * + * The callback may be invoked multiple times as holes are found + * during processing a stream. The application should create the + * hole in the stream target and then return. A return value of + * -1 at any time will abort the receive operation. 
+ * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, + long long length, + void *opaque); + +int virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque); + typedef enum { VIR_STREAM_EVENT_READABLE = (1 << 0), VIR_STREAM_EVENT_WRITABLE = (1 << 1), diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index bedb6159a..6bf4c4f29 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -668,6 +668,129 @@ virStreamRecvAll(virStreamPtr stream, } +/** + * virStreamSparseRecvAll: + * @stream: pointer to the stream object + * @handler: sink callback for writing data to application + * @holeHandler: stream hole callback for skipping holes + * @opaque: application defined data + * + * Receive the entire data stream, sending the data to the + * requested data sink @handler and calling the skip @holeHandler + * to generate holes for sparse stream targets. This is simply a + * convenient alternative to virStreamRecvFlags, for apps that do + * blocking-I/O. + * + * An example using this with a hypothetical file download + * API looks like: + * + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return write(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY); + * + * virConnectDownloadSparseFile(conn, st); + * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) { + * ...report an error ... + * goto done; + * } + * if (virStreamFinish(st) < 0) + * ...report an error... + * virStreamFree(st); + * close(fd); + * + * Note that @opaque data is shared between both @handler and + * @holeHandler callbacks. + * + * Returns 0 if all the data was successfully received. 
The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree(st). + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call virStreamFree(). + */ +int +virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE; + int ret = -1; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sinks cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int got, offset = 0; + long long holeLen; + const unsigned int holeFlags = 0; + + got = virStreamRecvFlags(stream, bytes, want, flags); + if (got == -3) { + if (virStreamRecvHole(stream, &holeLen, holeFlags) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (holeHandler(stream, holeLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + continue; + } else if (got < 0) { + goto cleanup; + } else if (got == 0) { + break; + } + while (offset < got) { + int done; + done = (handler)(stream, bytes + offset, got - offset, opaque); + if (done < 0) { + virStreamAbort(stream); + goto cleanup; + } + offset += done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} + /** * virStreamEventAddCallback: * @stream: pointer to the stream object diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index b73cc8af1..37fc4e224 100644 --- 
a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -764,6 +764,7 @@ LIBVIRT_3.4.0 { virStreamRecvFlags; virStreamRecvHole; virStreamSendHole; + virStreamSparseRecvAll; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.13.0
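One detail the lseek()-only myskip in the example above does not cover: lseek() past EOF does not change the file size, so a hole at the very end of the stream is lost unless something is written after it. Below is a hedged sketch of a helper that a virStreamSinkHoleFunc could call on the fd it receives through @opaque. sink_hole() is a hypothetical name, and the ftruncate() step is an assumption about what the application wants, not something this patch mandates.

```c
#ifndef _POSIX_C_SOURCE
# define _POSIX_C_SOURCE 200809L
#endif
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Create a hole of length bytes at the current offset of fd.
 * Returns 0 on success, -1 on error with errno set. */
static int sink_hole(int fd, long long length)
{
    struct stat sb;
    off_t pos;

    /* Reject negative lengths and lengths that do not fit into off_t. */
    if (length < 0 || (long long)(off_t)length != length) {
        errno = EOVERFLOW;
        return -1;
    }

    if ((pos = lseek(fd, (off_t)length, SEEK_CUR)) == (off_t)-1)
        return -1;

    /* Seeking alone never grows the file. Extend it explicitly, but
     * only when the new offset lies beyond the current size, so that
     * data already present in a pre-existing target is not cut off. */
    if (fstat(fd, &sb) < 0)
        return -1;
    if (sb.st_size < pos && ftruncate(fd, pos) < 0)
        return -1;

    return 0;
}
```

A hole callback would then reduce to "return sink_hole(*fd, length);", with the @opaque pointer carrying the file descriptor as in the example.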

This is just a wrapper over a new function that has just been introduced: virStreamSendHole(). It's very similar to virStreamSendAll() except that it handles sparse streams well. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 65 +++++++++++++++- src/libvirt-stream.c | 159 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 222 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index a5e69a1c1..d18d43140 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -71,9 +71,9 @@ int virStreamRecvHole(virStreamPtr, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSourceFunc callback is used together - * with the virStreamSendAll function for libvirt to - * obtain the data that is to be sent. + * The virStreamSourceFunc callback is used together with + * the virStreamSendAll and virStreamSparseSendAll functions + * for libvirt to obtain the data that is to be sent. * * The callback will be invoked multiple times, * fetching data in small chunks. The application @@ -96,6 +96,65 @@ int virStreamSendAll(virStreamPtr st, virStreamSourceFunc handler, void *opaque); +/** + * virStreamSourceHoleFunc: + * @st: the stream object + * @inData: are we in data section + * @length: how long is the section we are currently in + * @opaque: optional application provided data + * + * The virStreamSourceHoleFunc callback is used together with the + * virStreamSparseSendAll function for libvirt to obtain the + * length of the section the stream is currently in. + * + * Moreover, upon successful return, @length should be updated + * with how many bytes are left until the current section ends + * (either data section or hole section). Also, if the stream is + * currently in a data section, @inData should be set to a non-zero + * value and vice versa. 
+ * + * NB: there's an implicit hole at the end of each file. If + * that's the case, @inData and @length should be both set to 0. + * + * This function should not adjust the current position within + * the file. + * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSourceHoleFunc)(virStreamPtr st, + int *inData, + long long *length, + void *opaque); + +/** + * virStreamSourceSkipFunc: + * @st: the stream object + * @length: stream hole size + * @opaque: optional application provided data + * + * This callback is used together with the virStreamSparseSendAll + * to skip holes in the underlying file as reported by + * virStreamSourceHoleFunc. + * + * The callback may be invoked multiple times as holes are found + * during processing a stream. The application should skip + * processing the hole in the stream source and then return. + * A return value of -1 at any time will abort the send operation. + * + * Returns 0 on success, + * -1 upon error. + */ +typedef int (*virStreamSourceSkipFunc)(virStreamPtr st, + long long length, + void *opaque); + +int virStreamSparseSendAll(virStreamPtr st, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + virStreamSourceSkipFunc skipHandler, + void *opaque); + /** * virStreamSinkFunc: * diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 6bf4c4f29..4cbe5eee1 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -574,6 +574,165 @@ virStreamSendAll(virStreamPtr stream, } +/** + * virStreamSparseSendAll: + * @stream: pointer to the stream object + * @handler: source callback for reading data from application + * @holeHandler: source callback for determining holes + * @skipHandler: skip holes as reported by @holeHandler + * @opaque: application defined data + * + * Send the entire data stream, reading the data from the + * requested data source. This is simply a convenient alternative + * to virStreamSend, for apps that do blocking-I/O. 
+ * + * An example using this with a hypothetical file upload + * API looks like + * + * int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return read(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * } + * + * int myindata(virStreamPtr st, int *inData, + * long long *offset, void *opaque) { + * int *fd = opaque; + * + * if (@fd in hole) { + * *inData = 0; + * *offset = holeSize; + * } else { + * *inData = 1; + * *offset = dataSize; + * } + * + * return 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_RDONLY); + * + * virConnectUploadFile(conn, st); + * if (virStreamSparseSendAll(st, + * mysource, + * myindata, + * myskip, + * &fd) < 0) { + * ...report an error ... + * goto done; + * } + * if (virStreamFinish(st) < 0) + * ...report an error... + * virStreamFree(st); + * close(fd); + * + * Note that @opaque data are shared between @handler, @holeHandler and @skipHandler. + * + * Returns 0 if all the data was successfully sent. The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree. + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call + * virStreamFree(). 
+ */ +int virStreamSparseSendAll(virStreamPtr stream, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + virStreamSourceSkipFunc skipHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + int ret = -1; + unsigned long long dataLen = 0; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + virCheckNonNullArgGoto(skipHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sources cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int inData, got, offset = 0; + long long sectionLen; + const unsigned int skipFlags = 0; + + if (!dataLen) { + if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (!inData && sectionLen) { + if (virStreamSendHole(stream, sectionLen, skipFlags) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (skipHandler(stream, sectionLen, opaque) < 0) { + virReportSystemError(errno, "%s", + _("unable to skip hole")); + virStreamAbort(stream); + goto cleanup; + } + continue; + } else { + dataLen = sectionLen; + } + } + + if (want > dataLen) + want = dataLen; + + got = (handler)(stream, bytes, want, opaque); + if (got < 0) { + virStreamAbort(stream); + goto cleanup; + } + if (got == 0) + break; + while (offset < got) { + int done; + done = virStreamSend(stream, bytes + offset, got - offset); + if (done < 0) + goto cleanup; + offset += done; + dataLen -= done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} + + /** * virStreamRecvAll: * @stream: pointer to the stream object diff --git
a/src/libvirt_public.syms b/src/libvirt_public.syms index 37fc4e224..fac77fbea 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -765,6 +765,7 @@ LIBVIRT_3.4.0 { virStreamRecvHole; virStreamSendHole; virStreamSparseRecvAll; + virStreamSparseSendAll; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is just a wrapper over a new function that has just been introduced: virStreamSendHole(). It's very similar to virStreamSendAll() except that it handles sparse streams well.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 65 +++++++++++++++- src/libvirt-stream.c | 159 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 222 insertions(+), 3 deletions(-)
[...]
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 6bf4c4f29..4cbe5eee1 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -574,6 +574,165 @@ virStreamSendAll(virStreamPtr stream, }
+/** + * virStreamSparseSendAll: + * @stream: pointer to the stream object + * @handler: source callback for reading data from application + * @holeHandler: source callback for determining holes + * @skipHandler: skip holes as reported by @holeHandler + * @opaque: application defined data + * + * Send the entire data stream, reading the data from the + * requested data source. This is simply a convenient alternative + * to virStreamSend, for apps that do blocking-I/O. + * + * An example using this with a hypothetical file upload + * API looks like + * + * int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return read(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * }
Similar notations here regarding "long long" values that are being used in/for lseek() which expects "off_t"...
+ * + * int myindata(virStreamPtr st, int *inData, + * long long *offset, void *opaque) { + * int *fd = opaque; + * + * if (@fd in hole) { + * *inData = 0; + * *offset = holeSize; + * } else { + * *inData = 1; + * *offset = dataSize; + * } + * + * return 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_RDONLY); + * + * virConnectUploadFile(conn, st);
^^ This doesn't exist either and is a straight copy from virStreamSendAll... Should it also have the "Sparse" though? Reviewed-by: John Ferlan <jferlan@redhat.com> John
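The "if (@fd in hole)" part of myindata in the example is pseudocode. One way to fill it in, sketched here under the assumption that the platform supports lseek() with SEEK_DATA and SEEK_HOLE (Linux, FreeBSD, Solaris), could look like the following. section_at_offset() is a hypothetical helper name; the numeric fallback values for the two constants are the Linux ones and are only used if the headers did not expose them.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE            /* SEEK_DATA / SEEK_HOLE on glibc */
#endif
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

#ifndef SEEK_DATA               /* fallback: Linux values */
# define SEEK_DATA 3
# define SEEK_HOLE 4
#endif

/* Report whether the current offset of fd lies in data (*inData = 1)
 * or in a hole (*inData = 0), and how many bytes remain in that
 * section. EOF is reported as a hole of length 0. The file offset is
 * restored before returning. Returns 0 on success, -1 on error. */
static int section_at_offset(int fd, int *inData, long long *length)
{
    off_t cur, data, end;
    int ret = -1;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t)-1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t)-1) {
        /* ENXIO means no data at or after cur: trailing hole or EOF. */
        if (errno != ENXIO)
            goto restore;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t)-1)
            goto restore;
        *inData = 0;
        *length = end > cur ? (long long)(end - cur) : 0;
    } else if (data > cur) {
        /* In a hole; data resumes at offset data. */
        *inData = 0;
        *length = (long long)(data - cur);
    } else {
        /* In data; find where the next hole (or EOF) begins. */
        if ((end = lseek(fd, cur, SEEK_HOLE)) == (off_t)-1)
            goto restore;
        *inData = 1;
        *length = (long long)(end - cur);
    }
    ret = 0;

 restore:
    if (lseek(fd, cur, SEEK_SET) == (off_t)-1)
        ret = -1;
    return ret;
}
```

A virStreamSourceHoleFunc could then simply forward to this helper with the fd taken from @opaque. Restoring the offset matters because the callback documentation says the function should not adjust the current position within the file.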

This is just an internal API, that calls corresponding function in stream driver. This function will set @data=1 if the underlying file is in data section, or @data=0 if it is in a hole. At any rate, @length is set to number of bytes remaining in the section the file currently is. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 4 ++++ src/libvirt_private.syms | 1 + 4 files changed, 59 insertions(+) diff --git a/src/driver-stream.h b/src/driver-stream.h index 0fb56ebd2..f207bf0eb 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -51,6 +51,11 @@ typedef int long long *length, unsigned int flags); +typedef int +(*virDrvStreamInData)(virStreamPtr st, + int *data, + long long *length); + typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, @@ -80,6 +85,7 @@ struct _virStreamDriver { virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSendHole streamSendHole; virDrvStreamRecvHole streamRecvHole; + virDrvStreamInData streamInData; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 4cbe5eee1..30c305035 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -481,6 +481,54 @@ virStreamRecvHole(virStreamPtr stream, } +/** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @length: length to next section + * + * This function checks the underlying stream (typically a file) + * to learn whether the current stream position lies within a + * data section or a hole. Upon return @data is set to a nonzero + * value if former is the case, or to zero if @stream is in a + * hole. 
Moreover, @length is updated to tell caller how many + * bytes can be read from @stream until current section changes + * (from data to a hole or vice versa). + * + * NB: there's an implicit hole at EOF. In this situation this + * function should set @data = false, @length = 0 and return 0. + * + * To sum it up: + * + * data section: @data = true, @length > 0 + * hole: @data = false, @length > 0 + * EOF: @data = false, @length = 0 + * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + long long *length) +{ + VIR_DEBUG("stream=%p, data=%p, length=%p", stream, data, length); + + /* No checks of arguments or error resetting. This is an + * internal function that just happen to live next to some + * public functions of ours. */ + + if (stream->driver->streamInData) { + int ret; + ret = (stream->driver->streamInData)(stream, data, length); + return ret; + } + + virReportUnsupportedError(); + return -1; +} + + /** * virStreamSendAll: * @stream: pointer to the stream object diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 96439d840..62f490a7d 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -294,4 +294,8 @@ virTypedParameterValidateSet(virConnectPtr conn, virTypedParameterPtr params, int nparams); +int virStreamInData(virStreamPtr stream, + int *data, + long long *length); + #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 4102a002b..a1447eb44 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1124,6 +1124,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamInData; # locking/domain_lock.h -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is just an internal API, that calls corresponding function in stream driver. This function will set @data=1 if the underlying file is in data section, or @data=0 if it is in a hole. At any rate, @length is set to number of bytes remaining in the section the file currently is.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 4 ++++ src/libvirt_private.syms | 1 + 4 files changed, 59 insertions(+)
[...]
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 4cbe5eee1..30c305035 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -481,6 +481,54 @@ virStreamRecvHole(virStreamPtr stream, }
+/** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @length: length to next section + * + * This function checks the underlying stream (typically a file) + * to learn whether the current stream position lies within a + * data section or a hole. Upon return @data is set to a nonzero + * value if former is the case, or to zero if @stream is in a + * hole. Moreover, @length is updated to tell caller how many + * bytes can be read from @stream until current section changes + * (from data to a hole or vice versa). + * + * NB: there's an implicit hole at EOF. In this situation this + * function should set @data = false, @length = 0 and return 0. + * + * To sum it up: + * + * data section: @data = true, @length > 0 + * hole: @data = false, @length > 0 + * EOF: @data = false, @length = 0 + * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + long long *length) +{ + VIR_DEBUG("stream=%p, data=%p, length=%p", stream, data, length); + + /* No checks of arguments or error resetting. This is an + * internal function that just happen to live next to some + * public functions of ours. */
Well... If it's publicly accessible... @data and @length should probably be checked for non NULL. I looked at a couple of other "libvirt_internal.h" functions and they have some parameter checking. I'll put the R-b on anyway, I would suggest adding parameter checks for non null values though. It'd be stupid user death fairly quickly otherwise... Reviewed-by: John Ferlan <jferlan@redhat.com> John [have to stop for the evening, will pick this up again tomorrow] [...]

On 05/17/2017 12:00 AM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Well... If it's publicly accessible... @data and @length should probably be checked for non NULL.
It's not publicly accessible. It's private. But sure, checks would be harmless. I can add them. Michal
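For illustration, the kind of argument checking discussed here could be sketched as below. This is not the libvirt code (which would presumably use its own virCheckNonNullArg* macros); the demo_* types and functions are hypothetical stand-ins so that the sketch compiles on its own.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-ins for the stream object and its driver table. */
struct demo_stream;

struct demo_stream_driver {
    int (*in_data)(struct demo_stream *st, int *data, long long *length);
};

struct demo_stream {
    const struct demo_stream_driver *driver;
};

/* Sample driver callback that always reports EOF:
 * @data = 0 (hole), @length = 0. */
static int demo_eof(struct demo_stream *st, int *data, long long *length)
{
    (void)st;
    *data = 0;
    *length = 0;
    return 0;
}

/* Wrapper that validates its arguments before dispatching to the
 * driver. Returns 0 on success, -1 on error with errno set. */
static int demo_stream_in_data(struct demo_stream *st,
                               int *data,
                               long long *length)
{
    if (!st || !data || !length) {
        errno = EINVAL;
        return -1;
    }

    if (!st->driver || !st->driver->in_data) {
        errno = ENOTSUP;        /* driver does not implement the call */
        return -1;
    }

    return st->driver->in_data(st, data, length);
}
```

The checks cost two comparisons per call and turn a NULL dereference into an ordinary error return.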

Add a virStreamPtr pointer to the _virNetClientStream in order to reverse track the parent stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 2 +- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index e79e796f2..b152be523 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -6170,7 +6170,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, memset(&args, 0, sizeof(args)); memset(&ret, 0, sizeof(ret)); - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, priv->counter))) goto done; @@ -7094,7 +7095,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, goto cleanup; } - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, priv->counter))) goto cleanup; diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 173189c81..e608812ce 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 2105bd0a9..01761cf8d 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { 
virObjectLockable parent; + virStreamPtr stream; /* Reverse pointer to parent stream */ + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) } -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial) { @@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL; + st->stream = stream; st->prog = virObjectRef(prog); st->proc = proc; st->serial = serial; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a0d2be9ed..e278dab85 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -32,7 +32,8 @@ typedef virNetClientStream *virNetClientStreamPtr; typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial); -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Add a virStreamPtr pointer to the _virNetClientStream in order to reverse track the parent stream.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 2 +- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-)
[...]
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 2105bd0a9..01761cf8d 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { virObjectLockable parent;
+ virStreamPtr stream; /* Reverse pointer to parent stream */ + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) }
-virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial) { @@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL;
+ st->stream = stream;
Should this be a virObjectRef(stream); ? Reviewed-by: John Ferlan <jferlan@redhat.com> John [...]

On 05/17/2017 01:13 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
+ st->stream = stream;
Should this be a virObjectRef(stream); ?
If so, then virNetClientStreamDispose needs to call unref() to match it. I can do that. Consider done. Michal
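The ref/unref pairing discussed above can be sketched in isolation. This is a minimal toy model only: ToyStream, ToyNetStream and the toy_* helpers are hypothetical stand-ins, not libvirt's virObject API, and the sketch assumes the simplest possible counter.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the parent stream and the net client stream;
 * they only model the reference-counting pattern under discussion. */
typedef struct {
    int refs;
} ToyStream;

typedef struct {
    ToyStream *stream;   /* reverse pointer to the parent stream */
} ToyNetStream;

static ToyStream *toy_stream_ref(ToyStream *s)
{
    assert(s != NULL);
    s->refs++;
    return s;
}

static void toy_stream_unref(ToyStream *s)
{
    assert(s != NULL && s->refs > 0);
    s->refs--;
}

/* The constructor takes its own reference on the parent ... */
static ToyNetStream *toy_net_stream_new(ToyStream *parent)
{
    ToyNetStream *st = calloc(1, sizeof(*st));

    if (!st)
        return NULL;
    st->stream = toy_stream_ref(parent);
    return st;
}

/* ... and the dispose path drops exactly that one reference. */
static void toy_net_stream_dispose(ToyNetStream *st)
{
    if (!st)
        return;
    toy_stream_unref(st->stream);
    free(st);
}
```

With this pairing, creating and then disposing a ToyNetStream leaves the parent's count unchanged, which is the invariant the review comment asks for.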

Add a new argument to daemonCreateClientStream in order to allow for future expansion to mark that a specific stream can be used to skip data, such as the case with sparsely populated files. The new flag will be the eventual decision point between client/server to decide whether both ends can support and want to use sparse streams. A new bool 'allowSkip' is added to both _virNetClientStream and daemonClientStream in order to perform the tracking. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/daemon/remote.c b/daemon/remote.c index 0dbb250ff..fd8542120 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -5323,7 +5323,7 @@ remoteDispatchDomainMigratePrepareTunnel3Params(virNetServerPtr server ATTRIBUTE if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) || !(stream = daemonCreateClientStream(client, st, remoteProgram, - &msg->header))) + &msg->header, false))) goto cleanup; if (virDomainMigratePrepareTunnel3Params(priv->conn, st, params, nparams, diff --git a/daemon/stream.c b/daemon/stream.c index 11c0a469d..6465463ff 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -52,6 +52,8 @@ struct daemonClientStream { virNetMessagePtr rx; bool tx; + bool allowSkip; + daemonClientStreamPtr next; }; @@ -321,7 +323,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr header) + virNetMessageHeaderPtr header, + bool allowSkip) { daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -339,6 +342,7 @@ daemonCreateClientStream(virNetServerClientPtr client, stream->serial = header->serial; stream->filterID = -1; stream->st = st; + 
stream->allowSkip = allowSkip; return stream; } diff --git a/daemon/stream.h b/daemon/stream.h index cf76e717a..e1f106759 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -30,7 +30,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr hdr); + virNetMessageHeaderPtr hdr, + bool allowSkip); int daemonFreeClientStream(virNetServerClientPtr client, daemonClientStream *stream); diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index b152be523..aebdd47c9 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -6173,7 +6173,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, - priv->counter))) + priv->counter, + false))) goto done; if (virNetClientAddStream(priv->client, netst) < 0) { @@ -7098,7 +7099,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, - priv->counter))) + priv->counter, + false))) goto cleanup; if (virNetClientAddStream(priv->client, netst) < 0) { diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index e608812ce..98625983a 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1024,7 +1024,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header)))\n"; + print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, 
priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 01761cf8d..4c27f308e 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -54,6 +54,8 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; + bool allowSkip; + virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -138,7 +140,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial) + unsigned serial, + bool allowSkip) { virNetClientStreamPtr st; @@ -152,6 +155,7 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, st->prog = virObjectRef(prog); st->proc = proc; st->serial = serial; + st->allowSkip = allowSkip; return st; } diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index e278dab85..f3bc0672b 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -35,7 +35,8 @@ typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial); + unsigned serial, + bool allowSkip); bool virNetClientStreamRaiseError(virNetClientStreamPtr st); -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Add a new argument to daemonCreateClientStream in order to allow for future expansion to mark that a specific stream can be used to skip data, such as the case with sparsely populated files. The new flag will be the eventual decision point between client/server to decide whether both ends can support and want to use sparse streams.
A new bool 'allowSkip' is added to both _virNetClientStream and daemonClientStream in order to perform the tracking.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

This is going to be RPC representation for virStreamSendHole.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetprotocol.x   | 5 +++++
 src/virnetprotocol-structs | 4 ++++
 2 files changed, 9 insertions(+)

diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x
index 9ce33b073..cab047cb0 100644
--- a/src/rpc/virnetprotocol.x
+++ b/src/rpc/virnetprotocol.x
@@ -236,3 +236,8 @@ struct virNetMessageError {
     int int2;
     virNetMessageNetwork net; /* unused */
 };
+
+struct virNetStreamHole {
+    hyper length;
+    unsigned int flags;
+};
diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs
index af4526c90..aa6e0602a 100644
--- a/src/virnetprotocol-structs
+++ b/src/virnetprotocol-structs
@@ -42,3 +42,7 @@ struct virNetMessageError {
         int int2;
         virNetMessageNetwork net;
 };
+struct virNetStreamHole {
+        int64_t length;
+        u_int flags;
+};
-- 
2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is going to be RPC representation for virStreamSendHole.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetprotocol.x | 5 +++++ src/virnetprotocol-structs | 4 ++++ 2 files changed, 9 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com> John
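For readers unfamiliar with XDR, here is a rough sketch of what the struct above looks like on the wire: a hyper is a 64-bit big-endian integer and an unsigned int is a 32-bit big-endian integer, so the payload is 12 bytes. The code is hand-rolled for illustration and does not use libvirt's generated xdr_virNetStreamHole; the ToyStreamHole type and toy_* functions are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* 8 bytes for the hyper plus 4 bytes for the unsigned int. */
#define TOY_STREAM_HOLE_WIRE_SIZE 12

/* Hypothetical mirror of struct virNetStreamHole. */
typedef struct {
    int64_t length;
    uint32_t flags;
} ToyStreamHole;

/* Encode @hole into @buf in XDR (big-endian) byte order.
 * Returns the number of bytes written. */
static size_t toy_stream_hole_encode(const ToyStreamHole *hole,
                                     unsigned char *buf)
{
    uint64_t len;
    int i;

    assert(hole != NULL && buf != NULL);
    len = (uint64_t) hole->length;

    for (i = 0; i < 8; i++)
        buf[i] = (unsigned char) (len >> (56 - 8 * i));
    for (i = 0; i < 4; i++)
        buf[8 + i] = (unsigned char) (hole->flags >> (24 - 8 * i));

    return TOY_STREAM_HOLE_WIRE_SIZE;
}

/* Decode TOY_STREAM_HOLE_WIRE_SIZE bytes from @buf into @hole. */
static void toy_stream_hole_decode(const unsigned char *buf,
                                   ToyStreamHole *hole)
{
    uint64_t len = 0;
    uint32_t flags = 0;
    int i;

    assert(hole != NULL && buf != NULL);

    for (i = 0; i < 8; i++)
        len = (len << 8) | buf[i];
    for (i = 0; i < 4; i++)
        flags = (flags << 8) | buf[8 + i];

    hole->length = (int64_t) len;
    hole->flags = flags;
}
```

A 4096-byte hole with no flags therefore encodes as eight length bytes ending in 0x10 0x00, followed by four zero bytes.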

This is a special type of stream packet, that is bidirectional and will contain information on how much bytes are both sides skipping in the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 3 ++- src/rpc/virnetclient.c | 1 + src/rpc/virnetprotocol.x | 12 +++++++++++- src/virnetprotocol-structs | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 6465463ff..f44c21278 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -287,7 +287,8 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED, virMutexLock(&stream->priv->lock); - if (msg->header.type != VIR_NET_STREAM) + if (msg->header.type != VIR_NET_STREAM && + msg->header.type != VIR_NET_STREAM_HOLE) goto cleanup; if (!virNetServerProgramMatches(stream->prog, msg)) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 837a8a707..95cd9a6c7 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1284,6 +1284,7 @@ virNetClientCallDispatch(virNetClientPtr client) return virNetClientCallDispatchMessage(client); case VIR_NET_STREAM: /* Stream protocol */ + case VIR_NET_STREAM_HOLE: /* Sparse stream protocol*/ return virNetClientCallDispatchStream(client); default: diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index cab047cb0..ee9899059 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -143,6 +143,14 @@ const VIR_NET_MESSAGE_NUM_FDS_MAX = 32; * * status == VIR_NET_ERROR * remote_error Error information * + * - type == VIR_NET_STREAM_HOLE + * * status == VIR_NET_CONTINUE + * byte[] hole data + * * status == VIR_NET_ERROR + * remote_error error information + * * status == VIR_NET_OK + * <empty> + * */ enum virNetMessageType { /* client -> server. args from a method call */ @@ -156,7 +164,9 @@ enum virNetMessageType { /* client -> server. args from a method call, with passed FDs */ VIR_NET_CALL_WITH_FDS = 4, /* server -> client. 
reply/error from a method call, with passed FDs */ - VIR_NET_REPLY_WITH_FDS = 5 + VIR_NET_REPLY_WITH_FDS = 5, + /* either direction, stream hole data packet */ + VIR_NET_STREAM_HOLE = 6 }; enum virNetMessageStatus { diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index aa6e0602a..b36581f86 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -6,6 +6,7 @@ enum virNetMessageType { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_HOLE = 6, }; enum virNetMessageStatus { VIR_NET_OK = 0, -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is a special type of stream packet, that is bidirectional and will contain information on how much bytes are both sides skipping in the stream.
and contains information regarding how many bytes each side will be skipping in the stream.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 3 ++- src/rpc/virnetclient.c | 1 + src/rpc/virnetprotocol.x | 12 +++++++++++- src/virnetprotocol-structs | 1 + 4 files changed, 15 insertions(+), 2 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 52 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 54 insertions(+) diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index 260161e98..a1f5a34f4 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -50,8 +50,12 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_hole_length = -1; +static int hf_libvirt_stream_hole_flags = -1; +static int hf_libvirt_stream_hole = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_hole = -1; #define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -326,6 +330,33 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); } +static gboolean +dissect_xdr_stream_hole(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) +{ + goffset start; + proto_item *ti; + + start = xdr_getpos(xdrs); + if (hf == -1) { + ti = proto_tree_add_item(tree, hf_libvirt_stream_hole, tvb, start, -1, ENC_NA); + } else { + header_field_info *hfinfo; + hfinfo = proto_registrar_get_nth(hf_libvirt_stream_hole); + ti = proto_tree_add_item(tree, hf, tvb, start, -1, ENC_NA); + proto_item_append_text(ti, " :: %s", hfinfo->name); + } + tree = proto_item_add_subtree(ti, ett_libvirt_stream_hole); + + hf = hf_libvirt_stream_hole_length; + if (!dissect_xdr_hyper(tvb, tree, xdrs, hf)) return FALSE; + + hf = hf_libvirt_stream_hole_flags; + if (!dissect_xdr_u_int(tvb, tree, xdrs, hf)) return FALSE; + + 
proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) @@ -346,6 +377,8 @@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_HOLE) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_hole); } else { goto unknown; } @@ -525,6 +558,24 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_hole, + { "stream_hole", "libvirt.stream_hole", + FT_NONE, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_hole_length, + { "length", "libvirt.stream_hole.length", + FT_INT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_hole_flags, + { "flags", "libvirt.stream_hole.flags", + FT_UINT32, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -535,6 +586,7 @@ proto_register_libvirt(void) static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_hole, &ett_libvirt }; diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdfae..9874a8cbf 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_HOLE = 6, }; enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_HOLE, "STREAM_HOLE" }, { -1, NULL } }; -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 52 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 54 insertions(+)
diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index 260161e98..a1f5a34f4 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -50,8 +50,12 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_hole_length = -1;
How will this work with a LL (hyper) stream hole size? [1]
+static int hf_libvirt_stream_hole_flags = -1; +static int hf_libvirt_stream_hole = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_hole = -1;
#define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -326,6 +330,33 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); }
+static gboolean +dissect_xdr_stream_hole(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) int hf? [1]
Of course this does match the prototype I see: static gboolean dissect_xdr_hyper(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf); but seems strange. Then again, I know next to nothing about wireshark. Call this a "weak" (at best) Reviewed-by: John Ferlan <jferlan@redhat.com> John
+{ + goffset start; + proto_item *ti; + + start = xdr_getpos(xdrs); + if (hf == -1) { + ti = proto_tree_add_item(tree, hf_libvirt_stream_hole, tvb, start, -1, ENC_NA); + } else { + header_field_info *hfinfo; + hfinfo = proto_registrar_get_nth(hf_libvirt_stream_hole); + ti = proto_tree_add_item(tree, hf, tvb, start, -1, ENC_NA); + proto_item_append_text(ti, " :: %s", hfinfo->name); + } + tree = proto_item_add_subtree(ti, ett_libvirt_stream_hole); + + hf = hf_libvirt_stream_hole_length; + if (!dissect_xdr_hyper(tvb, tree, xdrs, hf)) return FALSE; + + hf = hf_libvirt_stream_hole_flags; + if (!dissect_xdr_u_int(tvb, tree, xdrs, hf)) return FALSE; + + proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) @@ -346,6 +377,8 @@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_HOLE) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_hole); } else { goto unknown; } @@ -525,6 +558,24 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_hole, + { "stream_hole", "libvirt.stream_hole", + FT_NONE, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_hole_length, + { "length", "libvirt.stream_hole.length", + FT_INT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_hole_flags, + { "flags", "libvirt.stream_hole.flags", + FT_UINT32, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -535,6 +586,7 @@ proto_register_libvirt(void)
static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_hole, &ett_libvirt };
diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdfae..9874a8cbf 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_HOLE = 6, };
enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_HOLE, "STREAM_HOLE" }, { -1, NULL } };

On 05/17/2017 01:44 PM, John Ferlan wrote:
On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 52 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 54 insertions(+)
diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index 260161e98..a1f5a34f4 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -50,8 +50,12 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_hole_length = -1;
How will this work with a LL (hyper) stream hole size? [1]
+static int hf_libvirt_stream_hole_flags = -1; +static int hf_libvirt_stream_hole = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_hole = -1;
#define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -326,6 +330,33 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); }
+static gboolean +dissect_xdr_stream_hole(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) int hf? [1]
Of course this does match the prototype I see:
static gboolean dissect_xdr_hyper(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf);
but seems strange. Then again, I know next to nothing about wireshark.
'int hf' is not there to represent the LL (hyper) hole size. @hf stands for 'header field'. It's an index into an array where info on the field is to be found, for instance:

    { &hf_libvirt_stream_hole,
      { "stream_hole", "libvirt.stream_hole",
        FT_NONE, BASE_NONE,
        NULL, 0x0,
        NULL, HFILL}
    },

    "stream_hole"         - name of the field as shown in the packet tree
    "libvirt.stream_hole" - abbreviated (filter) name of the field
    FT_NONE               - field type (char, bool, integer of all sorts, ...)
    BASE_NONE             - base for decoding (decimal, hex, octal, ...)

and so on. Therefore, the 'int hf' in dissect_xdr_stream_hole is not the 'long long' length contained in the packet, but rather an index into an array. This index is then used when telling wireshark how to display a particular piece of a packet.
Call this a "weak" (at best)
Reviewed-by: John Ferlan <jferlan@redhat.com>
Good enough for me :-) Michal
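To make the "index into an array" point concrete, here is a toy registry that mimics the idea without depending on Wireshark. Everything in it (ToyFieldInfo, toy_fields, toy_field_info, toy_field_index) is hypothetical; real Wireshark assigns the hf values at registration time, whereas this sketch simply uses array positions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, cut-down description of one header field. */
typedef struct {
    const char *name;         /* label shown in the packet tree */
    const char *filter_name;  /* dotted name used in display filters */
    const char *type;         /* how the bytes are to be interpreted */
} ToyFieldInfo;

static const ToyFieldInfo toy_fields[] = {
    { "stream_hole", "libvirt.stream_hole",        "FT_NONE"   },
    { "length",      "libvirt.stream_hole.length", "FT_INT64"  },
    { "flags",       "libvirt.stream_hole.flags",  "FT_UINT32" },
};

#define TOY_FIELD_COUNT ((int) (sizeof(toy_fields) / sizeof(toy_fields[0])))

/* An 'hf' is just a handle: look up how to display the field.
 * -1 plays the role of "not registered" and yields NULL. */
static const ToyFieldInfo *toy_field_info(int hf)
{
    if (hf < 0 || hf >= TOY_FIELD_COUNT)
        return NULL;
    return &toy_fields[hf];
}

/* Reverse lookup: find the handle for a given filter name. */
static int toy_field_index(const char *filter_name)
{
    int hf;

    assert(filter_name != NULL);

    for (hf = 0; hf < TOY_FIELD_COUNT; hf++) {
        if (strcmp(toy_fields[hf].filter_name, filter_name) == 0)
            return hf;
    }
    return -1;
}
```

The 64-bit hole size never passes through 'hf'; the handle only selects the FT_INT64 description that tells the dissector how to render those eight bytes.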

This is just a helper function that takes in a length value, encodes it into XDR and sends to client. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 35 +++++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 8 ++++++++ 3 files changed, 44 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index ca1f3ac86..bb6a8d465 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -178,6 +178,7 @@ virNetServerProgramNew; virNetServerProgramSendReplyError; virNetServerProgramSendStreamData; virNetServerProgramSendStreamError; +virNetServerProgramSendStreamHole; virNetServerProgramUnknownError; diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c index d1597f438..556c91605 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -548,6 +548,41 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, } +int virNetServerProgramSendStreamHole(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + long long length, + unsigned int flags) +{ + virNetStreamHole data; + + VIR_DEBUG("client=%p msg=%p length=%lld", client, msg, length); + + memset(&data, 0, sizeof(data)); + data.length = length; + data.flags = flags; + + msg->header.prog = prog->program; + msg->header.vers = prog->version; + msg->header.proc = procedure; + msg->header.type = VIR_NET_STREAM_HOLE; + msg->header.serial = serial; + msg->header.status = VIR_NET_CONTINUE; + + if (virNetMessageEncodeHeader(msg) < 0) + return -1; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamHole, + &data) < 0) + return -1; + + return virNetServerClientSendMessage(client, msg); +} + + void virNetServerProgramDispose(void *obj ATTRIBUTE_UNUSED) { } diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 531fca024..1731c9e1d 100644 --- 
a/src/rpc/virnetserverprogram.h +++ b/src/rpc/virnetserverprogram.h @@ -104,4 +104,12 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, const char *data, size_t len); +int virNetServerProgramSendStreamHole(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + long long length, + unsigned int flags); + #endif /* __VIR_NET_SERVER_PROGRAM_H__ */ -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is just a helper function that takes in a length value, encodes it into XDR and sends to client.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 35 +++++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 8 ++++++++ 3 files changed, 44 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

While the previous commit implemented a helper for sending a STREAM_HOLE packet for daemon, this is a client's counterpart. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 54 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 5 ++++ 3 files changed, 60 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index bb6a8d465..186d2c622 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -53,6 +53,7 @@ virNetClientStreamNew; virNetClientStreamQueuePacket; virNetClientStreamRaiseError; virNetClientStreamRecvPacket; +virNetClientStreamSendHole; virNetClientStreamSendPacket; virNetClientStreamSetError; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 4c27f308e..9005e6be9 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -429,6 +429,60 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } +int +virNetClientStreamSendHole(virNetClientStreamPtr st, + virNetClientPtr client, + long long length, + unsigned int flags) +{ + virNetMessagePtr msg = NULL; + virNetStreamHole data; + int ret = -1; + + VIR_DEBUG("st=%p length=%llu", st, length); + + if (!st->allowSkip) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Skipping is not supported with this stream")); + return -1; + } + + memset(&data, 0, sizeof(data)); + data.length = length; + data.flags = flags; + + if (!(msg = virNetMessageNew(false))) + return -1; + + virObjectLock(st); + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = VIR_NET_CONTINUE; + msg->header.type = VIR_NET_STREAM_HOLE; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + virObjectUnlock(st); + + if (virNetMessageEncodeHeader(msg) < 0) + goto cleanup; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamHole, + &data) < 
0) + goto cleanup; + + if (virNetClientSendNoReply(client, msg) < 0) + goto cleanup; + + ret = 0; + cleanup: + virNetMessageFree(msg); + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index f3bc0672b..c25c69bb1 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -61,6 +61,11 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock); +int virNetClientStreamSendHole(virNetClientStreamPtr st, + virNetClientPtr client, + long long length, + unsigned int flags); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
While the previous commit implemented a helper for sending a STREAM_HOLE packet for daemon, this is a client's counterpart.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 54 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 5 ++++ 3 files changed, 60 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

Basically, whenever the new type of stream packet arrives to the daemon call this function that decodes it and calls virStreamSendHole(). Otherwise a regular data stream packet has arrived and therefore continue its processing. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index f44c21278..57ddfe830 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -653,6 +653,52 @@ daemonStreamHandleAbort(virNetServerClientPtr client, } +static int +daemonStreamHandleHole(virNetServerClientPtr client, + daemonClientStream *stream, + virNetMessagePtr msg) +{ + int ret; + virNetStreamHole data; + + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", + client, stream, msg->header.proc, msg->header.serial); + + /* Let's check if client plays nicely and advertised usage of + * sparse stream upfront. */ + if (!stream->allowSkip) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream hole")); + return -1; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamHole, + &data) < 0) + return -1; + + ret = virStreamSendHole(stream->st, data.length, data.flags); + + if (ret < 0) { + virNetMessageError rerr; + + memset(&rerr, 0, sizeof(rerr)); + + VIR_INFO("Stream send hole failed"); + stream->closed = true; + virStreamEventRemoveCallback(stream->st); + virStreamAbort(stream->st); + + return virNetServerProgramSendReplyError(stream->prog, + client, + msg, + &rerr, + &msg->header); + } + + return 0; +} + /* * Called when the stream is signalled has being able to accept @@ -671,19 +717,31 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetMessagePtr msg = stream->rx; int ret; - switch (msg->header.status) { - case VIR_NET_OK: - ret = daemonStreamHandleFinish(client, stream, msg); - break; + if (msg->header.type == VIR_NET_STREAM_HOLE) { + /* Handle special case when the 
client sent us a hole. + * Otherwise just carry on with processing stream + * data. */ + ret = daemonStreamHandleHole(client, stream, msg); + } else if (msg->header.type == VIR_NET_STREAM) { + switch (msg->header.status) { + case VIR_NET_OK: + ret = daemonStreamHandleFinish(client, stream, msg); + break; - case VIR_NET_CONTINUE: - ret = daemonStreamHandleWriteData(client, stream, msg); - break; + case VIR_NET_CONTINUE: + ret = daemonStreamHandleWriteData(client, stream, msg); + break; - case VIR_NET_ERROR: - default: - ret = daemonStreamHandleAbort(client, stream, msg); - break; + case VIR_NET_ERROR: + default: + ret = daemonStreamHandleAbort(client, stream, msg); + break; + } + } else { + virReportError(VIR_ERR_RPC, + _("Unexpected message type: %d"), + msg->header.type); + ret = -1; } if (ret > 0) -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
Basically, whenever the new type of stream packet arrives to the daemon call this function that decodes it and calls virStreamSendHole(). Otherwise a regular data stream packet has arrived and therefore continue its processing.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 11 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John
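For illustration, the dispatch described above (hole packets handled separately, regular data packets processed as before, anything else rejected) can be sketched in isolation. This is a minimal model, not the daemon code: the enum, the structs and handle_message() are hypothetical names made up for the sketch, and "skipping" a hole is reduced to advancing a simulated offset.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the RPC message types; not the libvirt enums. */
enum msg_type { MSG_STREAM, MSG_STREAM_HOLE, MSG_OTHER };

struct msg {
    enum msg_type type;
    size_t data_len;      /* payload size, used by MSG_STREAM */
    long long hole_len;   /* hole size, used by MSG_STREAM_HOLE */
};

struct stream {
    int allow_skip;       /* nonzero once sparse streams were requested */
    long long offset;     /* simulated position in the target */
};

/* Returns 0 on success, -1 if the message has to be rejected. */
static int
handle_message(struct stream *st, const struct msg *m)
{
    assert(st != NULL && m != NULL);

    switch (m->type) {
    case MSG_STREAM_HOLE:
        /* A hole is only acceptable if sparse streams were advertised. */
        if (!st->allow_skip)
            return -1;
        st->offset += m->hole_len;
        return 0;
    case MSG_STREAM:
        /* Regular data: carry on with the usual processing. */
        st->offset += (long long)m->data_len;
        return 0;
    default:
        /* Unexpected message type. */
        return -1;
    }
}
```

The point of the sketch is only the ordering of the checks: the type is examined first, and the sparse-stream opt-in is verified before the hole is applied.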

This is a function that handles an incoming STREAM_HOLE packet. Even though it is not wired up yet, it will be soon. At the beginning do a couple of checks whether the server plays nicely and sent us a STREAM_HOLE packet only after we've enabled sparse streams. Then decode the message payload to see how big the hole is and store it in the passed @length argument. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 96 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 9005e6be9..2f4f92a96 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -55,6 +55,7 @@ struct _virNetClientStream { bool incomingEOF; bool allowSkip; + long long holeLength; /* Size of incoming hole in stream. */ virNetClientStreamEventCallback cb; void *cbOpaque; @@ -356,6 +357,101 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, return -1; } + +static int +virNetClientStreamSetHole(virNetClientStreamPtr st, + long long length, + unsigned int flags) +{ + virCheckFlags(0, -1); + + /* Shouldn't happen, But it's better to safe than sorry. */ + if (st->holeLength) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("unprocessed hole of size %lld already in the queue"), + st->holeLength); + return -1; + } + + st->holeLength += length; + return 0; +} + + +/** + * virNetClientStreamHandleHole: + * @client: client + * @st: stream + * + * Called whenever current message processed in the stream is + * VIR_NET_STREAM_HOLE. The stream @st is expected to be locked + * already. + * + * Returns: 0 on success, + * -1 otherwise. 
+ */ +static int ATTRIBUTE_UNUSED +virNetClientStreamHandleHole(virNetClientPtr client, + virNetClientStreamPtr st) +{ + virNetMessagePtr msg; + virNetStreamHole data; + int ret = -1; + + VIR_DEBUG("client=%p st=%p", client, st); + + msg = st->rx; + memset(&data, 0, sizeof(data)); + + /* We should not be called unless there's VIR_NET_STREAM_HOLE + * message at the head of the list. But doesn't hurt to check */ + if (!msg) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("No message in the queue")); + goto cleanup; + } + + if (msg->header.type != VIR_NET_STREAM_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Invalid message prog=%d type=%d serial=%u proc=%d"), + msg->header.prog, + msg->header.type, + msg->header.serial, + msg->header.proc); + goto cleanup; + } + + /* Server should not send us VIR_NET_STREAM_HOLE unless we + * have requested so. But does not hurt to check ... */ + if (!st->allowSkip) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream hole")); + goto cleanup; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamHole, + &data) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Malformed stream hole packet")); + goto cleanup; + } + + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + + if (virNetClientStreamSetHole(st, data.length, data.flags) < 0) + goto cleanup; + + ret = 0; + cleanup: + if (ret < 0) { + /* Abort stream? */ + } + return ret; +} + + int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, -- 2.13.0

On 05/16/2017 10:03 AM, Michal Privoznik wrote:
This is a function that handles an incoming STREAM_HOLE packet. Even though it is not wired up yet, it will be soon. At the beginning do a couple of checks whether the server plays nicely and sent us a STREAM_HOLE packet only after we've enabled sparse streams. Then decode the message payload to see how big the hole is and store it in the passed @length argument.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 96 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 9005e6be9..2f4f92a96 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -55,6 +55,7 @@ struct _virNetClientStream { bool incomingEOF;
bool allowSkip; + long long holeLength; /* Size of incoming hole in stream. */
virNetClientStreamEventCallback cb; void *cbOpaque; @@ -356,6 +357,101 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, return -1; }
+ +static int +virNetClientStreamSetHole(virNetClientStreamPtr st, + long long length, + unsigned int flags) +{ + virCheckFlags(0, -1); + + /* Shouldn't happen, But it's better to safe than sorry. */ + if (st->holeLength) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("unprocessed hole of size %lld already in the queue"), + st->holeLength); + return -1; + }
Perhaps this is what got me confused in the previous review of that series patch 30. Here it "feels like" having two holes in a row is not allowed, but then there was that logic and discussion that implied it was possible. So unless math is off, this is a "shouldn't happen" type condition. I'm fine with leaving the check in, but just trying to convince myself.
+ + st->holeLength += length; + return 0; +} + + +/** + * virNetClientStreamHandleHole: + * @client: client + * @st: stream + * + * Called whenever current message processed in the stream is + * VIR_NET_STREAM_HOLE. The stream @st is expected to be locked + * already. + * + * Returns: 0 on success, + * -1 otherwise. + */ +static int ATTRIBUTE_UNUSED +virNetClientStreamHandleHole(virNetClientPtr client, + virNetClientStreamPtr st) +{ + virNetMessagePtr msg; + virNetStreamHole data; + int ret = -1; + + VIR_DEBUG("client=%p st=%p", client, st); + + msg = st->rx; + memset(&data, 0, sizeof(data)); + + /* We should not be called unless there's VIR_NET_STREAM_HOLE + * message at the head of the list. But doesn't hurt to check */ + if (!msg) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("No message in the queue")); + goto cleanup; + } + + if (msg->header.type != VIR_NET_STREAM_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Invalid message prog=%d type=%d serial=%u proc=%d"), + msg->header.prog, + msg->header.type, + msg->header.serial, + msg->header.proc); + goto cleanup; + } + + /* Server should not send us VIR_NET_STREAM_HOLE unless we + * have requested so. But does not hurt to check ... */ + if (!st->allowSkip) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream hole")); + goto cleanup; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamHole, + &data) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Malformed stream hole packet")); + goto cleanup; + } + + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + + if (virNetClientStreamSetHole(st, data.length, data.flags) < 0) + goto cleanup; + + ret = 0; + cleanup: + if (ret < 0) { + /* Abort stream? */ + }
Do we still not have an answer? Nothing in this module does such an abort, so rather than leave this dangling, I'd lean towards no. Still one way or another this does need to be cleaned up before pushing... Still, consider this: Reviewed-by: John Ferlan <jferlan@redhat.com> John
+ return ret; +} + + int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data,

Now that we have RPC wrappers over VIR_NET_STREAM_HOLE we can start wiring them up. This commit wires up situation when a client wants to send a hole to daemon. To keep stream offsets synchronous, upon successful call on the daemon skip the same hole in local part of the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index aebdd47c9..ff5be6ebb 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5683,6 +5683,37 @@ remoteStreamRecv(virStreamPtr st, return remoteStreamRecvFlags(st, data, nbytes, 0); } + +static int +remoteStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags) +{ + VIR_DEBUG("st=%p length=%lld flags=%x", + st, length, flags); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamSendHole(privst, + priv->client, + length, + flags); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5857,6 +5888,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, + .streamSendHole = remoteStreamSendHole, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Now that we have RPC wrappers over VIR_NET_STREAM_HOLE we can start wiring them up. This commit wires up the case where a client wants to send a hole to the daemon.
To keep stream offsets in sync, upon a successful call on the daemon skip the same hole in the local part of the stream.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 3 ++- src/rpc/virnetclientstream.c | 10 +++++++--- src/rpc/virnetclientstream.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index ff5be6ebb..63daec587 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5665,7 +5665,8 @@ remoteStreamRecvFlags(virStreamPtr st, priv->client, data, nbytes, - (st->flags & VIR_STREAM_NONBLOCK)); + (st->flags & VIR_STREAM_NONBLOCK), + flags); VIR_DEBUG("Done %d", rv); diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 2f4f92a96..4a3d843b1 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -456,13 +456,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock) + bool nonblock, + unsigned int flags) { int rv = -1; size_t want; - VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", - st, client, data, nbytes, nonblock); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", + st, client, data, nbytes, nonblock, flags); + + virCheckFlags(0, -1); + virObjectLock(st); if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index c25c69bb1..1774e5ac3 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -59,7 +59,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock); + bool nonblock, + unsigned int flags); int virNetClientStreamSendHole(virNetClientStreamPtr st, virNetClientPtr client, -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 3 ++- src/rpc/virnetclientstream.c | 10 +++++++--- src/rpc/virnetclientstream.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

This function will fetch previously processed stream holes and return their sum. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 16 ++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 21 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 186d2c622..61c20d530 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -52,6 +52,7 @@ virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; virNetClientStreamRaiseError; +virNetClientStreamRecvHole; virNetClientStreamRecvPacket; virNetClientStreamSendHole; virNetClientStreamSendPacket; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 4a3d843b1..bf3922cb5 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -583,6 +583,22 @@ virNetClientStreamSendHole(virNetClientStreamPtr st, } +int virNetClientStreamRecvHole(virNetClientPtr client ATTRIBUTE_UNUSED, + virNetClientStreamPtr st, + long long *length) +{ + if (!st->allowSkip) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Holes are not supported with this stream")); + return -1; + } + + *length = st->holeLength; + st->holeLength = 0; + return 0; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 1774e5ac3..c4e01bf1c 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -67,6 +67,10 @@ int virNetClientStreamSendHole(virNetClientStreamPtr st, long long length, unsigned int flags); +int virNetClientStreamRecvHole(virNetClientPtr client, + virNetClientStreamPtr st, + long long *length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
This function will fetch previously processed stream holes and return their sum.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 16 ++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 21 insertions(+)
diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 186d2c622..61c20d530 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -52,6 +52,7 @@ virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; virNetClientStreamRaiseError; +virNetClientStreamRecvHole; virNetClientStreamRecvPacket; virNetClientStreamSendHole; virNetClientStreamSendPacket; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 4a3d843b1..bf3922cb5 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -583,6 +583,22 @@ virNetClientStreamSendHole(virNetClientStreamPtr st, }
+int virNetClientStreamRecvHole(virNetClientPtr client ATTRIBUTE_UNUSED, + virNetClientStreamPtr st, + long long *length)
This should be "int" on its own line, followed by "virNetClient...". Although, yes, different than other functions here. Reviewed-by: John Ferlan <jferlan@redhat.com> John
+{ + if (!st->allowSkip) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Holes are not supported with this stream")); + return -1; + } + + *length = st->holeLength; + st->holeLength = 0; + return 0; +} + +
[...]
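For illustration, the bookkeeping discussed in this patch and in patch 19 (at most one unprocessed hole is pending, and fetching it resets the counter) can be modelled on its own. This is a minimal sketch under that assumption; struct client_stream, stream_set_hole() and stream_recv_hole() are hypothetical names and do not exist in libvirt.

```c
#include <assert.h>
#include <stddef.h>

struct client_stream {
    int allow_skip;         /* nonzero if sparse streams were enabled */
    long long hole_length;  /* pending hole size, 0 means none pending */
};

/* Record an incoming hole; refuse if an earlier one was not consumed yet. */
static int
stream_set_hole(struct client_stream *st, long long length)
{
    assert(st != NULL);

    if (st->hole_length != 0)
        return -1;

    st->hole_length = length;
    return 0;
}

/* Hand the pending hole to the caller and forget about it. */
static int
stream_recv_hole(struct client_stream *st, long long *length)
{
    assert(st != NULL && length != NULL);

    if (!st->allow_skip)
        return -1;

    *length = st->hole_length;
    st->hole_length = 0;
    return 0;
}
```

In this model a second stream_set_hole() without an intervening stream_recv_hole() fails, which is the "shouldn't happen" condition questioned in the review of patch 19.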

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 63daec587..dc59034c3 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5715,6 +5715,36 @@ remoteStreamSendHole(virStreamPtr st, } +static int +remoteStreamRecvHole(virStreamPtr st, + long long *length, + unsigned int flags) +{ + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + VIR_DEBUG("st=%p length=%p flags=%x", + st, length, flags); + + virCheckFlags(0, -1); + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamRecvHole(priv->client, privst, length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5890,6 +5920,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamSendHole = remoteStreamSendHole, + .streamRecvHole = remoteStreamRecvHole, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

Whenever server sends a client stream packet (either regular with actual data or stream skip one) it is queued on @st->rx. So the list is a mixture of both types of stream packets. So now that we have all the helpers needed we can wire their processing up. But since virNetClientStreamRecvPacket doesn't support VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received skips into zeroes repeating requested times. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 45 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index bf3922cb5..75ec3f57f 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -295,6 +295,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virObjectLock(st); + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg); virNetClientStreamEventTimerUpdate(st); @@ -390,7 +392,7 @@ virNetClientStreamSetHole(virNetClientStreamPtr st, * Returns: 0 on success, * -1 otherwise. */ -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleHole(virNetClientPtr client, virNetClientStreamPtr st) { @@ -468,6 +470,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virCheckFlags(0, -1); virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -499,8 +503,45 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } VIR_DEBUG("After IO rx=%p", st->rx); + + if (st->rx && + st->rx->header.type == VIR_NET_STREAM_HOLE && + st->holeLength == 0) { + /* Handle skip sent to us by server. 
*/ + + if (virNetClientStreamHandleHole(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF && !st->holeLength) { + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + + if (st->holeLength) { + /* Pretend holeLength zeroes was read from stream. */ + size_t len = want; + + if (len > st->holeLength) + len = st->holeLength; + + memset(data, 0, len); + st->holeLength -= len; + want -= len; + } + + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want; -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Whenever the server sends a client a stream packet (either a regular one with actual data or a stream skip one) it is queued on @st->rx. So the list is a mixture of both types of stream packets. So now that we have all the helpers needed we can wire their processing up. But since virNetClientStreamRecvPacket doesn't support the VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn every received skip into a run of zeroes of the corresponding length.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 45 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index bf3922cb5..75ec3f57f 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -295,6 +295,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virObjectLock(st);
+ /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg);
virNetClientStreamEventTimerUpdate(st); @@ -390,7 +392,7 @@ virNetClientStreamSetHole(virNetClientStreamPtr st, * Returns: 0 on success, * -1 otherwise. */ -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleHole(virNetClientPtr client, virNetClientStreamPtr st) { @@ -468,6 +470,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virCheckFlags(0, -1);
virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -499,8 +503,45 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, }
VIR_DEBUG("After IO rx=%p", st->rx); + + if (st->rx && + st->rx->header.type == VIR_NET_STREAM_HOLE && + st->holeLength == 0) {
Of course, this is what I was referring to in patch 19. virNetClientStreamHandleHole can only be called when holeLength == 0, but since all the other validity checks (e.g. st->rx/msg and msg->header.type == HOLE) are present, what's one more later on? You could also modify patch 19 to drop any extraneous checks that can't trigger given the above lines. Your call, IDC.
+ /* Handle skip sent to us by server. */ + + if (virNetClientStreamHandleHole(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF && !st->holeLength) {
holeLength == 0 Reviewed-by: John Ferlan <jferlan@redhat.com> John
+ if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + + if (st->holeLength) { + /* Pretend holeLength zeroes was read from stream. */ + size_t len = want; + + if (len > st->holeLength) + len = st->holeLength; + + memset(data, 0, len); + st->holeLength -= len; + want -= len; + } + + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want;

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 2 +- src/rpc/virnetclientstream.c | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index dc59034c3..d27e96ffc 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5652,7 +5652,7 @@ remoteStreamRecvFlags(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); if (virNetClientStreamRaiseError(privst)) return -1; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 75ec3f57f..e68d8f946 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -467,7 +467,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", st, client, data, nbytes, nonblock, flags); - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); virObjectLock(st); @@ -531,6 +531,13 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, /* Pretend holeLength zeroes was read from stream. */ size_t len = want; + /* Yes, pretend unless we are asked not to. */ + if (flags & VIR_STREAM_RECV_STOP_AT_HOLE) { + /* No error reporting here. Caller knows what they are doing. */ + rv = -3; + goto cleanup; + } + if (len > st->holeLength) len = st->holeLength; -- 2.13.0

The commit message text could use a few words, but I know it's fairly self explanatory... Still something better than nothing. On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 2 +- src/rpc/virnetclientstream.c | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John
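For illustration, here is how the two receive behaviours from patches 24 and 25 fit together: without the flag a pending hole is read back as zeroes, with the flag the receive call returns -3 and the caller deals with the hole itself. This is a self-contained model, not the libvirt code; RECV_STOP_AT_HOLE, struct rx_state, recv_bytes() and drain() are hypothetical names, and the model assumes a single hole followed by a single run of data.

```c
#include <assert.h>
#include <string.h>

#define RECV_STOP_AT_HOLE (1u << 0)  /* hypothetical flag for this sketch */

struct rx_state {
    long long hole_length;  /* pending hole, 0 if none */
    const char *data;       /* data queued after the hole */
    size_t data_len;        /* bytes left in @data */
};

/* Returns bytes stored in @buf, 0 at end of input, -3 on a pending hole
 * when RECV_STOP_AT_HOLE is set. */
static int
recv_bytes(struct rx_state *rx, char *buf, size_t nbytes, unsigned int flags)
{
    size_t got = 0;
    size_t len;

    assert(rx != NULL && buf != NULL);

    if (rx->hole_length > 0) {
        if (flags & RECV_STOP_AT_HOLE)
            return -3;

        /* Pretend the hole was read as zeroes, bounded by @nbytes. */
        len = nbytes;
        if ((unsigned long long)rx->hole_length < len)
            len = (size_t)rx->hole_length;
        memset(buf, 0, len);
        rx->hole_length -= (long long)len;
        got += len;
    }

    /* Fill the rest of the buffer with real data, if any. */
    len = nbytes - got;
    if (len > rx->data_len)
        len = rx->data_len;
    memcpy(buf + got, rx->data, len);
    rx->data += len;
    rx->data_len -= len;
    got += len;

    return (int)got;
}

/* A sparse-aware consumer: counts data bytes and hole bytes separately. */
static void
drain(struct rx_state *rx, size_t *data_bytes, long long *hole_bytes)
{
    char buf[64];
    int rv;

    *data_bytes = 0;
    *hole_bytes = 0;

    while ((rv = recv_bytes(rx, buf, sizeof(buf), RECV_STOP_AT_HOLE)) != 0) {
        if (rv == -3) {
            /* Take over the hole instead of reading zeroes. */
            *hole_bytes += rx->hole_length;
            rx->hole_length = 0;
        } else {
            *data_bytes += (size_t)rv;
        }
    }
}
```

A consumer like drain() would typically seek over the hole in its output file rather than just counting it; that part is left out to keep the sketch short.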

Whenever client is able to receive some data from stream daemonStreamHandleRead is called. But now the behaviour of this function needs to be changed a bit. Previously it just read data from underlying file (of chardev or whatever) and sent those through the stream to client. This model will not work any longer because it does not differentiate whether underlying file is in data or hole section. Therefore, at the beginning of this function add code that checks this situation and acts accordingly. So after the this, when wanting to send some data we always check whether we are not in a hole and if so, skip it an inform client about its size. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/daemon/stream.c b/daemon/stream.c index 57ddfe830..284499912 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -53,6 +54,7 @@ struct daemonClientStream { bool tx; bool allowSkip; + size_t dataLen; /* How much data is there remaining until we see a hole */ daemonClientStreamPtr next; }; @@ -796,6 +798,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + long long length = 0; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -820,6 +824,58 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; + if (stream->allowSkip && !stream->dataLen) { + /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on. 
*/ + + rv = virStreamInData(stream->st, &inData, &length); + VIR_DEBUG("rv=%d inData=%d length=%lld", rv, inData, length); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && length) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamHole(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + length, + 0) < 0) + goto cleanup; + + msg = NULL; + + /* We have successfully sent stream skip to the other side. + * To keep streams in sync seek locally too. */ + virStreamSendHole(stream->st, length, 0); + /* We're done with this call */ + goto done; + } + } + + stream->dataLen = length; + } + + if (stream->allowSkip && + bufferLen > stream->dataLen) + bufferLen = stream->dataLen; + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -834,6 +890,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, goto cleanup; msg = NULL; } else { + stream->dataLen -= rv; + stream->tx = false; if (rv == 0) stream->recvEOF = true; @@ -851,6 +909,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; } + done: ret = 0; cleanup: VIR_FREE(buffer); -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Whenever client is able to receive some data from stream daemonStreamHandleRead is called. But now the behaviour of this function needs to be changed a bit. Previously it just read data from underlying file (of chardev or whatever) and sent those through the stream to client. This model will not work any longer because it does not differentiate whether underlying file is in data or hole section. Therefore, at the beginning of this function add code that checks this situation and acts accordingly.
Empty line between paragraphs.
So after the this, when wanting to send some data we always check whether we are not in a hole and if so, skip it an inform client
s/it an inform/it and inform/
about its size.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+)
diff --git a/daemon/stream.c b/daemon/stream.c index 57ddfe830..284499912 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h"
Oh, right virStreamInData is not publicly available... (patch 10 comments)...
#define VIR_FROM_THIS VIR_FROM_STREAMS
@@ -53,6 +54,7 @@ struct daemonClientStream { bool tx;
bool allowSkip; + size_t dataLen; /* How much data is there remaining until we see a hole */
daemonClientStreamPtr next; }; @@ -796,6 +798,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + long long length = 0;
VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -820,6 +824,58 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup;
+ if (stream->allowSkip && !stream->dataLen) {
dataLen == 0 ? (I know, same thing - I guess it's just one of those visual things for me...)
+ /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on. */ + + rv = virStreamInData(stream->st, &inData, &length); + VIR_DEBUG("rv=%d inData=%d length=%lld", rv, inData, length); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && length) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamHole(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + length, + 0) < 0) + goto cleanup; + + msg = NULL; + + /* We have successfully sent stream skip to the other side.
Extra space between "the other"
+ * To keep streams in sync seek locally too. */ + virStreamSendHole(stream->st, length, 0); + /* We're done with this call */ + goto done; + } + } + + stream->dataLen = length; + } + + if (stream->allowSkip && + bufferLen > stream->dataLen) + bufferLen = stream->dataLen; + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -834,6 +890,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, goto cleanup; msg = NULL; } else { + stream->dataLen -= rv; +
Since dataLen is only "set" if stream->allowSkip - should this be fenced similarly? Not that I see ->dataLen being used for anything other than sparse stream mgmt... Reviewed-by: John Ferlan <jferlan@redhat.com> John
stream->tx = false; if (rv == 0) stream->recvEOF = true; @@ -851,6 +909,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; }
+ done: ret = 0; cleanup: VIR_FREE(buffer);
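For illustration, the "am I in data or in a hole, and for how long" question that the daemon asks before each read can be answered for a plain file descriptor with lseek() and SEEK_DATA/SEEK_HOLE. The sketch below is an assumption about how such a check can be written on Linux; it is not the virFileInData or virStreamInData implementation from this series, and section_at() is a hypothetical name. The fallback SEEK_DATA/SEEK_HOLE values are the Linux ones and only apply if the headers do not provide them.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE    /* for SEEK_DATA / SEEK_HOLE with glibc */
#endif
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

#ifndef SEEK_DATA
# define SEEK_DATA 3    /* Linux value; assumption if headers lack it */
#endif
#ifndef SEEK_HOLE
# define SEEK_HOLE 4    /* Linux value; assumption if headers lack it */
#endif

/* Report whether the current position of @fd is in data (*in_data = 1)
 * or in a hole (*in_data = 0), and how many bytes the section spans.
 * The file position is left unchanged. Returns 0 on success, -1 on error. */
static int
section_at(int fd, int *in_data, long long *length)
{
    off_t cur, data, end;

    assert(in_data != NULL && length != NULL);

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        /* ENXIO: no more data up to EOF, i.e. a trailing hole (or EOF). */
        if (errno != ENXIO)
            return -1;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            return -1;
        *in_data = 0;
        *length = (long long)(end - cur);
    } else if (data > cur) {
        /* Next data starts later, so we are inside a hole. */
        *in_data = 0;
        *length = (long long)(data - cur);
    } else {
        /* We are in data; the section ends at the next hole (or EOF). */
        if ((end = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            return -1;
        *in_data = 1;
        *length = (long long)(end - cur);
    }

    /* Restore the original position. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        return -1;

    return 0;
}
```

With such a helper, a reader can cap each read at the returned length while in data, and report the returned length as a hole otherwise, which is the shape of the logic added to daemonStreamHandleRead above.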

Basically, what is needed here is to introduce a new message type for the messages passed between the event loop callbacks and the worker thread that does all the I/O. The idea is that instead of a queue of read buffers we will have a queue where "hole of size X" messages appear. That way the event loop callbacks can just check the head of the queue and see if the worker thread is in data or a hole section and how long the section is. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/storage/storage_util.c | 4 +- src/util/virfdstream.c | 239 ++++++++++++++++++++++++++++++++++++++++----- src/util/virfdstream.h | 1 + 3 files changed, 220 insertions(+), 24 deletions(-) diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index 43f3561f8..908cad874 100644 --- a/src/storage/storage_util.c +++ b/src/storage/storage_util.c @@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_WRONLY); + offset, len, false, O_WRONLY); cleanup: VIR_FREE(path); @@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_RDONLY); + offset, len, false, O_RDONLY); cleanup: VIR_FREE(path); diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 4b42939e7..ba209025a 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream"); typedef enum { VIR_FDSTREAM_MSG_TYPE_DATA, + VIR_FDSTREAM_MSG_TYPE_HOLE, } virFDStreamMsgType; typedef struct _virFDStreamMsg virFDStreamMsg; @@ -66,6 +67,9 @@ struct _virFDStreamMsg { size_t len; size_t offset; } data; + struct { + long long len; + } hole; } stream; }; @@ -198,6 +202,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg) case VIR_FDSTREAM_MSG_TYPE_DATA: VIR_FREE(msg->stream.data.buf); 
break; + case VIR_FDSTREAM_MSG_TYPE_HOLE: + /* nada */ + break; } VIR_FREE(msg); @@ -385,6 +392,7 @@ struct _virFDStreamThreadData { virStreamPtr st; size_t length; bool doRead; + bool sparse; int fdin; char *fdinname; int fdout; @@ -407,34 +415,68 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, const char *fdoutname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + long long sectionLen = 0; char *buf = NULL; ssize_t got; + if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, &sectionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error; - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; - - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = sectionLen; + got = sectionLen; + + /* HACK. The message queue is one directional. So caller + * cannot make us skip the hole. Do that for them instead. 
*/ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + } + } else { + if (sparse && + buflen > *dataLen) + buflen = *dataLen; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = got; + buf = NULL; + if (sparse) + *dataLen -= got; } - msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; - msg->stream.data.buf = buf; - msg->stream.data.len = got; - buf = NULL; - virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname); msg = NULL; @@ -449,6 +491,7 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst, static ssize_t virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, @@ -456,6 +499,7 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, { ssize_t got; virFDStreamMsgPtr msg = fdst->msg; + off_t off; bool pop = false; switch (msg->type) { @@ -474,6 +518,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, pop = msg->stream.data.offset == msg->stream.data.len; break; + + case VIR_FDSTREAM_MSG_TYPE_HOLE: + if (!sparse) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected stream hole")); + return -1; + } + + got = msg->stream.hole.len; + off = lseek(fdout, got, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdoutname); + return -1; + } + + if (ftruncate(fdout, off) < 0) { + virReportSystemError(errno, + _("unable to truncate %s"), + fdoutname); + return -1; + } + + pop = true; + break; } if (pop) { @@ -491,6 +561,7 @@ virFDStreamThread(void *opaque) virFDStreamThreadDataPtr data = opaque; virStreamPtr st = data->st; size_t length = data->length; + bool sparse = data->sparse; int fdin = data->fdin; char *fdinname = 
data->fdinname; int fdout = data->fdout; @@ -499,6 +570,7 @@ virFDStreamThread(void *opaque) bool doRead = fdst->threadDoRead; size_t buflen = 256 * 1024; size_t total = 0; + size_t dataLen = 0; virObjectRef(fdst); virObjectLock(fdst); @@ -533,12 +605,12 @@ virFDStreamThread(void *opaque) } if (doRead) - got = virFDStreamThreadDoRead(fdst, + got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdout, fdinname, fdoutname, - buflen); + &dataLen, buflen); else - got = virFDStreamThreadDoWrite(fdst, + got = virFDStreamThreadDoWrite(fdst, sparse, fdin, fdout, fdinname, fdoutname); @@ -809,6 +881,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } } + /* Shortcut, if the stream is in the trailing hole, + * return 0 immediately. */ + if (msg->type == VIR_FDSTREAM_MSG_TYPE_HOLE && + msg->stream.hole.len == 0) { + ret = 0; + goto cleanup; + } + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { /* Nope, nope, I'm outta here */ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -859,11 +939,123 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virCheckFlags(0, -1); + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. But bear with me. If FDStream is + * in a read mode, then if the message at the queue head is HOLE, just + * pop it. The thread has lseek()-ed anyway. If however, the FDStream + * is in write mode, then tell the thread to do the lseek() for us. + * Under no circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. 
*/ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream hole")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = length; + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); + msg = NULL; + } + } else { + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + + if (ftruncate(fdst->fd, off) < 0) { + virReportSystemError(errno, "%s", + _("unable to truncate")); + goto cleanup; + } + } + + ret = 0; + cleanup: + virObjectUnlock(fdst); + virFDStreamMsgFree(msg); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + long long *length) +{ + virFDStreamDataPtr fdst = st->privateData; + int ret = -1; + + virObjectLock(fdst); + + if (fdst->thread) { + virFDStreamMsgPtr msg; + + while (!(msg = fdst->msg)) { + if (fdst->threadQuit) { + *inData = *length = 0; + ret = 0; + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) { + *inData = 1; + *length = msg->stream.data.len - msg->stream.data.offset; + } else { + *inData = 0; + *length = msg->stream.hole.len; + } + ret = 0; + } else { + ret = virFileInData(fdst->fd, inData, length); + } + + cleanup: + virObjectUnlock(fdst); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSendHole = virFDStreamSendHole, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = 
virFDStreamRemoveCallback @@ -1004,7 +1196,8 @@ virFDStreamOpenFileInternal(virStreamPtr st, unsigned long long length, int oflags, int mode, - bool forceIOHelper) + bool forceIOHelper, + bool sparse) { int fd = -1; int pipefds[2] = { -1, -1 }; @@ -1071,6 +1264,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, threadData->st = virObjectRef(st); threadData->length = length; + threadData->sparse = sparse; if ((oflags & O_ACCMODE) == O_RDONLY) { threadData->fdin = fd; @@ -1120,7 +1314,7 @@ int virFDStreamOpenFile(virStreamPtr st, } return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, false); + oflags, 0, false, false); } int virFDStreamCreateFile(virStreamPtr st, @@ -1133,7 +1327,7 @@ int virFDStreamCreateFile(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, mode, - false); + false, false); } #ifdef HAVE_CFMAKERAW @@ -1149,7 +1343,7 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false) < 0) + false, false) < 0) return -1; fdst = st->privateData; @@ -1186,7 +1380,7 @@ int virFDStreamOpenPTY(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false); + false, false); } #endif /* !HAVE_CFMAKERAW */ @@ -1194,11 +1388,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags) { return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, true); + oflags, 0, true, sparse); } int virFDStreamSetInternalCloseCb(virStreamPtr st, diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h index 34c4c3fc6..887c991d6 100644 --- a/src/util/virfdstream.h +++ b/src/util/virfdstream.h @@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags); int 
virFDStreamSetInternalCloseCb(virStreamPtr st, -- 2.13.0
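
The queue idea from the commit message above can be sketched in isolation. This is only an illustration under stated assumptions: Msg, MsgQueue, queuePush(), queuePop() and queueInData() are hypothetical, simplified stand-ins for virFDStreamMsg and its helpers, with no locking and no worker thread. It shows how a consumer can look at the head of the queue and learn whether the producer is in a data or a hole section, and how long that section is.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-ins for virFDStreamMsg and its queue. */
typedef enum { MSG_TYPE_DATA, MSG_TYPE_HOLE } MsgType;

typedef struct Msg Msg;
struct Msg {
    Msg *next;
    MsgType type;
    union {
        struct { char *buf; size_t len; size_t offset; } data;
        struct { long long len; } hole;
    } stream;
};

typedef struct { Msg *head; Msg *tail; } MsgQueue;

/* Append a message; the queue takes ownership of it. */
static void queuePush(MsgQueue *q, Msg *msg)
{
    assert(q && msg);
    msg->next = NULL;
    if (q->tail)
        q->tail->next = msg;
    else
        q->head = msg;
    q->tail = msg;
}

/* Detach and return the head message, or NULL if the queue is empty. */
static Msg *queuePop(MsgQueue *q)
{
    Msg *msg = q->head;

    if (!msg)
        return NULL;
    q->head = msg->next;
    if (!q->head)
        q->tail = NULL;
    msg->next = NULL;
    return msg;
}

/* Allocate a DATA message holding a private copy of @bytes. */
static Msg *msgNewData(const char *bytes, size_t len)
{
    Msg *msg = calloc(1, sizeof(*msg));

    if (!msg || !(msg->stream.data.buf = malloc(len ? len : 1))) {
        free(msg);
        return NULL;
    }
    memcpy(msg->stream.data.buf, bytes, len);
    msg->type = MSG_TYPE_DATA;
    msg->stream.data.len = len;
    return msg;
}

/* Allocate a HOLE message: "hole of size @len", no payload. */
static Msg *msgNewHole(long long len)
{
    Msg *msg = calloc(1, sizeof(*msg));

    if (!msg)
        return NULL;
    msg->type = MSG_TYPE_HOLE;
    msg->stream.hole.len = len;
    return msg;
}

static void msgFree(Msg *msg)
{
    if (!msg)
        return;
    if (msg->type == MSG_TYPE_DATA)
        free(msg->stream.data.buf);
    free(msg);
}

/* Inspect the head only: data or hole, and how many bytes remain in it.
 * Returns 0 on success, -1 if the queue is empty. */
static int queueInData(const MsgQueue *q, int *inData, long long *length)
{
    const Msg *msg = q->head;

    if (!msg)
        return -1;
    if (msg->type == MSG_TYPE_DATA) {
        *inData = 1;
        *length = (long long)(msg->stream.data.len - msg->stream.data.offset);
    } else {
        *inData = 0;
        *length = msg->stream.hole.len;
    }
    return 0;
}
```

A consumer would call queueInData() first and then either copy bytes out of the DATA message or pop the HOLE message, which is roughly the split between virFDStreamRead() and virFDStreamSendHole() in the patch.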

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Basically, what is needed here is to introduce new message type for the messages passed between the event loop callbacks and the worker thread that does all the I/O. The idea is that instead of a queue of read buffers we will have a queue where "hole of size X" messages appear. That way the even loop callbacks can just
s/even/event/
check the head of the queue and see if the worker thread is in data or a hole section and how long the section is.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/storage/storage_util.c | 4 +- src/util/virfdstream.c | 239 ++++++++++++++++++++++++++++++++++++++++----- src/util/virfdstream.h | 1 + 3 files changed, 220 insertions(+), 24 deletions(-)
[...]
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 4b42939e7..ba209025a 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c
[...]
static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, const char *fdoutname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + long long sectionLen = 0; char *buf = NULL; ssize_t got;
+ if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, &sectionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error;
- if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; - - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = sectionLen; + got = sectionLen; + + /* HACK. The message queue is one directional. So caller
HACK or "By design"
+ * cannot make us skip the hole. Do that for them instead. */ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + }
[...]
+static int +virFDStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virCheckFlags(0, -1); + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. But bear with me. If FDStream is
s/But bear with me.//
+ * in a read mode, then if the message at the queue head is HOLE, just + * pop it. The thread has lseek()-ed anyway. If however, the FDStream
s/If however, the FDStream/However, if the FDStream/

Reviewed-by: John Ferlan <jferlan@redhat.com>

John
+ * is in write mode, then tell the thread to do the lseek() for us. + * Under no circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. */ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream hole")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = length; + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); + msg = NULL; + }
[...]

Now, not all APIs are going to support sparse streams. To some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). To others, we will need a special flag to indicate that client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand we can just annotate in our .x files that a certain flag to certain RPC call enables this feature. For instance: /** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX, Therefore, whenever client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); daemon will mark that down and send stream skips when possible. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 98625983a..def88d4f9 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -281,6 +281,13 @@ while (<PROTOCOL>) { $calls{$name}->{streamflag} = "none"; } + if (exists $opts{sparseflag}) { + die "\@sparseflag requires stream" unless $calls{$name}->{streamflag} ne "none"; + $calls{$name}->{sparseflag} = $opts{sparseflag}; + } else { + $calls{$name}->{sparseflag} = "none"; + } + $calls{$name}->{acl} = $opts{acl}; $calls{$name}->{aclfilter} = $opts{aclfilter}; @@ -982,6 +989,11 @@ elsif ($mode eq "server") { if ($call->{streamflag} ne "none") { print " virStreamPtr st = NULL;\n"; print " daemonClientStreamPtr stream = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = args->flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1024,7 +1036,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; + print 
" if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, sparse)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1727,6 +1739,11 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print " virNetClientStreamPtr netst = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1738,7 +1755,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; -- 2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Now, not all APIs are going to support sparse streams. To some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). To others, we will need a special flag to indicate that client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand we can just annotate in our .x files that a certain flag to certain RPC call enables this feature. For instance:
/** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX,
Therefore, whenever client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); daemon will mark that down and send stream skips when possible.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-)
Still a bit of black magic to me ;-), but I see that it works... Reviewed-by: John Ferlan <jferlan@redhat.com> John

On 05/17/2017 05:32 PM, John Ferlan wrote:
On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Now, not all APIs are going to support sparse streams. To some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). To others, we will need a special flag to indicate that client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand we can just annotate in our .x files that a certain flag to certain RPC call enables this feature. For instance:
/** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX,
Therefore, whenever client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); daemon will mark that down and send stream skips when possible.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-)
Still a bit of black magic to me ;-), but I see that it works...
Yes. The problem this patch fixes is the following: nearly all of our APIs result in an RPC call, with a few exceptions. Unfortunately, virStreamNew() is one of the exceptions. Therefore, if one side creates a stream object, the other side has no knowledge of it. We work around this by creating the server-side stream object in the dispatch function, e.g. remoteDispatchStorageVolDownload. If something fails, we unref the object and we're back at the starting position.

However, when creating the stream object we have to know whether it is going to be sparse or not (trying to set that afterwards would complicate the code needlessly). And that's exactly what this patch does: it allows us to say "this flag for this RPC call enables sparse stream". The generated dispatcher functions can then check whether the flag is set and, if so, create a sparse stream.

All of this could have been avoided if virStreamNew() were an RPC call. Then we could do:

  virStreamNew(conn, SPARSE);
  virStorageVolDownload();

without introducing a special flag for each API that wants to use sparse streams, because sparseness is really an attribute of the stream, not of StorageVol or whatever API. Well, maybe next time.

Michal
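
The dispatcher pattern described above can be sketched roughly as follows. Everything here is hypothetical and heavily simplified: ServerStream, serverStreamNew(), dispatchSomeAPI() and SOME_API_SPARSE_STREAM are illustrative names, not libvirt symbols, and the real generated code deals with RPC messages, clients and error reporting. The sketch only shows the ordering constraint: sparseness is derived from the API flags before the stream object exists, and a failed driver call unrefs the stream again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical flag; stands in for whatever @sparseflag names in the .x file. */
#define SOME_API_SPARSE_STREAM (1u << 0)

/* Hypothetical server-side stream: sparseness is fixed at creation time. */
typedef struct {
    int refs;
    bool sparse;
} ServerStream;

static ServerStream *serverStreamNew(bool sparse)
{
    ServerStream *st = calloc(1, sizeof(*st));

    if (!st)
        return NULL;
    st->refs = 1;
    st->sparse = sparse;
    return st;
}

static void serverStreamUnref(ServerStream *st)
{
    if (st && --st->refs == 0)
        free(st);
}

/* Stand-in for the driver call made by the dispatcher. */
typedef int (*DriverCall)(ServerStream *st, unsigned int flags);

/* Sketch of a generated dispatcher: decide sparseness from the flags first,
 * create the stream with it, and drop the stream if the driver call fails.
 * Returns the stream on success, NULL on failure. */
static ServerStream *dispatchSomeAPI(unsigned int flags, DriverCall call)
{
    const bool sparse = flags & SOME_API_SPARSE_STREAM;
    ServerStream *st;

    assert(call);

    if (!(st = serverStreamNew(sparse)))
        return NULL;

    if (call(st, flags) < 0) {
        serverStreamUnref(st);   /* back to the starting position */
        return NULL;
    }
    return st;
}

/* Two trivial driver stand-ins for trying the dispatcher out. */
static int driverOk(ServerStream *st, unsigned int flags)
{
    (void) st;
    (void) flags;
    return 0;
}

static int driverFail(ServerStream *st, unsigned int flags)
{
    (void) st;
    (void) flags;
    return -1;
}
```

With this shape, dispatchSomeAPI(SOME_API_SPARSE_STREAM, driverOk) yields a stream whose sparse attribute is set, while the same call with driverFail leaves nothing behind.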

These flags to APIs will tell if caller wants to use sparse stream for storage transfer. At the same time, it's safe to enable them in storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for filesystem backend but not iSCSI backend. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- src/storage/storage_util.c | 10 ++++++---- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index 45ec72065..4517f713c 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -346,11 +346,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; + int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 05eec8a9d..64202998b 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * 
Download the content of the volume as a stream. If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 87b2bd365..25e62a181 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -4896,6 +4896,7 @@ enum remote_procedure { /** * @generate: both * @writestream: 1 + * @sparseflag: VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM * @acl: storage_vol:data_write */ REMOTE_PROC_STORAGE_VOL_UPLOAD = 208, @@ -4903,6 +4904,7 @@ enum remote_procedure { /** * @generate: both * @readstream: 1 + * @sparseflag: VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_STORAGE_VOL_DOWNLOAD = 209, diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index 2103ed11d..1b0d776c7 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -2117,7 +2117,7 @@ storageVolDownload(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; @@ -2285,7 +2285,7 @@ storageVolUpload(virStorageVolPtr obj, virStorageVolStreamInfoPtr cbdata = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index 908cad874..493c651b7 100644 --- a/src/storage/storage_util.c +++ 
b/src/storage/storage_util.c @@ -2401,8 +2401,9 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); /* if volume has target format VIR_STORAGE_FILE_PLOOP * we need to restore DiskDescriptor.xml, according to * new contents of volume. This operation will be perfomed @@ -2427,7 +2428,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, false, O_WRONLY); + offset, len, sparse, O_WRONLY); cleanup: VIR_FREE(path); @@ -2447,8 +2448,9 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (vol->target.format == VIR_STORAGE_FILE_PLOOP) { has_snap = storageBackendPloopHasSnapshots(vol->target.path); if (has_snap < 0) { @@ -2465,7 +2467,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, false, O_RDONLY); + offset, len, sparse, O_RDONLY); cleanup: VIR_FREE(path); -- 2.13.0
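
The two-level flag check from the commit message above can be illustrated with a small sketch. The names are hypothetical: checkFlags() only imitates the idea behind virCheckFlags() (reject any bit outside a supported mask), VOL_DOWNLOAD_SPARSE_STREAM mirrors VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, and the two backends are made up for the example. The point is that the frontend can accept the flag for everyone while each backend still decides for itself.

```c
#include <assert.h>

/* Mirrors VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for this sketch only. */
#define VOL_DOWNLOAD_SPARSE_STREAM (1u << 0)

/* Hypothetical imitation of the virCheckFlags() idea:
 * fail if any bit outside @supported is set. */
static int checkFlags(unsigned int flags, unsigned int supported)
{
    return (flags & ~supported) ? -1 : 0;
}

typedef int (*BackendDownload)(unsigned int flags);

/* A file-based backend that understands sparse streams. */
static int backendFileDownload(unsigned int flags)
{
    return checkFlags(flags, VOL_DOWNLOAD_SPARSE_STREAM);
}

/* A backend that does not: it still accepts no flags at all. */
static int backendPlainDownload(unsigned int flags)
{
    return checkFlags(flags, 0);
}

/* Frontend: lets the sparse flag through, then defers to the backend,
 * which performs its own, possibly stricter, check. */
static int frontendDownload(BackendDownload backend, unsigned int flags)
{
    assert(backend);

    if (checkFlags(flags, VOL_DOWNLOAD_SPARSE_STREAM) < 0)
        return -1;

    return backend(flags);
}
```

Enabling the flag for another backend then only means widening that backend's own mask; the frontend stays untouched.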

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
These flags to APIs will tell if caller wants to use sparse stream for storage transfer. At the same time, it's safe to enable them in storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for filesystem backend but not iSCSI backend.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- src/storage/storage_util.c | 10 ++++++---- 5 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index 45ec72065..4517f713c 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -346,11 +346,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; +
/me wonders should the backend specific concerns be described in comments prior to each enum or is that too specific. Maybe it's more of a 'specific backends' that perform "file based manipulation" (rather than block based)... I dunno. I'll leave it to you though - the more documentation now while it's fresh in your mind the better.
int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 05eec8a9d..64202998b 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * Download the content of the volume as a stream. If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the
I suppose for each you c(sh)ould have documented what the specific FLAG does and the expectations therein.. Reviewed-by: John Ferlan <jferlan@redhat.com> John

On 05/17/2017 05:42 PM, John Ferlan wrote:
On 05/16/2017 10:04 AM, Michal Privoznik wrote:
These flags to APIs will tell if caller wants to use sparse stream for storage transfer. At the same time, it's safe to enable them in storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for filesystem backend but not iSCSI backend.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- src/storage/storage_util.c | 10 ++++++---- 5 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index 45ec72065..4517f713c 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -346,11 +346,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; +
/me wonders should the backend specific concerns be described in comments prior to each enum or is that too specific. Maybe it's more of a 'specific backends' that perform "file based manipulation" (rather than block based)... I dunno. I'll leave it to you though - the more documentation now while it's fresh in your mind the better.
int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 05eec8a9d..64202998b 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * Download the content of the volume as a stream. If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the
I suppose for each you c(sh)ould have documented what the specific FLAG does and the expectations therein..
How about this?

diff --git i/src/libvirt-storage.c w/src/libvirt-storage.c index 64202998b..35f9926d5 100644 --- i/src/libvirt-storage.c +++ w/src/libvirt-storage.c @@ -1555,6 +1555,13 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * is zero, then the remaining contents of the volume after * @offset will be downloaded. * + * If VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM is set in @flags + * effective transmission of holes is enabled. This assumes using + * the @stream with combination of virStreamSparseRecvAll() or + * virStreamRecvFlags(stream, ..., flags = + * VIR_STREAM_RECV_STOP_AT_HOLE) for honouring holes sent by + * server. + * * This call sets up an asynchronous stream; subsequent use of * stream APIs is necessary to transfer the actual data, * determine how much data is successfully transferred, and @@ -1621,6 +1628,11 @@ virStorageVolDownload(virStorageVolPtr vol, * will be raised if an attempt is made to upload greater * than @length bytes of data. * + * If VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM is set in @flags + * effective transmission of holes is enabled. This assumes using + * the @stream with combination of virStreamSparseSendAll() or + * virStreamSendHole() to preserve source file sparseness. + * * This call sets up an asynchronous stream; subsequent use of * stream APIs is necessary to transfer the actual data, * determine how much data is successfully transferred, and

Michal

On 05/17/2017 12:30 PM, Michal Privoznik wrote:
On 05/17/2017 05:42 PM, John Ferlan wrote:
On 05/16/2017 10:04 AM, Michal Privoznik wrote:
These flags to APIs will tell if caller wants to use sparse stream for storage transfer. At the same time, it's safe to enable them in storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for filesystem backend but not iSCSI backend.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-storage.h |  9 +++++++++
 src/libvirt-storage.c             |  4 ++--
 src/remote/remote_protocol.x      |  2 ++
 src/storage/storage_driver.c      |  4 ++--
 src/storage/storage_util.c        | 10 ++++++----
 5 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h
index 45ec72065..4517f713c 100644
--- a/include/libvirt/libvirt-storage.h
+++ b/include/libvirt/libvirt-storage.h
@@ -346,11 +346,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool,
                                              const char *xmldesc,
                                              virStorageVolPtr clonevol,
                                              unsigned int flags);
+
+typedef enum {
+    VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */
+} virStorageVolDownloadFlags;
+
 int virStorageVolDownload (virStorageVolPtr vol,
                            virStreamPtr stream,
                            unsigned long long offset,
                            unsigned long long length,
                            unsigned int flags);
+typedef enum {
+    VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */
+} virStorageVolUploadFlags;
+
/me wonders should the backend specific concerns be described in comments prior to each enum or is that too specific. Maybe it's more of a 'specific backends' that perform "file based manipulation" (rather than block based)... I dunno. I'll leave it to you though - the more documentation now while it's fresh in your mind the better.
 int virStorageVolUpload (virStorageVolPtr vol,
                          virStreamPtr stream,
                          unsigned long long offset,
diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c
index 05eec8a9d..64202998b 100644
--- a/src/libvirt-storage.c
+++ b/src/libvirt-storage.c
@@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool,
  * @stream: stream to use as output
  * @offset: position in @vol to start reading from
  * @length: limit on amount of data to download
- * @flags: extra flags; not used yet, so callers should always pass 0
+ * @flags: bitwise-OR of virStorageVolDownloadFlags
  *
  * Download the content of the volume as a stream. If @length
  * is zero, then the remaining contents of the volume after
@@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol,
  * @stream: stream to use as input
  * @offset: position to start writing to
  * @length: limit on amount of data to upload
- * @flags: extra flags; not used yet, so callers should always pass 0
+ * @flags: bitwise-OR of virStorageVolUploadFlags
  *
  * Upload new content to the volume from a stream. This call
  * will fail if @offset + @length exceeds the size of the
I suppose for each you c(sh)ould have documented what the specific FLAG does and the expectations therein..
How about this?
Fine - Reviewed-by: John Ferlan <jferlan@redhat.com> John
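
The commit message quoted above accepts the new flags in the storage driver frontend and leaves it to each backend to reject what it cannot honour. A minimal standalone sketch of that split follows. It is not libvirt code: DOWNLOAD_SPARSE_STREAM, backend_kind, backend_supported_flags() and backend_download() are hypothetical names chosen for illustration, and the choice of which backend accepts the flag is an assumption taken from the example in the commit message (filesystem yes, iSCSI no).

```c
#include <stdio.h>

/* Hypothetical stand-in for virStorageVolDownloadFlags. */
enum {
    DOWNLOAD_SPARSE_STREAM = 1 << 0,
};

/* Hypothetical backend kinds: one file based, one block based. */
enum backend_kind { BACKEND_FS, BACKEND_ISCSI };

/* Flags each backend is willing to accept (assumption for this sketch). */
static unsigned int
backend_supported_flags(enum backend_kind kind)
{
    return kind == BACKEND_FS ? DOWNLOAD_SPARSE_STREAM : 0;
}

/*
 * The frontend passes @flags through untouched; the backend rejects
 * any bit it does not support. Returns 0 on success, -1 on error.
 */
static int
backend_download(enum backend_kind kind, unsigned int flags)
{
    unsigned int unsupported = flags & ~backend_supported_flags(kind);

    if (unsupported) {
        fprintf(stderr, "unsupported flags (0x%x)\n", unsupported);
        return -1;
    }

    /* The actual transfer would start here. */
    return 0;
}
```

With this shape, backend_download(BACKEND_FS, DOWNLOAD_SPARSE_STREAM) succeeds while the same flag on BACKEND_ISCSI fails, so a flag can be enabled one backend at a time without touching the frontend.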

The command grew new --sparse switch that does nothing more than
enables the sparse streams feature for this command. Among with
the switch new helper function is introduced: virshStreamSkip().
This is the callback that is called whenever daemon sends us a
hole. In the callback we reflect the hole in underlying file by
seeking as many bytes as told.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 18 ++++++++++++++++++
 tools/virsh-util.h   |  5 +++++
 tools/virsh-volume.c | 12 ++++++++++--
 tools/virsh.pod      |  3 ++-
 4 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/tools/virsh-util.c b/tools/virsh-util.c
index 4b86e29cb..198625bdb 100644
--- a/tools/virsh-util.c
+++ b/tools/virsh-util.c
@@ -153,6 +153,24 @@ virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 
 
+int
+virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
+                long long offset,
+                void *opaque)
+{
+    int *fd = opaque;
+    off_t cur;
+
+    if ((cur = lseek(*fd, offset, SEEK_CUR)) == (off_t) -1)
+        return -1;
+
+    if (ftruncate(*fd, cur) < 0)
+        return -1;
+
+    return 0;
+}
+
+
 void
 virshDomainFree(virDomainPtr dom)
 {
diff --git a/tools/virsh-util.h b/tools/virsh-util.h
index 64cef23c0..0aba247f6 100644
--- a/tools/virsh-util.h
+++ b/tools/virsh-util.h
@@ -57,6 +57,11 @@ virshStreamSink(virStreamPtr st,
                 size_t nbytes,
                 void *opaque);
 
+int
+virshStreamSkip(virStreamPtr st,
+                long long offset,
+                void *opaque);
+
 int
 virshDomainGetXMLFromDom(vshControl *ctl,
                          virDomainPtr dom,
diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c
index 66fe70ea7..3d19b745e 100644
--- a/tools/virsh-volume.c
+++ b/tools/virsh-volume.c
@@ -763,6 +763,10 @@ static const vshCmdOptDef opts_vol_download[] = {
      .type = VSH_OT_INT,
      .help = N_("amount of data to download")
     },
+    {.name = "sparse",
+     .type = VSH_OT_BOOL,
+     .help = N_("preserve sparseness of volume")
+    },
     {.name = NULL}
 };
 
@@ -778,6 +782,7 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
     unsigned long long offset = 0, length = 0;
     bool created = false;
     virshControlPtr priv = ctl->privData;
+    unsigned int flags = 0;
 
     if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0)
         return false;
@@ -791,6 +796,9 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
     if (vshCommandOptStringReq(ctl, cmd, "file", &file) < 0)
         goto cleanup;
 
+    if (vshCommandOptBool(cmd, "sparse"))
+        flags |= VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM;
+
     if ((fd = open(file, O_WRONLY|O_CREAT|O_EXCL, 0666)) < 0) {
         if (errno != EEXIST ||
             (fd = open(file, O_WRONLY|O_TRUNC, 0666)) < 0) {
@@ -806,12 +814,12 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
         goto cleanup;
     }
 
-    if (virStorageVolDownload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolDownload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot download from volume %s"), name);
         goto cleanup;
     }
 
-    if (virStreamRecvAll(st, virshStreamSink, &fd) < 0) {
+    if (virStreamSparseRecvAll(st, virshStreamSink, virshStreamSkip, &fd) < 0) {
         vshError(ctl, _("cannot receive data from volume %s"), name);
         goto cleanup;
     }
diff --git a/tools/virsh.pod b/tools/virsh.pod
index 727acf6e6..dcaa0c170 100644
--- a/tools/virsh.pod
+++ b/tools/virsh.pod
@@ -3942,12 +3942,13 @@ regarding possible target volume and pool changes as a result of the
 pool refresh when the upload is attempted.
 
 =item B<vol-download> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>]
-[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file>
+[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file>
 
 Download the contents of a storage volume to I<local-file>.
 I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume
 is in.
 I<vol-name-or-key-or-path> is the name or key or path of the volume to download.
+If I<--sparse> is specified, this command will preserve volume sparseness.
 I<--offset> is the position in the storage volume at which to start reading
 the data. The value must be 0 or larger. I<--length> is an upper bound
 of the amount of data to be downloaded. A negative value is interpreted as
-- 
2.13.0

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
The command grew new --sparse switch that does nothing more than
s/The command grew new/Add a new/
enables the sparse streams feature for this command. Among with the switch new helper function is introduced: virshStreamSkip(). This is the callback that is called whenever daemon sends us a hole. In the callback we reflect the hole in underlying file by seeking as many bytes as told.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 18 ++++++++++++++++++
 tools/virsh-util.h   |  5 +++++
 tools/virsh-volume.c | 12 ++++++++++--
 tools/virsh.pod      |  3 ++-
 4 files changed, 35 insertions(+), 3 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

Similarly to previous commit, implement sparse streams feature
for vol-upload. This is, however, slightly different approach,
because we must implement a function that will tell us whether
we are in a data section or in a hole. But there's no magic
hidden in here.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 tools/virsh-util.h   | 24 ++++++++++++++++++++++++
 tools/virsh-volume.c | 38 +++++++++++++++++++++++++-------------
 tools/virsh.pod      |  3 ++-
 4 files changed, 98 insertions(+), 14 deletions(-)

diff --git a/tools/virsh-util.c b/tools/virsh-util.c
index 198625bdb..44be3ad64 100644
--- a/tools/virsh-util.c
+++ b/tools/virsh-util.c
@@ -153,6 +153,35 @@ virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 
 
+int
+virshStreamSource(virStreamPtr st ATTRIBUTE_UNUSED,
+                  char *bytes,
+                  size_t nbytes,
+                  void *opaque)
+{
+    virshStreamCallbackDataPtr cbData = opaque;
+    int fd = cbData->fd;
+
+    return saferead(fd, bytes, nbytes);
+}
+
+
+int
+virshStreamSourceSkip(virStreamPtr st ATTRIBUTE_UNUSED,
+                      long long offset,
+                      void *opaque)
+{
+    virshStreamCallbackDataPtr cbData = opaque;
+    int fd = cbData->fd;
+    off_t cur;
+
+    if ((cur = lseek(fd, offset, SEEK_CUR)) == (off_t) -1)
+        return -1;
+
+    return 0;
+}
+
+
 int
 virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
                 long long offset,
@@ -171,6 +200,24 @@ virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 
 
+int
+virshStreamInData(virStreamPtr st ATTRIBUTE_UNUSED,
+                  int *inData,
+                  long long *offset,
+                  void *opaque)
+{
+    virshStreamCallbackDataPtr cbData = opaque;
+    vshControl *ctl = cbData->ctl;
+    int fd = cbData->fd;
+    int ret;
+
+    if ((ret = virFileInData(fd, inData, offset)) < 0)
+        vshError(ctl, "%s", _("Unable to get current position in stream"));
+
+    return ret;
+}
+
+
 void
 virshDomainFree(virDomainPtr dom)
 {
diff --git a/tools/virsh-util.h b/tools/virsh-util.h
index 0aba247f6..9a0af3513 100644
--- a/tools/virsh-util.h
+++ b/tools/virsh-util.h
@@ -57,11 +57,35 @@ virshStreamSink(virStreamPtr st,
                 size_t nbytes,
                 void *opaque);
 
+typedef struct _virshStreamCallbackData virshStreamCallbackData;
+typedef virshStreamCallbackData *virshStreamCallbackDataPtr;
+struct _virshStreamCallbackData {
+    vshControl *ctl;
+    int fd;
+};
+
+int
+virshStreamSource(virStreamPtr st,
+                  char *bytes,
+                  size_t nbytes,
+                  void *opaque);
+
+int
+virshStreamSourceSkip(virStreamPtr st,
+                      long long offset,
+                      void *opaque);
+
 int
 virshStreamSkip(virStreamPtr st,
                 long long offset,
                 void *opaque);
 
+int
+virshStreamInData(virStreamPtr st,
+                  int *inData,
+                  long long *offset,
+                  void *opaque);
+
 int
 virshDomainGetXMLFromDom(vshControl *ctl,
                          virDomainPtr dom,
diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c
index 3d19b745e..0736bdcdb 100644
--- a/tools/virsh-volume.c
+++ b/tools/virsh-volume.c
@@ -660,18 +660,13 @@ static const vshCmdOptDef opts_vol_upload[] = {
      .type = VSH_OT_INT,
      .help = N_("amount of data to upload")
     },
+    {.name = "sparse",
+     .type = VSH_OT_BOOL,
+     .help = N_("preserve sparseness of volume")
+    },
     {.name = NULL}
 };
 
-static int
-cmdVolUploadSource(virStreamPtr st ATTRIBUTE_UNUSED,
-                   char *bytes, size_t nbytes, void *opaque)
-{
-    int *fd = opaque;
-
-    return saferead(*fd, bytes, nbytes);
-}
-
 static bool
 cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
 {
@@ -683,6 +678,8 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
     const char *name = NULL;
     unsigned long long offset = 0, length = 0;
     virshControlPtr priv = ctl->privData;
+    unsigned int flags = 0;
+    virshStreamCallbackData cbData;
 
     if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0)
         return false;
@@ -701,19 +698,34 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
         goto cleanup;
     }
 
+    cbData.ctl = ctl;
+    cbData.fd = fd;
+
+    if (vshCommandOptBool(cmd, "sparse"))
+        flags |= VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM;
+
     if (!(st = virStreamNew(priv->conn, 0))) {
         vshError(ctl, _("cannot create a new stream"));
         goto cleanup;
     }
 
-    if (virStorageVolUpload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolUpload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot upload to volume %s"), name);
         goto cleanup;
     }
 
-    if (virStreamSendAll(st, cmdVolUploadSource, &fd) < 0) {
-        vshError(ctl, _("cannot send data to volume %s"), name);
-        goto cleanup;
+    if (flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM) {
+        if (virStreamSparseSendAll(st, virshStreamSource,
+                                   virshStreamInData,
+                                   virshStreamSourceSkip, &cbData) < 0) {
+            vshError(ctl, _("cannot send data to volume %s"), name);
+            goto cleanup;
+        }
+    } else {
+        if (virStreamSendAll(st, virshStreamSource, &cbData) < 0) {
+            vshError(ctl, _("cannot send data to volume %s"), name);
+            goto cleanup;
+        }
     }
 
     if (VIR_CLOSE(fd) < 0) {
diff --git a/tools/virsh.pod b/tools/virsh.pod
index dcaa0c170..42769170d 100644
--- a/tools/virsh.pod
+++ b/tools/virsh.pod
@@ -3924,13 +3924,14 @@ the storage volume should be deleted as well. Not all storage drivers
 support this option, presently only rbd.
 
 =item B<vol-upload> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>]
-[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file>
+[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file>
 
 Upload the contents of I<local-file> to a storage volume.
 I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume
 is in.
 I<vol-name-or-key-or-path> is the name or key or path of the volume where the
 file will be uploaded.
+If I<--sparse> is specified, this command will preserve volume sparseness.
 I<--offset> is the position in the storage volume at which to start writing
 the data. The value must be 0 or larger. I<--length> is an upper bound
 of the amount of data to be uploaded. A negative value is interpreted
-- 
2.13.0
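
The upload side needs a function that reports whether the current file position sits in data or in a hole, and how long that section is; the patch delegates this to virFileInData(). The sketch below shows one way such a check can be written on Linux with lseek() and SEEK_DATA/SEEK_HOLE. It is an illustration under stated assumptions, not the series' implementation: in_data() is a hypothetical name, and on filesystems without hole reporting the kernel treats the whole file as data.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE            /* for SEEK_DATA / SEEK_HOLE on glibc */
#endif
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: report whether the current position of @fd is
 * in a data section (*inData = 1) or in a hole (*inData = 0), and how
 * many bytes remain in that section (*length). At EOF it reports a
 * hole of length 0. The file position is restored before returning.
 * Returns 0 on success, -1 on error.
 */
static int
in_data(int fd, int *inData, long long *length)
{
    off_t cur, data, hole, end;
    int ret = -1;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        /* ENXIO: no data at or after cur (EOF or trailing hole). */
        if (errno != ENXIO)
            goto cleanup;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto cleanup;
        *inData = 0;
        *length = end - cur;
    } else if (data > cur) {
        /* In a hole; data resumes at 'data'. */
        *inData = 0;
        *length = data - cur;
    } else {
        /* In data; the section ends at the next hole or at EOF. */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            goto cleanup;
        *inData = 1;
        *length = hole - cur;
    }

    ret = 0;
 cleanup:
    /* Put the file position back where the caller left it. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        ret = -1;
    return ret;
}
```

A sender loop built on this would call in_data() first, then either read and send up to *length bytes of data or announce a hole of *length bytes and seek past it, which matches the three callbacks handed to virStreamSparseSendAll() in the patch.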

On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Similarly to previous commit, implement sparse streams feature for vol-upload. This is, however, slightly different approach, because we must implement a function that will tell us whether we are in a data section or in a hole. But there's no magic hidden in here.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 tools/virsh-util.h   | 24 ++++++++++++++++++++++++
 tools/virsh-volume.c | 38 +++++++++++++++++++++++++-------------
 tools/virsh.pod      |  3 ++-
 4 files changed, 98 insertions(+), 14 deletions(-)
Reviewed-by: John Ferlan <jferlan@redhat.com> John

On 05/16/2017 10:03 AM, Michal Privoznik wrote:

Don't forget the 32/31 patch which updates the news file...

John
participants (2)
- John Ferlan
- Michal Privoznik