[libvirt] [PATCH 00/38] Implement sparse streams for libvirt

There have already been some attempts to do this in the past, but none of
them was successful. Anyway, let me just show you how well these perform:

7.9G -rw-r--r-- 1 root root 21G Feb 15 14:04 /var/lib/libvirt/images/gentoo.qcow2

libvirt.git # time virsh vol-download --sparse /var/lib/libvirt/images/gentoo.qcow2 /mnt/floppy/gentoo.qcow2

real    0m41.627s
user    0m3.880s
sys     0m5.720s

libvirt.git # time virsh vol-download /var/lib/libvirt/images/gentoo.qcow2 /mnt/floppy/gentoo.qcow2

real    2m22.357s
user    1m20.590s
sys     0m12.510s

That is, for a 21G image with only 7.9G actually allocated, the download
takes about 42 seconds with --sparse and about 2 minutes 22 seconds without.

All the patches can be found on my github:

https://github.com/zippy2/libvirt/tree/sparse_iohelper5

(Yes, weird name, but there were plenty of local attempts to implement
this and I'm not throwing them away until the feature is merged.)

Michal Privoznik (38):
  fdstreamtest: Rename tempdir
  fdstreamtest: Print more info on read failure
  fdstream: s/struct virFDStreamData */virFDStreamDataPtr/
  virFDStreamData: Turn into virObjectLockable
  virfdstream: Drop iohelper in favour of a thread
  virfdstream: Use messages instead of pipe
  iohelper: Remove unused mode
  util: Introduce virFileInData
  Introduce virStreamRecvFlags
  Implement virStreamRecvFlags to some drivers
  Introduce virStreamSkip
  Introduce virStreamHoleSize
  Introduce VIR_STREAM_RECV_STOP_AT_HOLE flag
  Introduce virStreamSparseRecvAll
  Introduce virStreamSparseSendAll
  Introduce virStreamInData
  virNetClientStreamNew: Track origin stream
  Track if stream is skippable
  RPC: Introduce virNetStreamSkip
  Introduce VIR_NET_STREAM_SKIP message type
  Teach wireshark plugin about VIR_NET_STREAM_SKIP
  daemon: Introduce virNetServerProgramSendStreamSkip
  virnetclientstream: Introduce virNetClientStreamSendSkip
  daemon: Implement VIR_NET_STREAM_SKIP handling
  virnetclientstream: Introduce virNetClientStreamHandleSkip
  remote_driver: Implement virStreamSkip
  virNetClientStreamRecvPacket: Introduce @flags argument
  Introduce virNetClientStreamHoleSize
  remote: Implement virStreamHoleSize
  virNetClientStream: Wire up VIR_NET_STREAM_SKIP
  remote_driver: Implement VIR_STREAM_RECV_STOP_AT_HOLE
  daemonStreamHandleRead: Wire up seekable stream
  daemon: Don't call virStreamInData so often
  fdstream: Implement sparse stream
  gendispatch: Introduce @sparseflag for our calls
  Introduce virStorageVol{Download,Upload}Flags
  virsh: Implement sparse stream to vol-download
  virsh: Implement sparse stream to vol-upload

 daemon/remote.c                      |   2 +-
 daemon/stream.c                      | 147 +++++-
 daemon/stream.h                      |   3 +-
 include/libvirt/libvirt-storage.h    |   9 +
 include/libvirt/libvirt-stream.h     |  86 +++-
 src/driver-stream.h                  |  23 +
 src/esx/esx_stream.c                 |  16 +-
 src/libvirt-storage.c                |   4 +-
 src/libvirt-stream.c                 | 453 ++++++++++++++++++
 src/libvirt_internal.h               |   3 +
 src/libvirt_private.syms             |   2 +
 src/libvirt_public.syms              |   9 +
 src/libvirt_remote.syms              |   3 +
 src/remote/remote_driver.c           |  89 +++-
 src/remote/remote_protocol.x         |   2 +
 src/rpc/gendispatch.pl               |  21 +-
 src/rpc/virnetclient.c               |   1 +
 src/rpc/virnetclientstream.c         | 202 +++++++-
 src/rpc/virnetclientstream.h         |  17 +-
 src/rpc/virnetprotocol.x             |  16 +-
 src/rpc/virnetserverprogram.c        |  33 ++
 src/rpc/virnetserverprogram.h        |   7 +
 src/storage/storage_driver.c         |   4 +-
 src/storage/storage_util.c           |  10 +-
 src/util/iohelper.c                  |  72 +--
 src/util/virfdstream.c               | 888 ++++++++++++++++++++++++++++-------
 src/util/virfdstream.h               |   2 +-
 src/util/virfile.c                   |  81 ++++
 src/util/virfile.h                   |   3 +
 src/virnetprotocol-structs           |   4 +
 tests/fdstreamtest.c                 |  12 +-
 tests/virfiletest.c                  | 203 ++++++++
 tools/virsh-util.c                   |  49 ++
 tools/virsh-util.h                   |  24 +
 tools/virsh-volume.c                 |  49 +-
 tools/virsh.pod                      |   6 +-
 tools/wireshark/src/packet-libvirt.c |  48 ++
 tools/wireshark/src/packet-libvirt.h |   2 +
 38 files changed, 2284 insertions(+), 321 deletions(-)

-- 
2.10.2
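The speed-up above comes from not reading and transferring the unallocated parts of the image. As a hedged illustration of the underlying idea (not the code from these patches; virFileInData may work differently), here is a minimal Linux-only sketch that uses lseek(2) with SEEK_DATA and SEEK_HOLE to tell whether an offset lies in data or in a hole, and how long that section is. The helpers in_data() and print_layout() are hypothetical names chosen for this example.

```c
#define _GNU_SOURCE /* exposes SEEK_DATA and SEEK_HOLE in <unistd.h> */
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper, not libvirt API: report whether offset @cur of @fd
 * lies in a data section or in a hole, and how many bytes are left in
 * that section. Like any lseek()-based probe, it moves the file offset.
 * Returns 0 on success, -1 on error (errno set). */
static int
in_data(int fd, off_t cur, int *inData, off_t *length)
{
    off_t end, data, hole;

    assert(inData && length);

    if ((end = lseek(fd, 0, SEEK_END)) < 0)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data < 0) {
        if (errno != ENXIO)
            return -1;
        /* ENXIO: no data at or after @cur, i.e. a trailing hole or EOF */
        *inData = 0;
        *length = end > cur ? end - cur : 0;
    } else if (data > cur) {
        /* The next data section starts later, so @cur is inside a hole */
        *inData = 0;
        *length = data - cur;
    } else {
        /* @cur is inside data, which extends up to the next hole or EOF */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) < 0)
            return -1;
        *inData = 1;
        *length = hole - cur;
    }
    return 0;
}

/* Walk the whole file and print its layout, the way a sparse-aware
 * sender could choose between "send these bytes" and "skip a hole". */
static void
print_layout(int fd)
{
    off_t cur = 0, length;
    int inData;

    while (in_data(fd, cur, &inData, &length) == 0 && length > 0) {
        printf("%s offset=%lld len=%lld\n", inData ? "data" : "hole",
               (long long) cur, (long long) length);
        cur += length;
    }
}
```

On filesystems without hole support, SEEK_HOLE simply reports end of file, so the whole file is treated as one data section; a sender built this way still works, it just cannot skip anything.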

[libvirt] [PATCH 01/38] fdstreamtest: Rename tempdir

Because of copy-paste, the temporary directory used for this test is
called "fakesysfsdir". That's probably misleading.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tests/fdstreamtest.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/fdstreamtest.c b/tests/fdstreamtest.c
index 4605845..5e82dac 100644
--- a/tests/fdstreamtest.c
+++ b/tests/fdstreamtest.c
@@ -314,7 +314,7 @@ static int testFDStreamWriteNonblock(const void *data)
     return testFDStreamWriteCommon(data, false);
 }
 
-#define SCRATCHDIRTEMPLATE abs_builddir "/fakesysfsdir-XXXXXX"
+#define SCRATCHDIRTEMPLATE abs_builddir "/fdstreamdir-XXXXXX"
 
 static int
 mymain(void)
@@ -323,7 +323,7 @@ mymain(void)
     int ret = 0;
 
     if (!mkdtemp(scratchdir)) {
-        virFilePrintf(stderr, "Cannot create fakesysfsdir");
+        virFilePrintf(stderr, "Cannot create fdstreamdir");
         abort();
     }
 
-- 
2.10.2

[libvirt] [PATCH 02/38] fdstreamtest: Print more info on read failure

It helps with debugging if we know what saferead() returned.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tests/fdstreamtest.c | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/fdstreamtest.c b/tests/fdstreamtest.c
index 5e82dac..68a5bc3 100644
--- a/tests/fdstreamtest.c
+++ b/tests/fdstreamtest.c
@@ -250,14 +250,16 @@ static int testFDStreamWriteCommon(const char *scratchdir, bool blocking)
         goto cleanup;
 
     for (i = 0; i < 10; i++) {
-        size_t want;
+        size_t want, got;
         if (i == 9)
             want = PATTERN_LEN / 2;
         else
             want = PATTERN_LEN;
 
-        if (saferead(fd, buf, want) != want) {
-            virFilePrintf(stderr, "Short read from data\n");
+        if ((got = saferead(fd, buf, want)) != want) {
+            virFilePrintf(stderr,
+                          "Short read from data, i=%zu got=%zu want=%zu\n",
+                          i, got, want);
             goto cleanup;
         }
 
-- 
2.10.2
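Assuming saferead() behaves like the usual "read until count, EOF or error" helper (retrying on EINTR), a short count means end of file and -1 means an error, so printing got next to want tells the two apart. A minimal sketch of such a loop follows; read_full() is a hypothetical stand-in, not libvirt's saferead() itself. Note also that the test stores the result in a size_t, so an error return of -1 would be printed by %zu as SIZE_MAX rather than as -1.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for saferead(): keep calling read() until @count
 * bytes have been read, EOF is hit, or a real error occurs. EINTR is
 * retried. Returns the number of bytes read (short only at EOF), or -1. */
static ssize_t
read_full(int fd, void *buf, size_t count)
{
    size_t nread = 0;

    assert(buf != NULL || count == 0);

    while (count > 0) {
        ssize_t r = read(fd, buf, count);

        if (r < 0 && errno == EINTR)
            continue;       /* interrupted by a signal, just retry */
        if (r < 0)
            return -1;      /* genuine error, errno is set */
        if (r == 0)
            break;          /* EOF: return what we have so far */

        buf = (char *) buf + r;
        count -= r;
        nread += r;
    }
    return nread;
}
```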

[libvirt] [PATCH 03/38] fdstream: s/struct virFDStreamData */virFDStreamDataPtr/

There is really no reason why we should have to spell out 'struct'
everywhere.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 75b69b6..e6670c5 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -50,6 +50,8 @@ VIR_LOG_INIT("fdstream");
 
 
 /* Tunnelled migration stream support */
+typedef struct virFDStreamData virFDStreamData;
+typedef virFDStreamData *virFDStreamDataPtr;
 struct virFDStreamData {
     int fd;
     int errfd;
@@ -82,7 +84,7 @@ struct virFDStreamData {
 
 static int virFDStreamRemoveCallback(virStreamPtr stream)
 {
-    struct virFDStreamData *fdst = stream->privateData;
+    virFDStreamDataPtr fdst = stream->privateData;
     int ret = -1;
 
     if (!fdst) {
@@ -119,7 +121,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
 
 static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
 {
-    struct virFDStreamData *fdst = stream->privateData;
+    virFDStreamDataPtr fdst = stream->privateData;
     int ret = -1;
 
     if (!fdst) {
@@ -151,7 +153,7 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
                              void *opaque)
 {
     virStreamPtr stream = opaque;
-    struct virFDStreamData *fdst = stream->privateData;
+    virFDStreamDataPtr fdst = stream->privateData;
     virStreamEventCallback cb;
     void *cbopaque;
     virFreeCallback ff;
@@ -200,7 +202,7 @@ virFDStreamAddCallback(virStreamPtr st,
                        void *opaque,
                        virFreeCallback ff)
 {
-    struct virFDStreamData *fdst = st->privateData;
+    virFDStreamDataPtr fdst = st->privateData;
     int ret = -1;
 
     if (!fdst) {
@@ -242,7 +244,7 @@ virFDStreamAddCallback(virStreamPtr st,
 }
 
 static int
-virFDStreamCloseCommand(struct virFDStreamData *fdst, bool streamAbort)
+virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
 {
     char buf[1024];
     ssize_t len;
@@ -295,7 +297,7 @@ virFDStreamCloseCommand(struct virFDStreamData *fdst, bool streamAbort)
 static int
 virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 {
-    struct virFDStreamData *fdst;
+    virFDStreamDataPtr fdst;
     virStreamEventCallback cb;
     void *opaque;
     int ret;
@@ -378,7 +380,7 @@ virFDStreamAbort(virStreamPtr st)
 
 static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 {
-    struct virFDStreamData *fdst = st->privateData;
+    virFDStreamDataPtr fdst = st->privateData;
     int ret;
 
     if (nbytes > INT_MAX) {
@@ -432,7 +434,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 
 static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 {
-    struct virFDStreamData *fdst = st->privateData;
+    virFDStreamDataPtr fdst = st->privateData;
     int ret;
 
     if (nbytes > INT_MAX) {
@@ -498,7 +500,7 @@ static int virFDStreamOpenInternal(virStreamPtr st,
                                    int errfd,
                                    unsigned long long length)
 {
-    struct virFDStreamData *fdst;
+    virFDStreamDataPtr fdst;
 
     VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
               st, fd, cmd, errfd, length);
@@ -754,7 +756,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
                        unsigned long long length,
                        int oflags)
 {
-    struct virFDStreamData *fdst = NULL;
+    virFDStreamDataPtr fdst = NULL;
     struct termios rawattr;
 
     if (virFDStreamOpenFileInternal(st, path,
@@ -817,7 +819,7 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st,
                                   void *opaque,
                                   virFDStreamInternalCloseCbFreeOpaque fcb)
 {
-    struct virFDStreamData *fdst = st->privateData;
+    virFDStreamDataPtr fdst = st->privateData;
 
     virMutexLock(&fdst->lock);
 
-- 
2.10.2

[libvirt] [PATCH 04/38] virFDStreamData: Turn into virObjectLockable

While this is no functional change, it makes the code look a bit
nicer. Moreover, it prepares the ground for future work.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 97 +++++++++++++++++++++++++++++---------------------
 1 file changed, 57 insertions(+), 40 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index e6670c5..9a4a7ff 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -53,6 +53,8 @@ VIR_LOG_INIT("fdstream");
 typedef struct virFDStreamData virFDStreamData;
 typedef virFDStreamData *virFDStreamDataPtr;
 struct virFDStreamData {
+    virObjectLockable parent;
+
     int fd;
     int errfd;
     virCommandPtr cmd;
@@ -77,10 +79,31 @@ struct virFDStreamData {
     virFDStreamInternalCloseCb icbCb;
     virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
     void *icbOpaque;
-
-    virMutex lock;
 };
 
+static virClassPtr virFDStreamDataClass;
+
+static void
+virFDStreamDataDispose(void *obj)
+{
+    virFDStreamDataPtr fdst = obj;
+
+    VIR_DEBUG("obj=%p", fdst);
+}
+
+static int virFDStreamDataOnceInit(void)
+{
+    if (!(virFDStreamDataClass = virClassNew(virClassForObjectLockable(),
+                                             "virFDStreamData",
+                                             sizeof(virFDStreamData),
+                                             virFDStreamDataDispose)))
+        return -1;
+
+    return 0;
+}
+
+VIR_ONCE_GLOBAL_INIT(virFDStreamData)
+
 
 static int virFDStreamRemoveCallback(virStreamPtr stream)
 {
@@ -93,7 +116,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
         return -1;
     }
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
     if (fdst->watch == 0) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("stream does not have a callback registered"));
@@ -115,7 +138,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
     ret = 0;
 
  cleanup:
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return ret;
 }
 
@@ -130,7 +153,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
         return -1;
     }
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
     if (fdst->watch == 0) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("stream does not have a callback registered"));
@@ -143,7 +166,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
     ret = 0;
 
  cleanup:
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return ret;
 }
 
@@ -162,9 +185,9 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
     if (!fdst)
         return;
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
     if (!fdst->cb) {
-        virMutexUnlock(&fdst->lock);
+        virObjectUnlock(fdst);
         return;
     }
 
@@ -172,21 +195,19 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
     cbopaque = fdst->opaque;
     ff = fdst->ff;
     fdst->dispatching = true;
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
 
     cb(stream, events, cbopaque);
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
     fdst->dispatching = false;
     if (fdst->cbRemoved && ff)
         (ff)(cbopaque);
     closed = fdst->closed;
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
 
-    if (closed) {
-        virMutexDestroy(&fdst->lock);
-        VIR_FREE(fdst);
-    }
+    if (closed)
+        virObjectUnref(fdst);
 }
 
 static void virFDStreamCallbackFree(void *opaque)
@@ -211,7 +232,7 @@ virFDStreamAddCallback(virStreamPtr st,
         return -1;
     }
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
     if (fdst->watch != 0) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                        _("stream already has a callback registered"));
@@ -239,7 +260,7 @@ virFDStreamAddCallback(virStreamPtr st,
     ret = 0;
 
  cleanup:
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return ret;
 }
 
@@ -307,7 +328,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
     if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching)
         return 0;
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
 
     /* aborting the stream, ensure the callback is called if it's
      * registered for stream error event */
@@ -317,7 +338,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
                          VIR_STREAM_EVENT_WRITABLE))) {
         /* don't enter this function accidentally from the callback again */
         if (fdst->abortCallbackCalled) {
-            virMutexUnlock(&fdst->lock);
+            virObjectUnlock(fdst);
             return 0;
         }
 
@@ -327,12 +348,12 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
         /* cache the pointers */
         cb = fdst->cb;
         opaque = fdst->opaque;
-        virMutexUnlock(&fdst->lock);
+        virObjectUnlock(fdst);
 
         /* call failure callback, poll reports nothing on closed fd */
         (cb)(st, VIR_STREAM_EVENT_ERROR, opaque);
 
-        virMutexLock(&fdst->lock);
+        virObjectLock(fdst);
         fdst->abortCallbackDispatching = false;
     }
 
@@ -356,11 +377,10 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 
     if (fdst->dispatching) {
         fdst->closed = true;
-        virMutexUnlock(&fdst->lock);
+        virObjectUnlock(fdst);
     } else {
-        virMutexUnlock(&fdst->lock);
-        virMutexDestroy(&fdst->lock);
-        VIR_FREE(fdst);
+        virObjectUnlock(fdst);
+        virObjectUnref(fdst);
     }
 
     return ret;
@@ -395,13 +415,13 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
         return -1;
     }
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
 
     if (fdst->length) {
         if (fdst->length == fdst->offset) {
             virReportSystemError(ENOSPC, "%s",
                                  _("cannot write to stream"));
-            virMutexUnlock(&fdst->lock);
+            virObjectUnlock(fdst);
             return -1;
         }
 
@@ -427,7 +447,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
         fdst->offset += ret;
     }
 
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return ret;
 }
 
@@ -449,11 +469,11 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
         return -1;
     }
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
 
     if (fdst->length) {
         if (fdst->length == fdst->offset) {
-            virMutexUnlock(&fdst->lock);
+            virObjectUnlock(fdst);
             return 0;
         }
 
@@ -479,7 +499,7 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
         fdst->offset += ret;
     }
 
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return ret;
 }
 
@@ -505,25 +525,22 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
               st, fd, cmd, errfd, length);
 
+    if (virFDStreamDataInitialize() < 0)
+        return -1;
+
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         virSetNonBlock(fd) < 0) {
         virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
         return -1;
     }
 
-    if (VIR_ALLOC(fdst) < 0)
+    if (!(fdst = virObjectLockableNew(virFDStreamDataClass)))
         return -1;
 
     fdst->fd = fd;
     fdst->cmd = cmd;
     fdst->errfd = errfd;
     fdst->length = length;
-    if (virMutexInit(&fdst->lock) < 0) {
-        VIR_FREE(fdst);
-        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                       _("Unable to initialize mutex"));
-        return -1;
-    }
 
     st->driver = &virFDStreamDrv;
     st->privateData = fdst;
@@ -821,7 +838,7 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st,
 {
     virFDStreamDataPtr fdst = st->privateData;
 
-    virMutexLock(&fdst->lock);
+    virObjectLock(fdst);
 
     if (fdst->icbFreeOpaque)
         (fdst->icbFreeOpaque)(fdst->icbOpaque);
@@ -830,6 +847,6 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st,
     fdst->icbOpaque = opaque;
     fdst->icbFreeOpaque = fcb;
 
-    virMutexUnlock(&fdst->lock);
+    virObjectUnlock(fdst);
     return 0;
 }
-- 
2.10.2
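With virObjectLockable, the stream data carries its own mutex and reference count, and it is released by the dispose callback when the last reference goes away instead of by explicit virMutexDestroy() and VIR_FREE() calls. The same pattern can be sketched with plain POSIX threads and C11 atomics; the fdstream_data type and its helpers below are hypothetical and do not reproduce virObject's implementation. As with the way these patches use virObjectUnref(), the unref helper returns true while references remain and false once the object has been freed.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical reference-counted object with an embedded lock, in the
 * spirit of virObjectLockable. Not libvirt code. */
typedef struct fdstream_data fdstream_data;
struct fdstream_data {
    pthread_mutex_t lock;   /* protects the payload below */
    atomic_int refs;        /* number of owners of this object */
    int fd;                 /* example payload */
};

static int disposed;        /* counts frees, for demonstration only */

static fdstream_data *
fdstream_data_new(int fd)
{
    fdstream_data *obj = calloc(1, sizeof(*obj));

    if (!obj)
        return NULL;
    if (pthread_mutex_init(&obj->lock, NULL) != 0) {
        free(obj);
        return NULL;
    }
    atomic_init(&obj->refs, 1);   /* the creator holds the first reference */
    obj->fd = fd;
    return obj;
}

static fdstream_data *
fdstream_data_ref(fdstream_data *obj)
{
    atomic_fetch_add(&obj->refs, 1);
    return obj;
}

/* Drop one reference. Returns true if the object is still alive,
 * false if this call released the last reference and freed it. */
static bool
fdstream_data_unref(fdstream_data *obj)
{
    if (!obj)
        return false;
    if (atomic_fetch_sub(&obj->refs, 1) > 1)
        return true;

    /* Last reference: dispose of the payload, then the object itself */
    disposed++;
    pthread_mutex_destroy(&obj->lock);
    free(obj);
    return false;
}

static void
fdstream_data_lock(fdstream_data *obj)
{
    pthread_mutex_lock(&obj->lock);
}

static void
fdstream_data_unlock(fdstream_data *obj)
{
    pthread_mutex_unlock(&obj->lock);
}
```

The design point is that nobody has to agree on who frees the structure: every user that needs it to stay alive takes a reference and drops it when done.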

[libvirt] [PATCH 05/38] virfdstream: Drop iohelper in favour of a thread

Currently we use iohelper for the virFDStream implementation. This is
because UNIX I/O can lie sometimes: even though an FD for a file or
block device is set as non-blocking, the actual read()/write() can
still block. To avoid this, a pipe is created: one end is kept for
reading/writing while the other is handed over to iohelper, which
writes/reads the data for us. Thus it is iohelper that gets blocked,
not our event loop.

This approach has two problems:

1) we are spawning a new process,
2) any exchange of information between the daemon and iohelper can be
   done only through the pipe.

Therefore, iohelper is replaced with an implementation running in a
thread that is created just for the lifetime of the stream. The data
are still transferred through a pipe (for now), but both problems
described above are solved.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 233 +++++++++++++++++++++++++++++++++------------------
 src/util/virfdstream.h |   1 -
 2 files changed, 150 insertions(+), 84 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 9a4a7ff..4efc65d 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -56,8 +56,6 @@ struct virFDStreamData {
     virObjectLockable parent;
 
     int fd;
-    int errfd;
-    virCommandPtr cmd;
 
     unsigned long long offset;
     unsigned long long length;
@@ -79,6 +77,11 @@ struct virFDStreamData {
     virFDStreamInternalCloseCb icbCb;
     virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
     void *icbOpaque;
+
+    /* Thread data */
+    virThreadPtr thread;
+    int threadErr;
+    bool threadQuit;
 };
 
 static virClassPtr virFDStreamDataClass;
@@ -264,57 +267,123 @@ virFDStreamAddCallback(virStreamPtr st,
     return ret;
 }
 
+
+typedef struct _virFDStreamThreadData virFDStreamThreadData;
+typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
+struct _virFDStreamThreadData {
+    virStreamPtr st;
+    size_t length;
+    int fdin;
+    char *fdinname;
+    int fdout;
+    char *fdoutname;
+};
+
+
+static void
+virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
+{
+    if (!data)
+        return;
+
+    virObjectUnref(data->st);
+    VIR_FREE(data->fdinname);
+    VIR_FREE(data->fdoutname);
+    VIR_FORCE_CLOSE(data->fdin);
+    VIR_FORCE_CLOSE(data->fdout);
+    VIR_FREE(data);
+}
+
+
+static void
+virFDStreamThread(void *opaque)
+{
+    virFDStreamThreadDataPtr data = opaque;
+    virStreamPtr st = data->st;
+    size_t length = data->length;
+    int fdin = data->fdin;
+    char *fdinname = data->fdinname;
+    int fdout = data->fdout;
+    char *fdoutname = data->fdoutname;
+    virFDStreamDataPtr fdst = st->privateData;
+    char *buf = NULL;
+    size_t buflen = 256 * 1024;
+    size_t total = 0;
+
+    virObjectRef(fdst);
+
+    if (VIR_ALLOC_N(buf, buflen) < 0)
+        goto error;
+
+    while (1) {
+        ssize_t got;
+
+        if (length &&
+            (length - total) < buflen)
+            buflen = length - total;
+
+        if (buflen == 0)
+            break; /* End of requested data from client */
+
+        if ((got = saferead(fdin, buf, buflen)) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to read %s"),
+                                 fdinname);
+            goto error;
+        }
+
+        if (got == 0)
+            break;
+
+        total += got;
+
+        if (safewrite(fdout, buf, got) < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            goto error;
+        }
+    }
+
+ cleanup:
+    if (!virObjectUnref(fdst))
+        st->privateData = NULL;
+    virFDStreamThreadDataFree(data);
+    VIR_FREE(buf);
+    return;
+
+ error:
+    virObjectLock(fdst);
+    fdst->threadErr = errno;
+    virObjectUnlock(fdst);
+    goto cleanup;
+}
+
+
 static int
-virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
+virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
 {
-    char buf[1024];
-    ssize_t len;
-    int status;
     int ret = -1;
-
-    if (!fdst->cmd)
+    if (!fdst->thread)
         return 0;
 
-    if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
-        buf[0] = '\0';
-    else
-        buf[len] = '\0';
+    /* Give the thread a chance to lock the FD stream object. */
+    virObjectUnlock(fdst);
+    virThreadJoin(fdst->thread);
+    virObjectLock(fdst);
 
-    virCommandRawStatus(fdst->cmd);
-    if (virCommandWait(fdst->cmd, &status) < 0)
-        goto cleanup;
-
-    if (status != 0) {
-        if (buf[0] != '\0') {
-            virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
-        } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
-            if (streamAbort) {
-                /* Explicit abort request means the caller doesn't care
-                   if there's data left over, so skip the error */
-                goto out;
-            }
-
-            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                           _("I/O helper exited "
-                             "before all data was processed"));
-        } else {
-            char *str = virProcessTranslateStatus(status);
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("I/O helper exited with %s"),
-                           NULLSTR(str));
-            VIR_FREE(str);
-        }
+    if (fdst->threadErr && !streamAbort) {
+        /* errors are expected on streamAbort */
         goto cleanup;
     }
 
- out:
     ret = 0;
  cleanup:
-    virCommandFree(fdst->cmd);
-    fdst->cmd = NULL;
+    VIR_FREE(fdst->thread);
     return ret;
 }
 
+
 static int
 virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 {
@@ -359,12 +428,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
 
     /* mutex locked */
     ret = VIR_CLOSE(fdst->fd);
-    if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
+    if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
         ret = -1;
 
-    if (VIR_CLOSE(fdst->errfd) < 0)
-        VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
-
     st->privateData = NULL;
 
     /* call the internal stream closing callback */
@@ -516,14 +582,13 @@ static virStreamDriver virFDStreamDrv = {
 
 static int virFDStreamOpenInternal(virStreamPtr st,
                                    int fd,
-                                   virCommandPtr cmd,
-                                   int errfd,
+                                   virFDStreamThreadDataPtr threadData,
                                    unsigned long long length)
 {
     virFDStreamDataPtr fdst;
 
-    VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
-              st, fd, cmd, errfd, length);
+    VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
+              st, fd, threadData, length);
 
     if (virFDStreamDataInitialize() < 0)
         return -1;
@@ -538,21 +603,39 @@ static int virFDStreamOpenInternal(virStreamPtr st,
         return -1;
 
     fdst->fd = fd;
-    fdst->cmd = cmd;
-    fdst->errfd = errfd;
     fdst->length = length;
 
     st->driver = &virFDStreamDrv;
     st->privateData = fdst;
 
+    if (threadData) {
+        /* Create the thread after fdst and st were initialized.
+         * The thread worker expects them to be that way. */
+        if (VIR_ALLOC(fdst->thread) < 0)
+            goto error;
+
+        if (virThreadCreate(fdst->thread,
+                            true,
+                            virFDStreamThread,
+                            threadData) < 0)
+            goto error;
+    }
+
     return 0;
+
+ error:
+    VIR_FREE(fdst->thread);
+    st->driver = NULL;
+    st->privateData = NULL;
+    virObjectUnref(fdst);
+    return -1;
 }
 
 
 int virFDStreamOpen(virStreamPtr st,
                     int fd)
 {
-    return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
+    return virFDStreamOpenInternal(st, fd, NULL, 0);
 }
 
 
@@ -598,7 +681,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
         goto error;
     }
 
-    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
+    if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
         goto error;
 
     return 0;
@@ -627,11 +710,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                             bool forceIOHelper)
 {
     int fd = -1;
-    int childfd = -1;
     struct stat sb;
-    virCommandPtr cmd = NULL;
-    int errfd = -1;
-    char *iohelper_path = NULL;
+    virFDStreamThreadDataPtr threadData = NULL;
 
     VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
               st, path, oflags, offset, length, mode);
@@ -687,52 +767,39 @@ virFDStreamOpenFileInternal(virStreamPtr st,
             goto error;
         }
 
-        if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
-                                                  abs_topbuilddir "/src",
-                                                  LIBEXECDIR)))
+        if (VIR_ALLOC(threadData) < 0)
             goto error;
 
-        cmd = virCommandNewArgList(iohelper_path,
-                                   path,
-                                   NULL);
-
-        VIR_FREE(iohelper_path);
-
-        virCommandAddArgFormat(cmd, "%llu", length);
-        virCommandPassFD(cmd, fd,
-                         VIR_COMMAND_PASS_FD_CLOSE_PARENT);
-        virCommandAddArgFormat(cmd, "%d", fd);
+        threadData->st = virObjectRef(st);
+        threadData->length = length;
 
         if ((oflags & O_ACCMODE) == O_RDONLY) {
-            childfd = fds[1];
+            threadData->fdin = fd;
+            threadData->fdout = fds[1];
+            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
+                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
+                goto error;
             fd = fds[0];
-            virCommandSetOutputFD(cmd, &childfd);
         } else {
-            childfd = fds[0];
+            threadData->fdin = fds[0];
+            threadData->fdout = fd;
+            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
+                VIR_STRDUP(threadData->fdoutname, path) < 0)
+                goto error;
             fd = fds[1];
-            virCommandSetInputFD(cmd, childfd);
         }
 
-        virCommandSetErrorFD(cmd, &errfd);
-
-        if (virCommandRunAsync(cmd, NULL) < 0)
-            goto error;
-
-        VIR_FORCE_CLOSE(childfd);
     }
 
-    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
+    if (virFDStreamOpenInternal(st, fd, threadData, length) < 0)
         goto error;
 
     return 0;
 
  error:
-    virCommandFree(cmd);
     VIR_FORCE_CLOSE(fd);
-    VIR_FORCE_CLOSE(childfd);
-    VIR_FORCE_CLOSE(errfd);
-    VIR_FREE(iohelper_path);
     if (oflags & O_CREAT)
         unlink(path);
+    virFDStreamThreadDataFree(threadData);
     return -1;
 }
 
diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
index 32a741e..34c4c3f 100644
--- a/src/util/virfdstream.h
+++ b/src/util/virfdstream.h
@@ -24,7 +24,6 @@
 # define __VIR_FDSTREAM_H_
 
 # include "internal.h"
-# include "vircommand.h"
 
 /* internal callback, the generic one is used up by daemon stream driver */
 /* the close callback is called with fdstream private data locked */
-- 
2.10.2
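virFDStreamThread() above is at heart a bounded copy loop: read up to buflen bytes from fdin, stop at EOF or once length bytes have been copied, write everything to fdout, and record errno on failure so that virFDStreamJoinWorker() can inspect it after virThreadJoin(). The same shape is sketched below with POSIX threads; copy_job, copy_worker(), write_all() and the 4 KiB buffer are assumptions made for this illustration, not libvirt code. Because the blocking read()/write() calls happen in the worker, only that thread stalls on slow I/O, which is the point of replacing iohelper.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical job description, loosely modelled on virFDStreamThreadData */
typedef struct {
    int fdin;
    int fdout;
    size_t length;   /* 0 means "copy until EOF" */
    size_t total;    /* bytes copied so far */
    int err;         /* errno of the first failure, 0 on success */
} copy_job;

/* Write all @len bytes, retrying on EINTR and short writes */
static int
write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t w = write(fd, buf, len);

        if (w < 0 && errno == EINTR)
            continue;
        if (w < 0)
            return -1;
        buf += w;
        len -= w;
    }
    return 0;
}

/* Thread body: copy from job->fdin to job->fdout */
static void *
copy_worker(void *opaque)
{
    copy_job *job = opaque;
    char buf[4096];

    for (;;) {
        size_t want = sizeof(buf);
        ssize_t got;

        /* Honour the requested length, like the buflen clamp in the patch */
        if (job->length && job->length - job->total < want)
            want = job->length - job->total;
        if (want == 0)
            break;

        got = read(job->fdin, buf, want);
        if (got < 0 && errno == EINTR)
            continue;
        if (got < 0) {
            job->err = errno;
            break;
        }
        if (got == 0)
            break; /* EOF */

        if (write_all(job->fdout, buf, got) < 0) {
            job->err = errno;
            break;
        }
        job->total += got;
    }
    return NULL;
}
```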

[libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe

One big downside of using a pipe to transfer the data is that we can
transfer only bare data. No metadata can be carried through unless
some formatted messages are introduced. That would be quite painful to
achieve, so let's use a message queue instead. It's fairly easy to
exchange information between threads now that iohelper is no longer
used.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 392 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 321 insertions(+), 71 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 4efc65d..efd9199 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@ VIR_LOG_INIT("fdstream");
 
 
 
+typedef enum {
+    VIR_FDSTREAM_MSG_TYPE_DATA,
+} virFDStreamMsgType;
+
+typedef struct _virFDStreamMsg virFDStreamMsg;
+typedef virFDStreamMsg *virFDStreamMsgPtr;
+struct _virFDStreamMsg {
+    virFDStreamMsgPtr next;
+
+    virFDStreamMsgType type;
+
+    union {
+        struct {
+            char *buf;
+            size_t len;
+            size_t offset;
+        } data;
+    } stream;
+};
+
+
 /* Tunnelled migration stream support */
 typedef struct virFDStreamData virFDStreamData;
 typedef virFDStreamData *virFDStreamDataPtr;
@@ -80,18 +101,25 @@ struct virFDStreamData {
 
     /* Thread data */
     virThreadPtr thread;
+    virCond threadCond;
     int threadErr;
     bool threadQuit;
+    bool threadAbort;
+    bool threadDoRead;
+    virFDStreamMsgPtr msg;
 };
 
 static virClassPtr virFDStreamDataClass;
 
+static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
+
 static void
 virFDStreamDataDispose(void *obj)
 {
     virFDStreamDataPtr fdst = obj;
 
     VIR_DEBUG("obj=%p", fdst);
+    virFDStreamMsgQueueFree(&fdst->msg);
 }
 
 static int virFDStreamDataOnceInit(void)
@@ -108,6 +136,66 @@ static int virFDStreamDataOnceInit(void)
 VIR_ONCE_GLOBAL_INIT(virFDStreamData)
 
 
+static void
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+                        virFDStreamMsgPtr msg)
+{
+    virFDStreamMsgPtr *tmp = &fdst->msg;
+
+    while (*tmp)
+        tmp = &(*tmp)->next;
+
+    *tmp = msg;
+    virCondSignal(&fdst->threadCond);
+}
+
+
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst)
+{
+    virFDStreamMsgPtr tmp = fdst->msg;
+
+    if (tmp) {
+        fdst->msg = tmp->next;
+        tmp->next = NULL;
+    }
+
+    virCondSignal(&fdst->threadCond);
+    return tmp;
+}
+
+
+static void
+virFDStreamMsgFree(virFDStreamMsgPtr msg)
+{
+    if (!msg)
+        return;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        VIR_FREE(msg->stream.data.buf);
+        break;
+    }
+
+    VIR_FREE(msg);
+}
+
+
+static void
+virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
+{
+    virFDStreamMsgPtr tmp = *queue;
+
+    while (tmp) {
+        virFDStreamMsgPtr next = tmp->next;
+        virFDStreamMsgFree(tmp);
+        tmp = next;
+    }
+
+    *queue = NULL;
+}
+
+
 static int virFDStreamRemoveCallback(virStreamPtr stream)
 {
     virFDStreamDataPtr fdst = stream->privateData;
@@ -289,12 +377,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
     virObjectUnref(data->st);
     VIR_FREE(data->fdinname);
     VIR_FREE(data->fdoutname);
-    VIR_FORCE_CLOSE(data->fdin);
-    VIR_FORCE_CLOSE(data->fdout);
     VIR_FREE(data);
 }
 
 
+static ssize_t
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+                        const int fdin,
+                        const char *fdinname,
+                        size_t buflen)
+{
+    virFDStreamMsgPtr msg = NULL;
+    char *buf = NULL;
+    ssize_t got;
+
+    if (VIR_ALLOC(msg) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(buf, buflen) < 0)
+        goto error;
+
+    if ((got = saferead(fdin, buf, buflen)) < 0) {
+        virReportSystemError(errno,
+                             _("Unable to read %s"),
+                             fdinname);
+        goto error;
+    }
+
+    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+    msg->stream.data.buf = buf;
+    msg->stream.data.len = got;
+    buf = NULL;
+
+    virFDStreamMsgQueuePush(fdst, msg);
+    msg = NULL;
+
+    return got;
+
+ error:
+    VIR_FREE(buf);
+    virFDStreamMsgFree(msg);
+    return -1;
+}
+
+
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         const int fdout,
+                         const char *fdoutname)
+{
+    ssize_t got;
+    virFDStreamMsgPtr msg = fdst->msg;
+    bool pop = false;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        got = safewrite(fdout,
+                        msg->stream.data.buf + msg->stream.data.offset,
+                        msg->stream.data.len - msg->stream.data.offset);
+        if (got < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        msg->stream.data.offset += got;
+
+        pop = msg->stream.data.offset == msg->stream.data.len;
+        break;
+    }
+
+    if (pop) {
+        virFDStreamMsgQueuePop(fdst);
+        virFDStreamMsgFree(msg);
+    }
+
+    return got;
+}
+
+
 static void
 virFDStreamThread(void *opaque)
 {
@@ -306,14 +468,12 @@ virFDStreamThread(void *opaque)
     int fdout = data->fdout;
     char *fdoutname = data->fdoutname;
     virFDStreamDataPtr fdst = st->privateData;
-    char *buf = NULL;
+    bool doRead = fdst->threadDoRead;
     size_t buflen = 256 * 1024;
     size_t total = 0;
 
     virObjectRef(fdst);
-
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
+    virObjectLock(fdst);
 
     while (1) {
         ssize_t got;
@@ -325,37 +485,49 @@ virFDStreamThread(void *opaque)
         if (buflen == 0)
             break; /* End of requested data from client */
 
-        if ((got = saferead(fdin, buf, buflen)) < 0) {
-            virReportSystemError(errno,
-                                 _("Unable to read %s"),
-                                 fdinname);
+        while (doRead == (fdst->msg != NULL) &&
+               !fdst->threadQuit) {
+            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
+                virReportSystemError(errno, "%s",
+                                     _("failed to wait on condition"));
+                goto error;
+            }
+        }
+
+        if (fdst->threadQuit) {
+            /* If stream abort was requested, quit early. */
+            if (fdst->threadAbort)
+                goto cleanup;
+
+            /* Otherwise flush buffers and quit gracefully. */
+            if (doRead == (fdst->msg != NULL))
+                break;
+        }
+
+        if (doRead)
+            got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen);
+        else
+            got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname);
+
+        if (got < 0)
             goto error;
-        }
 
         if (got == 0)
             break;
 
         total += got;
-
-        if (safewrite(fdout, buf, got) < 0) {
-            virReportSystemError(errno,
-                                 _("Unable to write %s"),
-                                 fdoutname);
-            goto error;
-        }
     }
 
  cleanup:
+    fdst->threadQuit = true;
+    virObjectUnlock(fdst);
     if (!virObjectUnref(fdst))
         st->privateData = NULL;
     virFDStreamThreadDataFree(data);
-    VIR_FREE(buf);
     return;
 
  error:
-    virObjectLock(fdst);
     fdst->threadErr = errno;
-    virObjectUnlock(fdst);
     goto cleanup;
 }
 
@@ -367,6 +539,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
     if (!fdst->thread)
         return 0;
 
+    fdst->threadAbort = streamAbort;
+    fdst->threadQuit = true;
+    virCondSignal(&fdst->threadCond);
+
     /* Give the thread a chance to lock the FD stream object. */
     virObjectUnlock(fdst);
     virThreadJoin(fdst->thread);
@@ -380,6 +556,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
     ret = 0;
  cleanup:
     VIR_FREE(fdst->thread);
+    virCondDestroy(&fdst->threadCond);
     return ret;
 }
 
@@ -426,11 +603,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
         fdst->abortCallbackDispatching = false;
     }
 
-    /* mutex locked */
-    ret = VIR_CLOSE(fdst->fd);
     if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
         ret = -1;
 
+    /* mutex locked */
+    if ((ret = VIR_CLOSE(fdst->fd)) < 0)
+        virReportSystemError(errno, "%s",
+                             _("Unable to close"));
+
     st->privateData = NULL;
 
     /* call the internal stream closing callback */
@@ -467,7 +647,8 @@ virFDStreamAbort(virStreamPtr st)
 static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 {
     virFDStreamDataPtr fdst = st->privateData;
-    int ret;
+    virFDStreamMsgPtr msg = NULL;
+    int ret = -1;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -495,25 +676,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
             nbytes = fdst->length - fdst->offset;
     }
 
- retry:
-    ret = write(fdst->fd, bytes, nbytes);
-    if (ret < 0) {
-        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        VIR_WARNINGS_RESET
-            ret = -2;
-        } else if (errno == EINTR) {
-            goto retry;
-        } else {
-            ret = -1;
-            virReportSystemError(errno, "%s",
+    if (fdst->thread) {
+        char *buf;
+
+        if (fdst->threadQuit) {
+            virReportSystemError(EBADF, "%s",
                                  _("cannot write to stream"));
+            return -1;
+        }
+
+        if (VIR_ALLOC(msg) < 0 ||
+            VIR_ALLOC_N(buf, nbytes) < 0)
+            goto cleanup;
+
+        memcpy(buf, bytes, nbytes);
+        msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+        msg->stream.data.buf = buf;
+        msg->stream.data.len = nbytes;
+
+        virFDStreamMsgQueuePush(fdst, msg);
+        msg = NULL;
+        ret = nbytes;
+    } else {
+     retry:
+        ret = write(fdst->fd, bytes, nbytes);
+        if (ret < 0) {
+            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            VIR_WARNINGS_RESET
+                ret = -2;
+            } else if (errno == EINTR) {
+                goto retry;
+            } else {
+                ret = -1;
+                virReportSystemError(errno, "%s",
+                                     _("cannot write to stream"));
+            }
         }
-    } else if (fdst->length) {
-        fdst->offset += ret;
     }
 
+    if (fdst->length)
+        fdst->offset += ret;
+
+ cleanup:
     virObjectUnlock(fdst);
+    virFDStreamMsgFree(msg);
     return ret;
 }
 
@@ -521,7 +728,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
 static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 {
     virFDStreamDataPtr fdst = st->privateData;
-    int ret;
+    int ret = -1;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -547,24 +754,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
             nbytes = fdst->length - fdst->offset;
     }
 
- retry:
-    ret = read(fdst->fd, bytes, nbytes);
-    if (ret < 0) {
-        VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        VIR_WARNINGS_RESET
-            ret = -2;
-        } else if (errno == EINTR) {
-            goto retry;
-        } else {
-            ret = -1;
-            virReportSystemError(errno, "%s",
-                                 _("cannot read from stream"));
+    if (fdst->thread) {
+        virFDStreamMsgPtr msg = NULL;
+
+        while (!(msg = fdst->msg)) {
+            if (fdst->threadQuit) {
+                if (nbytes) {
+                    virReportSystemError(EBADF, "%s",
+                                         _("stream is not open"));
+                } else {
+                    ret = 0;
+                }
+                goto cleanup;
+            } else {
+                virObjectUnlock(fdst);
+                virCondSignal(&fdst->threadCond);
+                virObjectLock(fdst);
+            }
+        }
+
+        if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
+            /* Nope, nope, I'm outta here */
+            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                           _("unexpected message type"));
+            goto cleanup;
+        }
+
+        if (nbytes > msg->stream.data.len - msg->stream.data.offset)
+            nbytes = msg->stream.data.len - msg->stream.data.offset;
+
+        memcpy(bytes,
+               msg->stream.data.buf + msg->stream.data.offset,
+               nbytes);
+
+        msg->stream.data.offset += nbytes;
+        if (msg->stream.data.offset == msg->stream.data.len) {
+            virFDStreamMsgQueuePop(fdst);
+            virFDStreamMsgFree(msg);
+        }
+
+        ret = nbytes;
+
+    } else {
+     retry:
+        ret = read(fdst->fd, bytes, nbytes);
+        if (ret < 0) {
+            VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            VIR_WARNINGS_RESET
+                ret = -2;
+            } else if (errno == EINTR) {
+                goto retry;
+            } else {
+                ret = -1;
+                virReportSystemError(errno, "%s",
+                                     _("cannot read from stream"));
+            }
+            goto cleanup;
         }
-    } else if (fdst->length) {
-        fdst->offset += ret;
     }
 
+    if (fdst->length)
+        fdst->offset += ret;
+
+ cleanup:
     virObjectUnlock(fdst);
     return ret;
 }
@@ -593,7 +846,7 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     if (virFDStreamDataInitialize() < 0)
         return -1;
 
-    if ((st->flags & VIR_STREAM_NONBLOCK) &&
+    if ((st->flags & VIR_STREAM_NONBLOCK) && !threadData &&
         virSetNonBlock(fd) < 0) {
         virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
         return -1;
@@ -609,11 +862,20 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     st->privateData = fdst;
 
     if (threadData) {
+        /* The thread is going to do reads if fdin is set and fdout is not.
*/ + fdst->threadDoRead = threadData->fdout == -1; + /* Create the thread after fdst and st were initialized. * The thread worker expects them to be that way. */ if (VIR_ALLOC(fdst->thread) < 0) goto error; + if (virCondInit(&fdst->threadCond) < 0) { + virReportSystemError(errno, "%s", + _("cannot initialize condition variable")); + goto error; + } + if (virThreadCreate(fdst->thread, true, virFDStreamThread, @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st, if ((st->flags & VIR_STREAM_NONBLOCK) && ((!S_ISCHR(sb.st_mode) && !S_ISFIFO(sb.st_mode)) || forceIOHelper)) { - int fds[2] = { -1, -1 }; - if ((oflags & O_ACCMODE) == O_RDWR) { virReportError(VIR_ERR_INTERNAL_ERROR, _("%s: Cannot request read and write flags together"), @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st, goto error; } - if (pipe(fds) < 0) { - virReportSystemError(errno, "%s", - _("Unable to create pipe")); - goto error; - } - if (VIR_ALLOC(threadData) < 0) goto error; @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st, if ((oflags & O_ACCMODE) == O_RDONLY) { threadData->fdin = fd; - threadData->fdout = fds[1]; - if (VIR_STRDUP(threadData->fdinname, path) < 0 || - VIR_STRDUP(threadData->fdoutname, "pipe") < 0) + threadData->fdout = -1; + if (VIR_STRDUP(threadData->fdinname, path) < 0) goto error; - fd = fds[0]; } else { - threadData->fdin = fds[0]; + threadData->fdin = -1; threadData->fdout = fd; - if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 || - VIR_STRDUP(threadData->fdoutname, path) < 0) + if (VIR_STRDUP(threadData->fdoutname, path) < 0) goto error; - fd = fds[1]; } } -- 2.10.2
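The read path in the patch above hands data out of a queued message in pieces: it keeps a per-message offset and only pops and frees the message once it has been fully consumed. A minimal single-threaded sketch of that bookkeeping follows. It is not libvirt code: `msg_t`, `msgq_t`, `msgq_init`, `msgq_push` and `msgq_read` are hypothetical names, and the locking and condition variable used in the patch are deliberately left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical FIFO of data messages, loosely modelled on the message
 * queue in the patch (all names here are invented for illustration). */
typedef struct msg {
    struct msg *next;
    char *buf;
    size_t len;
    size_t offset;          /* bytes already handed to the reader */
} msg_t;

typedef struct {
    msg_t *head;
    msg_t **tail;           /* points at the last 'next' field */
} msgq_t;

static void msgq_init(msgq_t *q)
{
    q->head = NULL;
    q->tail = &q->head;
}

/* Append a private copy of @bytes as a new data message. Returns 0 or -1. */
static int msgq_push(msgq_t *q, const char *bytes, size_t nbytes)
{
    msg_t *m = calloc(1, sizeof(*m));

    if (!m || !(m->buf = malloc(nbytes ? nbytes : 1))) {
        free(m);
        return -1;
    }
    memcpy(m->buf, bytes, nbytes);
    m->len = nbytes;
    *q->tail = m;
    q->tail = &m->next;
    return 0;
}

/* Copy up to @nbytes from the head message into @out. A partly read
 * message stays queued with its offset advanced; a fully consumed one is
 * unlinked and freed. Returns the number of bytes copied (0 if empty). */
static size_t msgq_read(msgq_t *q, char *out, size_t nbytes)
{
    msg_t *m = q->head;

    if (!m)
        return 0;

    if (nbytes > m->len - m->offset)
        nbytes = m->len - m->offset;
    memcpy(out, m->buf + m->offset, nbytes);
    m->offset += nbytes;

    if (m->offset == m->len) {
        q->head = m->next;
        if (!q->head)
            q->tail = &q->head;
        free(m->buf);
        free(m);
    }
    return nbytes;
}
```

For example, pushing an 11-byte message and reading 4 bytes leaves the message queued with its offset at 4; a second, larger read returns the remaining 7 bytes and empties the queue.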

On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
One big downside of using the pipe to transfer the data is that we can transfer only bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve, so let's use a message queue instead. Exchanging information between threads is fairly easy now that iohelper is no longer used.
I'm not seeing how this works correctly with the event loop.
@@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
     if ((st->flags & VIR_STREAM_NONBLOCK) &&
         ((!S_ISCHR(sb.st_mode) &&
           !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
-        int fds[2] = { -1, -1 };
-
         if ((oflags & O_ACCMODE) == O_RDWR) {
             virReportError(VIR_ERR_INTERNAL_ERROR,
                            _("%s: Cannot request read and write flags together"),
@@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
             goto error;
         }

-        if (pipe(fds) < 0) {
-            virReportSystemError(errno, "%s",
-                                 _("Unable to create pipe"));
-            goto error;
-        }
Here we previously created the pipe....
@@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,

         if ((oflags & O_ACCMODE) == O_RDONLY) {
             threadData->fdin = fd;
-            threadData->fdout = fds[1];
-            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
-                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
+            threadData->fdout = -1;
+            if (VIR_STRDUP(threadData->fdinname, path) < 0)
                 goto error;
-            fd = fds[0];
And here we set 'fd' to be the pipe
         } else {
-            threadData->fdin = fds[0];
+            threadData->fdin = -1;
             threadData->fdout = fd;
-            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
-                VIR_STRDUP(threadData->fdoutname, path) < 0)
+            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
                 goto error;
-            fd = fds[1];
Likewise here
         }
     }
...now here 'fd' is passed to virFDStreamOpenInternal(), and it is the thing that the event loop watches are registered against by virFDStreamAddCallback().

With this change 'fd' is the actual plain file the thread is reading from or writing to, so the callbacks are being registered against the plain file, not the pipe.

poll/select on POSIX always reports plain files as readable/writable, even when a read or write would block. So with this change we're just going to busy-loop in the main event thread even when we would block, which defeats the whole purpose of having an iohelper and/or a thread.

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|
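Daniel's point about plain files can be checked with a few lines of POSIX C. The sketch below is a standalone illustration, not libvirt code (the helper `poll_regular_file` is hypothetical): it asks poll() about an empty regular file with a zero timeout. On POSIX systems the call is expected to report the file as both readable and writable straight away, regardless of whether the underlying disk I/O would actually stall, so an event-loop watch registered on such a descriptor would fire on every iteration.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical helper: return the revents that poll() reports for a
 * freshly created (empty) regular file, or -1 on error. */
static int poll_regular_file(void)
{
    char path[] = "/tmp/pollfileXXXXXX";
    int fd = mkstemp(path);
    struct pollfd pfd;
    int n;

    if (fd < 0)
        return -1;
    unlink(path);                       /* keep the descriptor, drop the name */

    pfd.fd = fd;
    pfd.events = POLLIN | POLLOUT;
    pfd.revents = 0;
    n = poll(&pfd, 1, 0);               /* timeout 0: do not wait at all */
    close(fd);

    if (n < 0)
        return -1;
    return pfd.revents;                 /* expected: POLLIN | POLLOUT set */
}
```

A pipe, by contrast, only reports POLLIN once someone has actually written into it, which is why the event loop needs something pipe-like to watch.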

On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
poll/select on POSIX always reports plain files as readable/writable even when they would block. So with this change we're just going to busy loop in the main event thread even when we'll block, which defeats the whole purpose of having an iohelper and/or thread.
Oh, I've misunderstood what we discussed on IRC then. The way I understood it was that if an FD is set to non-blocking mode and poll() claims there is some data available, a subsequent read() might block. If that were the case, we would be safe with this code. However, I didn't expect poll() to lie.

Any link for further reading on this? I guess we are not the only ones who have to deal with this problem. Basically any application that combines poll() with disk read()/write() has to suffer from it. So what are our options here? Because I don't see any right now.

Michal

On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
Oh, I've misunderstood what we've discussed on IRC then. The way I've understood it was that if an FD is set to nonblock mode and poll() claims there are some data available, subsequent read() might block. If that was the case we would be safe with this code. However, I didn't expect poll() to lie.
This code wouldn't be safe - anytime poll claims data available, we *must* be able to read without blocking.
Any link for further reading on this? I guess it's not only us who have to deal with this problem. Basically any application with poll() and disk read()/write() has to suffer from this.
Yes, that's correct. QEMU has the same issue, for example; it is why there is no 'file:' protocol for migration, since it would block the QEMU main loop.
So what are our options here? Because I don't see any right now.
IIUC, you didn't want to use a pipe because you want to send structured messages, not just plain data. If we just have a linked list of messages there's nothing we can poll on, so we need to keep the pipe in use, but find a way to get the special messages into the flow.

I think we could do a trick where we have two pipes in use, one for monitoring readability and one for monitoring writability.

When the I/O thread has data on the queue ready to be read by the main thread, it can write a single byte to the read-monitor pipe.

When the I/O thread is ready to accept more data to write from the main thread, it can write a single byte to the write-monitor pipe.

The main thread would monitor for the POLLIN condition on both the read-monitor pipe and the write-monitor pipe.

BTW, we also need to make sure the I/O thread doesn't proactively queue too much data on the message queue when reading, in case the main thread is slow at consuming this read data and sending it to the TCP client.

Regards,
Daniel
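The read half of the two-pipe idea can be sketched in plain POSIX C. This is a hypothetical standalone illustration, not libvirt code: `chan_t`, `io_thread` and `demo_read_monitor` are invented names, only the read-monitor pipe is shown (a write-monitor pipe would work the same way with the roles swapped), and the queue is a single slot, which also provides the back-pressure mentioned above because the I/O thread waits on a condition variable until the main loop has consumed the previous message.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

#define NMSGS 5

/* Hypothetical single-slot channel between the I/O thread and main loop. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int have_msg;       /* 1 while the slot holds an unconsumed message */
    int msg;            /* stands in for a data or hole message */
    int readmon[2];     /* read-monitor pipe; the main loop polls [0] */
} chan_t;

/* I/O thread: produce messages, never more than one queued at a time. */
static void *io_thread(void *opaque)
{
    chan_t *c = opaque;
    char byte = 'x';

    for (int i = 1; i <= NMSGS; i++) {
        pthread_mutex_lock(&c->lock);
        while (c->have_msg)             /* back-pressure: wait for consumer */
            pthread_cond_wait(&c->cond, &c->lock);
        c->msg = i;
        c->have_msg = 1;
        pthread_mutex_unlock(&c->lock);

        /* One byte per queued message is what makes the queue pollable. */
        if (write(c->readmon[1], &byte, 1) != 1)
            break;
    }
    close(c->readmon[1]);               /* EOF tells the main loop we are done */
    return NULL;
}

/* Main loop: sleep in poll() on the pipe, never on the plain file.
 * Returns the number of messages received in order, or -1 on setup error. */
static int demo_read_monitor(void)
{
    chan_t c = { .lock = PTHREAD_MUTEX_INITIALIZER,
                 .cond = PTHREAD_COND_INITIALIZER };
    struct pollfd pfd;
    pthread_t tid;
    int received = 0;

    if (pipe(c.readmon) < 0)
        return -1;
    if (pthread_create(&tid, NULL, io_thread, &c) != 0) {
        close(c.readmon[0]);
        close(c.readmon[1]);
        return -1;
    }

    pfd.fd = c.readmon[0];
    pfd.events = POLLIN;
    for (;;) {
        char byte;
        ssize_t r;

        if (poll(&pfd, 1, -1) < 0)
            break;
        r = read(c.readmon[0], &byte, 1);   /* drain one notification */
        if (r <= 0)
            break;                          /* 0 means the I/O thread is done */

        pthread_mutex_lock(&c.lock);
        if (c.have_msg && c.msg == received + 1)
            received++;
        c.have_msg = 0;
        pthread_cond_signal(&c.cond);       /* let the I/O thread refill */
        pthread_mutex_unlock(&c.lock);
    }

    pthread_join(tid, NULL);
    close(c.readmon[0]);
    return received;
}
```

The property that matters is that the main loop only ever blocks in poll() on the pipe, whose readiness is driven by the I/O thread, while the potentially blocking file I/O stays entirely inside that thread.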

On 04/13/2017 07:13 PM, Daniel P. Berrange wrote:
IIUC, you didn't want to use a pipe because you want to send structured messages, not just plain data. If we just have a linked list of messages there's nothing we can poll on, so we need to keep the pipe in use, but find a way to get the special messages in the flow.
I think we could do a trick where we have two pipes in use, one for monitoring the readability, and one for monitoring writability.
When the I/O thread has data on the queue ready for read by the main thread, it can write a single byte to the read-monitor pipe.
When the I/O thread is ready to accept more data to write from the main thread, it can write a single byte to the write-monitor pipe.
The main thread would monitor for POLLIN condition on both the read-monitor pipe and write-monitor pipe.
Ah, indeed. This could work. But I have also thought about a different approach. What I really need is to transfer "you're in a data/hole section X bytes long" in addition to the actual data. So I can use the pipe to transfer the data as it is done currently, and store the metadata in a structured message that the thread would write/read and the event loop would read/write.
BTW, we also need to make sure the I/O thread doesn't proactively queue too much data on the message queue when reading it, in case the main thread is being slow at consuming this read data and sending it to the TCP client.
Sure. Currently, with this implementation, there's always one message with a 4MiB buffer in the queue. Even though the code is prepared for a queue of messages, in practice there is never more than one message in it.

Michal

On Tue, Apr 18, 2017 at 02:00:09PM +0200, Michal Privoznik wrote:
Ah, indeed. This could work. But I have also thought about a different approach. What I really need is to transfer "you're in a data/hole section X bytes long" in addition to the actual data. So I can use the pipe to transfer the data as it is done currently, and store the metadata in a structured message that the thread would write/read and the event loop would read/write.
Sure, that works too. It just depends on how much you care about optimizing performance: avoiding the pipe removes the data copies between kernel and userspace and back again, which could improve throughput.
Regards,
Daniel

On 04/18/2017 02:03 PM, Daniel P. Berrange wrote:
On Tue, Apr 18, 2017 at 02:00:09PM +0200, Michal Privoznik wrote:
Ah, indeed. This could work. But I have also thought about a different approach. What I really need is to transfer "you're in a data/hole section X bytes long" in addition to the actual data. So I can use the pipe to transfer the data as it is done currently, and store the metadata in a structured message that the thread would write/read and the event loop would read/write.
Sure, that works too. It just depends on how much you care about optimizing performance: avoiding the pipe removes the data copies between kernel and userspace and back again, which could improve throughput.
Good point. So let me respin my patches with your approach implemented. Michal

After 1eb6647979f8c nobody calls the iohelper with 6 arguments. Everybody uses the other mode. Well, the only user of iohelper after the previous commit is virFileWrapperFd really. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/util/iohelper.c | 72 ++++------------------------------------------------- 1 file changed, 5 insertions(+), 67 deletions(-) diff --git a/src/util/iohelper.c b/src/util/iohelper.c index 00f31e7..d7bf5c7 100644 --- a/src/util/iohelper.c +++ b/src/util/iohelper.c @@ -44,35 +44,6 @@ #define VIR_FROM_THIS VIR_FROM_STORAGE static int -prepare(const char *path, int oflags, int mode, - unsigned long long offset) -{ - int fd = -1; - - if (oflags & O_CREAT) { - fd = open(path, oflags, mode); - } else { - fd = open(path, oflags); - } - if (fd < 0) { - virReportSystemError(errno, _("Unable to open %s"), path); - goto cleanup; - } - - if (offset) { - if (lseek(fd, offset, SEEK_SET) < 0) { - virReportSystemError(errno, _("Unable to seek %s to %llu"), - path, offset); - VIR_FORCE_CLOSE(fd); - goto cleanup; - } - } - - cleanup: - return fd; -} - -static int runIO(const char *path, int fd, int oflags, unsigned long long length) { void *base = NULL; /* Location to be freed */ @@ -207,9 +178,7 @@ usage(int status) if (status) { fprintf(stderr, _("%s: try --help for more details"), program_name); } else { - printf(_("Usage: %s FILENAME OFLAGS MODE OFFSET LENGTH DELETE\n" - " or: %s FILENAME LENGTH FD\n"), - program_name, program_name); + printf(_("Usage: %s FILENAME LENGTH FD\n"), program_name); } exit(status); } @@ -218,13 +187,9 @@ int main(int argc, char **argv) { const char *path; - unsigned long long offset; unsigned long long length; int oflags = -1; - int mode; - unsigned int delete = 0; int fd = -1; - int lengthIndex = 0; program_name = argv[0]; @@ -239,31 +204,13 @@ main(int argc, char **argv) if (argc > 1 && STREQ(argv[1], "--help")) usage(EXIT_SUCCESS); - if (argc == 7) { /* FILENAME OFLAGS MODE OFFSET LENGTH DELETE */ - lengthIndex = 
5; - if (virStrToLong_i(argv[2], NULL, 10, &oflags) < 0) { - fprintf(stderr, _("%s: malformed file flags %s"), + if (argc == 4) { /* FILENAME LENGTH FD */ + if (virStrToLong_ull(argv[2], NULL, 10, &length) < 0) { + fprintf(stderr, _("%s: malformed file length %s"), program_name, argv[2]); exit(EXIT_FAILURE); } - if (virStrToLong_i(argv[3], NULL, 10, &mode) < 0) { - fprintf(stderr, _("%s: malformed file mode %s"), - program_name, argv[3]); - exit(EXIT_FAILURE); - } - if (virStrToLong_ull(argv[4], NULL, 10, &offset) < 0) { - fprintf(stderr, _("%s: malformed file offset %s"), - program_name, argv[4]); - exit(EXIT_FAILURE); - } - if (argc == 7 && virStrToLong_ui(argv[6], NULL, 10, &delete) < 0) { - fprintf(stderr, _("%s: malformed delete flag %s"), - program_name, argv[6]); - exit(EXIT_FAILURE); - } - fd = prepare(path, oflags, mode, offset); - } else if (argc == 4) { /* FILENAME LENGTH FD */ - lengthIndex = 2; + if (virStrToLong_i(argv[3], NULL, 10, &fd) < 0) { fprintf(stderr, _("%s: malformed fd %s"), program_name, argv[3]); @@ -287,18 +234,9 @@ main(int argc, char **argv) usage(EXIT_FAILURE); } - if (virStrToLong_ull(argv[lengthIndex], NULL, 10, &length) < 0) { - fprintf(stderr, _("%s: malformed file length %s"), - program_name, argv[lengthIndex]); - exit(EXIT_FAILURE); - } - if (fd < 0 || runIO(path, fd, oflags, length) < 0) goto error; - if (delete) - unlink(path); - return 0; error: -- 2.10.2
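After this patch iohelper only understands the `FILENAME LENGTH FD` form. As a rough illustration of the resulting argument handling, here is a standalone sketch. It assumes plain `strtoull()`/`strtol()` in place of libvirt's `virStrToLong_*` helpers, and `parse_iohelper_args` is a hypothetical name that does not exist in the tree.

```c
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/* Hypothetical helper (not part of libvirt): parse the single remaining
 * iohelper form, "iohelper FILENAME LENGTH FD". Returns 0 on success and
 * -1 on a wrong argument count or a malformed/negative number. */
static int
parse_iohelper_args(int argc, char **argv, const char **path,
                    unsigned long long *length, int *fd)
{
    char *end;
    long val;

    if (argc != 4)               /* the old 6-argument form is gone */
        return -1;

    *path = argv[1];

    /* strtoull() silently wraps "-1", so refuse a leading minus sign. */
    errno = 0;
    *length = strtoull(argv[2], &end, 10);
    if (argv[2][0] == '-' || errno != 0 || end == argv[2] || *end != '\0')
        return -1;

    errno = 0;
    val = strtol(argv[3], &end, 10);
    if (errno != 0 || end == argv[3] || *end != '\0' || val < 0 || val > INT_MAX)
        return -1;

    *fd = (int) val;
    return 0;
}
```

An old-style 7-element argv is now rejected by the argument count alone, before any number is parsed.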

This function takes an FD and determines whether the current position is in a data section or in a hole. In addition, it determines how many bytes remain until the current section ends. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 81 +++++++++++++++++++ src/util/virfile.h | 3 + tests/virfiletest.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 288 insertions(+) diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 8509f63..3ff4a8b 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1613,6 +1613,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index cbfa384..093125c 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3793,6 +3793,87 @@ virFileComparePaths(const char *p1, const char *p2) cleanup: VIR_FREE(res1); VIR_FREE(res2); + + return ret; +} + + +int virFileInData(int fd, + int *inData, + unsigned long long *length) +{ + int ret = -1; + off_t cur, data, hole, end; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to get current position in file")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF. + * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + virReportSystemError(errno, "%s", + _("Unable to seek to data")); + goto cleanup; + } + + *inData = 0; + /* There are two situations now.
There is always an + * implicit hole at EOF. However, there might be a + * trailing hole just before EOF too. If that's the case + * report it. */ + if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to seek to EOF")); + goto cleanup; + } + *length = end - cur; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *length = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* Wait a second. The reason why we are here is + * because we are in data. But at the same time we + * are in a trailing hole? Wut!? Do the best what we + * can do here. */ + virReportSystemError(errno, "%s", + _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *length = (hole - data); + } + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. 
*/ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); return ret; } diff --git a/src/util/virfile.h b/src/util/virfile.h index ba1c57c..3ebc117 100644 --- a/src/util/virfile.h +++ b/src/util/virfile.h @@ -340,4 +340,7 @@ int virFileReadValueInt(const char *path, int *value); int virFileReadValueUint(const char *path, unsigned int *value); int virFileReadValueBitmap(const char *path, int maxlen, virBitmapPtr *value); +int virFileInData(int fd, + int *inData, + unsigned long long *length); #endif /* __VIR_FILE_H */ diff --git a/tests/virfiletest.c b/tests/virfiletest.c index 702a76a..3e298dc 100644 --- a/tests/virfiletest.c +++ b/tests/virfiletest.c @@ -21,6 +21,7 @@ #include <config.h> #include <stdlib.h> +#include <fcntl.h> #include "testutils.h" #include "virfile.h" @@ -119,6 +120,190 @@ testFileSanitizePath(const void *opaque) static int +makeSparseFile(const off_t offsets[], + const bool startData); + +#ifdef __linux__ +/* Create a sparse file. @offsets in KiB. */ +static int +makeSparseFile(const off_t offsets[], + const bool startData) +{ + int fd = -1; + char path[] = abs_builddir "fileInData.XXXXXX"; + off_t len = 0; + size_t i; + + if ((fd = mkostemp(path, O_CLOEXEC|O_RDWR)) < 0) + goto error; + + if (unlink(path) < 0) + goto error; + + for (i = 0; offsets[i] != (off_t) -1; i++) + len += offsets[i] * 1024; + + while (len) { + const char buf[] = "abcdefghijklmnopqrstuvwxyz"; + off_t toWrite = sizeof(buf); + + if (toWrite > len) + toWrite = len; + + if (safewrite(fd, buf, toWrite) < 0) { + fprintf(stderr, "unable to write to %s (errno=%d)\n", path, errno); + goto error; + } + + len -= toWrite; + } + + len = 0; + for (i = 0; offsets[i] != (off_t) -1; i++) { + bool inData = startData; + + if (i % 2) + inData = !inData; + + if (!inData && + fallocate(fd, + FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + len, offsets[i] * 1024) < 0) { + fprintf(stderr, "unable to punch a hole at offset %lld length %lld\n", + (long long) len, (long long) offsets[i]); 
+ goto error; + } + + len += offsets[i] * 1024; + } + + if (lseek(fd, 0, SEEK_SET) == (off_t) -1) { + fprintf(stderr, "unable to lseek (errno=%d)\n", errno); + goto error; + } + + return fd; + error: + VIR_FORCE_CLOSE(fd); + return -1; +} + +#else /* !__linux__ */ + +static int +makeSparseFile(const off_t offsets[] ATTRIBUTE_UNUSED, + const bool startData ATTRIBUTE_UNUSED) +{ + return -1; +} + +#endif /* !__linux__ */ + + +#define EXTENT 4 +static bool +holesSupported(void) +{ + off_t offsets[] = {EXTENT, EXTENT, EXTENT, -1}; + off_t tmp; + int fd; + bool ret = false; + + if ((fd = makeSparseFile(offsets, true)) < 0) + goto cleanup; + + /* The way this works is: there are 4K of data followed by 4K hole followed + * by 4K hole again. Check if the filesystem we are running the test suite + * on supports holes. */ + if ((tmp = lseek(fd, 0, SEEK_DATA)) == (off_t) -1) + goto cleanup; + + if (tmp != 0) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_HOLE)) == (off_t) -1) + goto cleanup; + + if (tmp != EXTENT * 1024) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_DATA)) == (off_t) -1) + goto cleanup; + + if (tmp != 2 * EXTENT * 1024) + goto cleanup; + + if ((tmp = lseek(fd, tmp, SEEK_HOLE)) == (off_t) -1) + goto cleanup; + + if (tmp != 3 * EXTENT * 1024) + goto cleanup; + + ret = true; + cleanup: + VIR_FORCE_CLOSE(fd); + return ret; +} + + +struct testFileInData { + bool startData; /* whether the list of offsets starts with data section */ + off_t *offsets; +}; + + +static int +testFileInData(const void *opaque) +{ + const struct testFileInData *data = opaque; + int fd = -1; + int ret = -1; + size_t i; + + if ((fd = makeSparseFile(data->offsets, data->startData)) < 0) + goto cleanup; + + for (i = 0; data->offsets[i] != (off_t) -1; i++) { + bool shouldInData = data->startData; + int realInData; + unsigned long long shouldLen; + unsigned long long realLen; + + if (i % 2) + shouldInData = !shouldInData; + + if (virFileInData(fd, &realInData, &realLen) < 0) + goto 
cleanup; + + if (realInData != shouldInData) { + fprintf(stderr, "Unexpected data/hole. Expected %s got %s\n", + shouldInData ? "data" : "hole", + realInData ? "data" : "hole"); + goto cleanup; + } + + shouldLen = data->offsets[i] * 1024; + if (realLen != shouldLen) { + fprintf(stderr, "Unexpected section length. Expected %lld got %lld\n", + shouldLen, realLen); + goto cleanup; + } + + if (lseek(fd, shouldLen, SEEK_CUR) < 0) { + fprintf(stderr, "Unable to seek\n"); + goto cleanup; + } + } + + ret = 0; + + cleanup: + VIR_FORCE_CLOSE(fd); + return ret; +} + + +static int mymain(void) { int ret = 0; @@ -186,6 +371,24 @@ mymain(void) DO_TEST_SANITIZE_PATH_SAME("gluster://bar.baz/fooo//hoo"); DO_TEST_SANITIZE_PATH_SAME("gluster://bar.baz/fooo///////hoo"); +#define DO_TEST_IN_DATA(inData, ...) \ + do { \ + off_t offsets[] = {__VA_ARGS__, -1}; \ + struct testFileInData data = { \ + .startData = inData, .offsets = offsets, \ + }; \ + if (virTestRun(virTestCounterNext(), testFileInData, &data) < 0) \ + ret = -1; \ + } while (0) + + if (holesSupported()) { + DO_TEST_IN_DATA(true, 4, 4, 4); + DO_TEST_IN_DATA(false, 4, 4, 4); + DO_TEST_IN_DATA(true, 8, 8, 8); + DO_TEST_IN_DATA(false, 8, 8, 8); + DO_TEST_IN_DATA(true, 8, 16, 32, 64, 128, 256, 512); + DO_TEST_IN_DATA(false, 8, 16, 32, 64, 128, 256, 512); + } return ret != 0 ? EXIT_FAILURE : EXIT_SUCCESS; } -- 2.10.2
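The case analysis in virFileInData() (data == cur, data > cur, ENXIO) can be tried outside libvirt with a few lines of POSIX/Linux code. The following is a minimal sketch, assuming glibc with `_GNU_SOURCE` for `SEEK_DATA`/`SEEK_HOLE`. `file_in_data` is a hypothetical name. Unlike the patch, it returns a length of 0 instead of a wrapped-around value when the offset is past EOF.

```c
#define _GNU_SOURCE              /* SEEK_DATA and SEEK_HOLE on glibc */
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Sketch of the virFileInData() idea without libvirt helpers:
 * report whether @fd's current offset is in data (*in_data = 1) or in a
 * hole (*in_data = 0), and how many bytes remain in that section.
 * The file offset is restored before returning. */
static int
file_in_data(int fd, int *in_data, unsigned long long *length)
{
    int ret = -1;
    off_t cur, data, hole, end;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        /* ENXIO: trailing hole, or @cur at/after EOF. Anything else is fatal. */
        if (errno != ENXIO)
            goto cleanup;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto cleanup;
        *in_data = 0;
        *length = end > cur ? (unsigned long long) (end - cur) : 0;
    } else if (data > cur) {
        /* In a hole; data resumes at @data. */
        *in_data = 0;
        *length = data - cur;
    } else {
        /* In data; it ends at the next hole (EOF counts as one). */
        if ((hole = lseek(fd, data, SEEK_HOLE)) == (off_t) -1)
            goto cleanup;
        *in_data = 1;
        *length = hole - data;
    }
    ret = 0;

 cleanup:
    lseek(fd, cur, SEEK_SET);    /* always reposition to where we started */
    return ret;
}
```

On filesystems without real hole support, the kernel reports the whole file as data followed by the implicit hole at EOF. The sketch then degrades gracefully, which is also why the test suite above checks holesSupported() first.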

Although we already have virStreamRecv, just like some other older APIs it is missing @flags argument. This means, the function is not that flexible and therefore we need virStreamRecvFlags. The latter is going to be needed when we will want it to stop at a hole in stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 60 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 5 ++++ 4 files changed, 77 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 831640d..bee2516 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,11 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +int virStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 85b4e3b..d4b0480 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -36,6 +36,12 @@ typedef int size_t nbytes); typedef int +(*virDrvStreamRecvFlags)(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -61,6 +67,7 @@ typedef virStreamDriver *virStreamDriverPtr; struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; + virDrvStreamRecvFlags streamRecvFlags; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 8384b37..80b2d47 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -286,6 +286,66 @@ virStreamRecv(virStreamPtr stream, /** + * virStreamRecvFlags: + * @stream: pointer to the stream object + * @data: buffer to read into from 
stream + * @nbytes: size of @data buffer + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Reads a series of bytes from the stream. This method may + * block the calling application for an arbitrary amount + * of time. + * + * This is just like virStreamRecv except this one has extra + * @flags. In fact, calling virStreamRecvFlags(stream, data, + * nbytes, 0) is equivalent to calling virStreamRecv(stream, + * data, nbytes) and vice versa. + * + * Returns 0 when the end of the stream is reached, at + * which time the caller should invoke virStreamFinish() + * to get confirmation of stream completion. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if there is no data pending to be read & the + * stream is marked as non-blocking. + */ +int +virStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, data=%p, nbytes=%zi flags=%x", + stream, data, nbytes, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(data, error); + + if (stream->driver && + stream->driver->streamRecvFlags) { + int ret; + ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); + if (ret == -2) + return -2; + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 428cf2e..af863bb 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -759,4 +759,9 @@ LIBVIRT_3.1.0 { virDomainSetVcpu; } LIBVIRT_3.0.0; +LIBVIRT_3.3.0 { + global: + virStreamRecvFlags; +} LIBVIRT_3.1.0; + # .... define new API here using predicted next version number .... 
-- 2.10.2
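The public virStreamRecvFlags() wrapper follows the usual libvirt pattern: validate the arguments, call the driver hook if present, let -2 through and turn other failures into -1. Below is a self-contained sketch of that control flow. The driver table, `toy_last_error` and every `toy_` name are hypothetical stand-ins for virStreamDriver and libvirt's error reporting.

```c
#include <stddef.h>

/* Hypothetical, heavily reduced driver table: a driver may leave the
 * flags-aware hook NULL if it does not implement it. */
struct toy_stream_driver {
    int (*recv_flags)(void *priv, char *data, size_t nbytes,
                      unsigned int flags);
};

/* Stand-in for libvirt's per-thread error object. */
static const char *toy_last_error;

/* Mirrors the shape of the public wrapper: reset the error, validate
 * arguments, report "unsupported" if the hook is missing, pass -2
 * ("no data yet" on a non-blocking stream) through and collapse every
 * other failure to -1. */
static int
toy_recv_flags(const struct toy_stream_driver *drv, void *priv,
               char *data, size_t nbytes, unsigned int flags)
{
    int ret;

    toy_last_error = NULL;

    if (!data) {
        toy_last_error = "invalid argument: data must not be NULL";
        return -1;
    }
    if (!drv || !drv->recv_flags) {
        toy_last_error = "this function is not supported by the driver";
        return -1;
    }

    ret = drv->recv_flags(priv, data, nbytes, flags);
    if (ret == -2)
        return -2;
    if (ret < 0) {
        if (!toy_last_error)
            toy_last_error = "driver failure";
        return -1;
    }
    return ret;
}

/* Example hook for a non-blocking stream that has nothing buffered. */
static int
toy_would_block(void *priv, char *data, size_t nbytes, unsigned int flags)
{
    (void) priv;
    (void) data;
    (void) nbytes;
    (void) flags;
    return -2;
}
```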

We currently have three virStreamDriver-s in our tree: virFDStream, the remote driver and the ESX driver. For now, support for the remote driver and the ESX driver is sufficient, because the implementation for virFDStream is going to be supplied later as it needs to be slightly different. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/esx/esx_stream.c | 16 +++++++++++++++- src/remote/remote_driver.c | 21 +++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c index fb9abbc..b820b38 100644 --- a/src/esx/esx_stream.c +++ b/src/esx/esx_stream.c @@ -252,12 +252,17 @@ esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes) } static int -esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) +esxStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) { int result = -1; esxStreamPrivate *priv = stream->privateData; int status; + virCheckFlags(0, -1); + if (nbytes == 0) return 0; @@ -317,6 +322,14 @@ esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) return result; } +static int +esxStreamRecv(virStreamPtr stream, + char *data, + size_t nbytes) +{ + return esxStreamRecvFlags(stream, data, nbytes, 0); +} + static void esxFreeStreamPrivate(esxStreamPrivate **priv) { @@ -369,6 +382,7 @@ esxStreamAbort(virStreamPtr stream) virStreamDriver esxStreamDriver = { .streamSend = esxStreamSend, .streamRecv = esxStreamRecv, + .streamRecvFlags = esxStreamRecvFlags, /* FIXME: streamAddCallback missing */ /* FIXME: streamUpdateCallback missing */ /* FIXME: streamRemoveCallback missing */ diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 1242bd6..718e322 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5641,15 +5641,19 @@ remoteStreamSend(virStreamPtr st, static int -remoteStreamRecv(virStreamPtr st, - char *data, - size_t nbytes) +remoteStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, +
unsigned int flags) { - VIR_DEBUG("st=%p data=%p nbytes=%zu", st, data, nbytes); + VIR_DEBUG("st=%p data=%p nbytes=%zu flags=%x", + st, data, nbytes, flags); struct private_data *priv = st->conn->privateData; virNetClientStreamPtr privst = st->privateData; int rv; + virCheckFlags(0, -1); + if (virNetClientStreamRaiseError(privst)) return -1; @@ -5671,6 +5675,14 @@ remoteStreamRecv(virStreamPtr st, return rv; } +static int +remoteStreamRecv(virStreamPtr st, + char *data, + size_t nbytes) +{ + return remoteStreamRecvFlags(st, data, nbytes, 0); +} + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5843,6 +5855,7 @@ remoteStreamAbort(virStreamPtr st) static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, + .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, -- 2.10.2
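Both drivers use the same compatibility trick. The new *RecvFlags callback does the work and rejects flags it does not know yet, as virCheckFlags(0, -1) does. The old *Recv callback becomes a one-line wrapper passing 0. Here is a minimal standalone sketch of that pattern over a hypothetical in-memory stream. `struct mem_stream` and both functions are illustrative only.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory stream private data. */
struct mem_stream {
    const char *buf;
    size_t len, pos;
};

/* New-style driver callback. It accepts no flags yet, so any non-zero
 * @flags is rejected, which is the effect of virCheckFlags(0, -1). */
static int
mem_stream_recv_flags(struct mem_stream *st, char *data, size_t nbytes,
                      unsigned int flags)
{
    size_t n;

    if (flags != 0)
        return -1;
    if (nbytes == 0 || st->pos == st->len)
        return 0;                          /* nothing requested / EOF */

    n = st->len - st->pos < nbytes ? st->len - st->pos : nbytes;
    memcpy(data, st->buf + st->pos, n);
    st->pos += n;
    return (int) n;
}

/* Old-style callback kept for compatibility: a thin wrapper passing 0. */
static int
mem_stream_recv(struct mem_stream *st, char *data, size_t nbytes)
{
    return mem_stream_recv_flags(st, data, nbytes, 0);
}
```

Keeping all the logic in the flags variant means that a future flag (such as the STOP_AT_HOLE one added later in the series) only needs to be handled in one place.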

This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream. It takes just one argument @length, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 3 +++ src/driver-stream.h | 5 ++++ src/libvirt-stream.c | 57 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 66 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index bee2516..4e0a599 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -50,6 +50,9 @@ int virStreamRecvFlags(virStreamPtr st, size_t nbytes, unsigned int flags); +int virStreamSkip(virStreamPtr st, + unsigned long long length); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index d4b0480..20ea13f 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -42,6 +42,10 @@ typedef int unsigned int flags); typedef int +(*virDrvStreamSkip)(virStreamPtr st, + unsigned long long length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -68,6 +72,7 @@ struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; + virDrvStreamSkip streamSkip; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 80b2d47..55f3ef5 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -346,6 +346,63 @@ virStreamRecvFlags(virStreamPtr stream, /** + * virStreamSkip: + * @stream: pointer to the stream object + * @length: number of bytes to 
skip + * + * Skip @length bytes in the stream. This is useful when there's + * no actual data in the stream, just a hole. If that's the case, + * this API can be used to skip the hole properly instead of + * transmitting zeroes to the other side. + * + * An example using this with a hypothetical file upload API + * looks like: + * + * virStream st; + * + * while (1) { + * char buf[4096]; + * size_t len; + * if (..in hole...) { + * ..get hole size... + * virStreamSkip(st, len); + * } else { + * ...read len bytes... + * virStreamSend(st, buf, len); + * } + * } + * + * Returns 0 on success, + * -1 error + */ +int +virStreamSkip(virStreamPtr stream, + unsigned long long length) +{ + VIR_DEBUG("stream=%p, length=%llu", stream, length); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + + if (stream->driver && + stream->driver->streamSkip) { + int ret; + ret = (stream->driver->streamSkip)(stream, length); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index af863bb..acadda8 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -762,6 +762,7 @@ LIBVIRT_3.1.0 { LIBVIRT_3.3.0 { global: virStreamRecvFlags; + virStreamSkip; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.10.2
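On the sending side, something has to decide which parts of the input are transmitted as data and which are announced with virStreamSkip(). The patch gets this information from the filesystem (see virFileInData()). As a different, purely illustrative technique, the sketch below finds holes in a memory buffer by looking for all-zero blocks and produces a hypothetical list of DATA/SKIP messages. `enum msg_type`, `struct msg` and `sparse_encode` are made-up names and not part of any libvirt protocol.

```c
#include <stddef.h>

/* Hypothetical message kinds: either payload bytes or "skip N bytes". */
enum msg_type { MSG_DATA, MSG_SKIP };

struct msg {
    enum msg_type type;
    size_t offset;   /* where in the source buffer the section starts */
    size_t length;   /* section length in bytes */
};

static int
block_is_zero(const unsigned char *p, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (p[i])
            return 0;
    return 1;
}

/* Split @buf into @block sized chunks; runs of all-zero chunks become a
 * single MSG_SKIP, everything else MSG_DATA, and adjacent messages of the
 * same type are merged. Returns the number of messages stored in @out;
 * stops early (truncating the output) if @max is reached. */
static size_t
sparse_encode(const unsigned char *buf, size_t len, size_t block,
              struct msg *out, size_t max)
{
    size_t n = 0;

    for (size_t off = 0; off < len; off += block) {
        size_t chunk = len - off < block ? len - off : block;
        enum msg_type t = block_is_zero(buf + off, chunk) ? MSG_SKIP : MSG_DATA;

        if (n > 0 && out[n - 1].type == t) {
            out[n - 1].length += chunk;    /* extend the current section */
        } else {
            if (n == max)
                break;
            out[n].type = t;
            out[n].offset = off;
            out[n].length = chunk;
            n++;
        }
    }
    return n;
}
```

A MSG_SKIP entry corresponds to one virStreamSkip(st, length) call, and a MSG_DATA entry to one or more virStreamSend() calls.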

This function is basically a counterpart to virStreamSkip. If one side of a stream called virStreamSkip(), the other should call virStreamHoleSize() to get the size of the hole. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 3 +++ src/driver-stream.h | 5 +++++ src/libvirt-stream.c | 42 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 51 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 4e0a599..2ebda74 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -53,6 +53,9 @@ int virStreamRecvFlags(virStreamPtr st, int virStreamSkip(virStreamPtr st, unsigned long long length); +int virStreamHoleSize(virStreamPtr, + unsigned long long *length); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 20ea13f..e196b6d 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -46,6 +46,10 @@ typedef int unsigned long long length); typedef int +(*virDrvStreamHoleSize)(virStreamPtr st, + unsigned long long *length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -73,6 +77,7 @@ struct _virStreamDriver { virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSkip streamSkip; + virDrvStreamHoleSize streamHoleSize; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 55f3ef5..3ac9e0d 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -403,6 +403,48 @@ virStreamSkip(virStreamPtr stream, /** + * virStreamHoleSize: + * @stream: pointer to the stream object + * @length: returns the size of the hole in bytes + * + * This function is a counterpart to virStreamSkip().
That is, if + * one side of a stream has called virStreamSkip() the other side + * of the stream should call virStreamHoleSize() to retrieve the + * size of hole. If there's currently no hole in the stream, -1 + * is returned. + * + * Returns 0 on success, + * -1 on error + */ +int +virStreamHoleSize(virStreamPtr stream, + unsigned long long *length) +{ + VIR_DEBUG("stream=%p, length=%p", stream, length); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(length, -1); + + if (stream->driver && + stream->driver->streamHoleSize) { + int ret; + ret = (stream->driver->streamHoleSize)(stream, length); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index acadda8..0e34eee 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -761,6 +761,7 @@ LIBVIRT_3.1.0 { LIBVIRT_3.3.0 { global: + virStreamHoleSize; virStreamRecvFlags; virStreamSkip; } LIBVIRT_3.1.0; -- 2.10.2
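On the receiving side, a hole size obtained via virStreamHoleSize() is typically turned into a seek in the target file, so that the filesystem can leave that range unallocated. Below is a minimal sketch, assuming the received pieces have already been collected into a hypothetical `struct rec` array; `sparse_apply` is likewise a made-up name. One detail is easy to miss: seeking past EOF does not grow a file, so a trailing hole has to be materialized with ftruncate().

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical record from the receiving end of a sparse stream:
 * either a chunk of payload or the size of a hole. */
struct rec {
    int is_hole;
    const char *data;            /* NULL for holes */
    unsigned long long len;
};

/* Replay @recs into @fd. Holes are skipped with lseek() instead of
 * writing zeroes, so filesystems that support it leave them unallocated.
 * The final ftruncate() makes a trailing hole count towards the file
 * size, since seeking past EOF alone does not extend the file. */
static int
sparse_apply(int fd, const struct rec *recs, size_t n)
{
    off_t end;

    for (size_t i = 0; i < n; i++) {
        if (recs[i].is_hole) {
            if (lseek(fd, (off_t) recs[i].len, SEEK_CUR) == (off_t) -1)
                return -1;
        } else {
            const char *p = recs[i].data;
            unsigned long long left = recs[i].len;

            while (left > 0) {   /* handle short writes */
                ssize_t w = write(fd, p, (size_t) left);
                if (w < 0)
                    return -1;
                p += w;
                left -= (unsigned long long) w;
            }
        }
    }

    end = lseek(fd, 0, SEEK_CUR);
    if (end == (off_t) -1 || ftruncate(fd, end) < 0)
        return -1;
    return 0;
}
```

The sketch assumes a freshly created target file. On a pre-existing file, the final ftruncate() would also cut off any old content beyond the last byte written.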

This flag is for the virStreamRecvFlags API. Its purpose is to stop reading from the stream when a hole is reached, as holes are to be treated separately. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/libvirt-stream.c | 30 +++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 2ebda74..23fcc26 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,10 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +typedef enum { + VIR_STREAM_RECV_STOP_AT_HOLE = (1 << 0), +} virStreamRecvFlagsValues; + int virStreamRecvFlags(virStreamPtr st, char *data, size_t nbytes, diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 3ac9e0d..1162d33 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -290,7 +290,7 @@ virStreamRecv(virStreamPtr stream, * @stream: pointer to the stream object * @data: buffer to read into from stream * @nbytes: size of @data buffer - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStreamRecvFlagsValues * * Reads a series of bytes from the stream. This method may * block the calling application for an arbitrary amount @@ -301,6 +301,29 @@ virStreamRecv(virStreamPtr stream, * nbytes, 0) is equivalent to calling virStreamRecv(stream, * data, nbytes) and vice versa. * + * If flag VIR_STREAM_RECV_STOP_AT_HOLE is set, this function + * will stop reading from stream if it has reached a hole. In + * that case, -3 is returned and virStreamHoleSize() should be + * called to get the hole size.
An example using this flag might + * look like this: + * + * while (1) { + * char buf[4096]; + * + * int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_RECV_STOP_AT_HOLE); + * if (ret < 0) { + * if (ret == -3) { + * unsigned long long len; + * ret = virStreamHoleSize(st, &len); + * ...seek len bytes in target... + * } else { + * return -1; + * } + * } else { + * ...write buf to target... + * } + * } + * * Returns 0 when the end of the stream is reached, at * which time the caller should invoke virStreamFinish() * to get confirmation of stream completion. @@ -311,6 +334,9 @@ virStreamRecv(virStreamPtr stream, * * Returns -2 if there is no data pending to be read & the * stream is marked as non-blocking. + * + * Returns -3 if there is a hole in stream and caller requested + * to stop at a hole. */ int virStreamRecvFlags(virStreamPtr stream, @@ -332,6 +358,8 @@ virStreamRecvFlags(virStreamPtr stream, ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); if (ret == -2) return -2; + if (ret == -3) + return -3; if (ret < 0) goto error; return ret; -- 2.10.2
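The contract of VIR_STREAM_RECV_STOP_AT_HOLE can be modelled with a toy stream made of data and hole sections. With the flag set, a read positioned at a hole returns -3, and a separate call reports and consumes the hole. Without the flag, the hole is read back as zeroes, which is what a plain virStreamRecv() caller would see. The following is a standalone sketch under those assumptions. `TOY_RECV_STOP_AT_HOLE`, `struct toy_stream`, `toy_recv_flags` and `toy_hole_size` are hypothetical stand-ins, not libvirt code.

```c
#include <stddef.h>
#include <string.h>

#define TOY_RECV_STOP_AT_HOLE (1u << 0)   /* stand-in for VIR_STREAM_RECV_STOP_AT_HOLE */

struct section { int is_hole; const char *data; size_t len; };

struct toy_stream {
    const struct section *secs;
    size_t nsecs;
    size_t idx;      /* current section */
    size_t pos;      /* offset within the current section */
};

/* Returns bytes read, 0 at end of stream, -1 on unknown flags and
 * -3 when positioned at a hole and STOP_AT_HOLE was requested. */
static int
toy_recv_flags(struct toy_stream *st, char *buf, size_t nbytes,
               unsigned int flags)
{
    if (flags & ~TOY_RECV_STOP_AT_HOLE)
        return -1;
    if (st->idx == st->nsecs)
        return 0;

    const struct section *s = &st->secs[st->idx];
    if (s->is_hole && (flags & TOY_RECV_STOP_AT_HOLE))
        return -3;

    size_t left = s->len - st->pos;
    size_t n = nbytes < left ? nbytes : left;
    if (s->is_hole)
        memset(buf, 0, n);                /* legacy behaviour: holes read as zeroes */
    else
        memcpy(buf, s->data + st->pos, n);

    st->pos += n;
    if (st->pos == s->len) {
        st->idx++;
        st->pos = 0;
    }
    return (int) n;
}

/* Counterpart of the hole-size query: report the hole and consume it. */
static int
toy_hole_size(struct toy_stream *st, unsigned long long *length)
{
    if (st->idx == st->nsecs || !st->secs[st->idx].is_hole)
        return -1;
    *length = st->secs[st->idx].len - st->pos;
    st->idx++;
    st->pos = 0;
    return 0;
}
```

A receive loop built on these two calls should also stop when the read returns 0 (end of stream).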

This is just a wrapper over new functions that have been just introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very similar to virStreamRecvAll() except it handles sparse streams well. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 28 ++++++++- src/libvirt-stream.c | 119 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 23fcc26..e5f5126 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSinkFunc callback is used together - * with the virStreamRecvAll function for libvirt to - * provide the data that has been received. + * The virStreamSinkFunc callback is used together with the + * virStreamRecvAll or virStreamSparseRecvAll functions for + * libvirt to provide the data that has been received. * * The callback will be invoked multiple times, * providing data in small chunks. The application @@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st, virStreamSinkFunc handler, void *opaque); +/** + * virStreamSinkHoleFunc: + * @st: the stream object + * @length: stream hole size + * @opaque: optional application provided data + * + * This callback is used together with the virStreamSparseRecvAll + * function for libvirt to provide the size of a hole that + * occurred in the stream. 
+ * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, + unsigned long long length, + void *opaque); + +int virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque); + typedef enum { VIR_STREAM_EVENT_READABLE = (1 << 0), VIR_STREAM_EVENT_WRITABLE = (1 << 1), diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 1162d33..81190cc 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -660,6 +660,125 @@ virStreamRecvAll(virStreamPtr stream, /** + * virStreamSparseRecvAll: + * @stream: pointer to the stream object + * @handler: sink callback for writing data to application + * @holeHandler: stream hole callback for skipping holes + * @opaque: application defined data + * + * Receive the entire data stream, sending the data to the + * requested data sink. This is simply a convenient alternative + * to virStreamRecv, for apps that do blocking-I/O. + * + * An example using this with a hypothetical file download + * API looks like: + * + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return write(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, unsigned long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY); + * + * virConnectDownloadSparseFile(conn, st); + * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) { + * ...report an error ... + * goto done; + * } + * if (virStreamFinish(st) < 0) + * ...report an error... + * virStreamFree(st); + * close(fd); + * + * Note that @opaque data is shared between both @handler and + * @holeHandler callbacks. + * + * Returns 0 if all the data was successfully received. 
The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call + * virStreamFree() + */ +int +virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE; + int ret = -1; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sinks cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int got, offset = 0; + unsigned long long holeLen; + got = virStreamRecvFlags(stream, bytes, want, flags); + if (got == -3) { + if (virStreamHoleSize(stream, &holeLen) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (holeHandler(stream, holeLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + } else if (got < 0) { + goto cleanup; + } else if (got == 0) { + break; + } + while (offset < got) { + int done; + done = (handler)(stream, bytes + offset, got - offset, opaque); + if (done < 0) { + virStreamAbort(stream); + goto cleanup; + } + offset += done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} + +/** * virStreamEventAddCallback: * @stream: pointer to the stream object * @events: set of events to monitor diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 0e34eee..008dc59 100644 --- a/src/libvirt_public.syms +++ 
b/src/libvirt_public.syms @@ -764,6 +764,7 @@ LIBVIRT_3.3.0 { virStreamHoleSize; virStreamRecvFlags; virStreamSkip; + virStreamSparseRecvAll; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.10.2
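The loop inside virStreamSparseRecvAll() dispatches on the return value: holes go to the hole callback, data goes to the sink (possibly in several partial writes), and 0 ends the transfer. Here is a standalone sketch of that dispatch. It assumes a hypothetical `source_fn` that merges RecvFlags(STOP_AT_HOLE) and HoleSize into one call, plus a toy in-memory source and target that exist only for illustration. Unlike the patch, it uses an explicit `continue` after a hole and treats a sink that makes no progress as an error.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical callback types. A source returns >0 bytes, 0 at end of
 * stream, -1 on error, or -3 at a hole (size stored in *hole). */
typedef int (*source_fn)(void *src, char *buf, size_t n, unsigned long long *hole);
typedef int (*sink_fn)(const char *buf, size_t n, void *opaque);
typedef int (*hole_fn)(unsigned long long len, void *opaque);

static int
sparse_recv_all(source_fn recv, void *src, sink_fn sink, hole_fn hole_cb,
                void *opaque)
{
    char buf[4096];

    for (;;) {
        unsigned long long hole = 0;
        int got = recv(src, buf, sizeof(buf), &hole);

        if (got == -3) {
            if (hole_cb(hole, opaque) < 0)
                return -1;
            continue;                    /* nothing for the data sink */
        }
        if (got < 0)
            return -1;
        if (got == 0)
            return 0;                    /* end of stream */

        for (int off = 0; off < got; ) {
            int done = sink(buf + off, (size_t) (got - off), opaque);
            if (done <= 0)               /* error, or no progress possible */
                return -1;
            off += done;
        }
    }
}

/* Toy source: a list of sections, each returned in one piece. */
struct section { int is_hole; const char *data; size_t len; };
struct sec_source { const struct section *s; size_t n, i; };

static int
sec_recv(void *src, char *buf, size_t n, unsigned long long *hole)
{
    struct sec_source *ss = src;
    if (ss->i == ss->n)
        return 0;
    const struct section *s = &ss->s[ss->i++];
    if (s->is_hole) {
        *hole = s->len;
        return -3;
    }
    if (s->len > n)
        return -1;                       /* toy limitation */
    memcpy(buf, s->data, s->len);
    return (int) s->len;
}

/* Toy target: holes only advance the position, like lseek(SEEK_CUR). */
struct mem_target { char data[64]; size_t pos; };

static int
mem_sink(const char *buf, size_t n, void *opaque)
{
    struct mem_target *t = opaque;
    if (n > sizeof(t->data) - t->pos)
        n = sizeof(t->data) - t->pos;
    memcpy(t->data + t->pos, buf, n);
    t->pos += n;
    return (int) n;
}

static int
mem_hole(unsigned long long len, void *opaque)
{
    struct mem_target *t = opaque;
    if (len > sizeof(t->data) - t->pos)
        return -1;
    t->pos += (size_t) len;
    return 0;
}
```

As in the patch, @opaque is shared between the data and the hole callbacks. This lets both callbacks track the same target position.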

This is just a wrapper over the new function that has just been introduced: virStreamSkip(). It's very similar to virStreamSendAll() except it handles sparse streams well. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 43 ++++++++++++++-- src/libvirt-stream.c | 104 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index e5f5126..9aa728e 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -69,9 +69,9 @@ int virStreamHoleSize(virStreamPtr, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSourceFunc callback is used together - * with the virStreamSendAll function for libvirt to - * obtain the data that is to be sent. + * The virStreamSourceFunc callback is used together with + * the virStreamSendAll and virStreamSparseSendAll functions + * for libvirt to obtain the data that is to be sent. * * The callback will be invoked multiple times, * fetching data in small chunks. The application @@ -95,6 +95,43 @@ int virStreamSendAll(virStreamPtr st, void *opaque); /** + * virStreamSourceHoleFunc: + * @st: the stream object + * @inData: are we in data section + * @length: how long is the section we are currently in + * @opaque: optional application provided data + * + * The virStreamSourceHoleFunc callback is used together + * with the virStreamSparseSendAll function for libvirt to + * obtain the length of the section the stream is currently in. + * + * Moreover, upon successful return, @length should be + * updated with how many bytes are left until the current + * section ends (be it data section or hole section) and if + * the stream is currently in data section, @inData should + * be set to a non-zero value and vice versa. + * As a corner case, there's an implicit hole at the end of + * each file.
 If that's the case, @inData should be set to 0 + * as well as @length. + * Moreover, this function should always leave the stream in + * data section. Either the one it was in prior to + * calling this function, or the one that follows the hole + * it is in. + * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSourceHoleFunc)(virStreamPtr st, + int *inData, + unsigned long long *length, + void *opaque); + +int virStreamSparseSendAll(virStreamPtr st, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + void *opaque); + +/** * virStreamSinkFunc: * * @st: the stream object diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 81190cc..3a9edcd 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -565,7 +565,111 @@ virStreamSendAll(virStreamPtr stream, } + /** + * virStreamSparseSendAll: + * @stream: pointer to the stream object + * @handler: source callback for reading data from application + * @holeHandler: source callback for determining holes + * @opaque: application defined data + * + * Send the entire data stream, skipping holes reported by @holeHandler. + * + * Opaque data in @opaque are shared between @handler and @holeHandler. + * + * Returns 0 if all the data was successfully sent.
 The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call + * virStreamFree() + */ +int virStreamSparseSendAll(virStreamPtr stream, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + int ret = -1; + unsigned long long dataLen = 0; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sources cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int inData, got, offset = 0; + unsigned long long sectionLen; + + if (!dataLen) { + if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (!inData) { + if (sectionLen && virStreamSkip(stream, sectionLen) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (sectionLen) + continue; + + /* Here inData == 0 and sectionLen == 0 as well. + * This means we are in the trailing hole. Don't + * start new iteration but let virStreamSend() + * close the stream gracefully.
 */ + } else { + dataLen = sectionLen; + } + } + + if (want > dataLen) + want = dataLen; + + got = (handler)(stream, bytes, want, opaque); + if (got < 0) { + virStreamAbort(stream); + goto cleanup; + } + if (got == 0) + break; + while (offset < got) { + int done; + done = virStreamSend(stream, bytes + offset, got - offset); + if (done < 0) + goto cleanup; + offset += done; + dataLen -= done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} /** * virStreamRecvAll: * @stream: pointer to the stream object * @handler: sink callback for writing data to application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 008dc59..ea4ddd5 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -765,6 +765,7 @@ LIBVIRT_3.3.0 { virStreamRecvFlags; virStreamSkip; virStreamSparseRecvAll; + virStreamSparseSendAll; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number .... -- 2.10.2
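To make the callback contract concrete, here is a minimal standalone sketch of the same loop shape, assuming an in-memory source made of alternating data/hole sections in place of a real stream; the `sent` and `skipped` counters stand in for virStreamSend() and virStreamSkip(). All names here (struct section, src_hole(), src_read(), sparse_send_all()) are hypothetical and not part of libvirt. Unlike the patch, the sketch recomputes `want` on every iteration, so a short data section does not shrink later reads.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical in-memory sparse source: a list of data/hole sections. */
struct section { int is_data; unsigned long long len; };

struct sparse_src {
    const struct section *secs;
    size_t nsecs;
    size_t cur;                 /* index of the current section */
    unsigned long long off;     /* offset within the current section */
};

struct send_stats {
    unsigned long long sent;    /* bytes given to the virStreamSend() stand-in */
    unsigned long long skipped; /* bytes given to the virStreamSkip() stand-in */
};

/* Plays the role of virStreamSourceHoleFunc. A reported hole is also
 * consumed, so the source is left in the following data section (or at
 * EOF). At EOF it reports the implicit trailing hole: inData = 0, length = 0. */
static int src_hole(struct sparse_src *s, int *inData, unsigned long long *length)
{
    if (s->cur >= s->nsecs) {
        *inData = 0;
        *length = 0;
        return 0;
    }
    *inData = s->secs[s->cur].is_data;
    *length = s->secs[s->cur].len - s->off;
    if (!*inData) {
        s->cur++;
        s->off = 0;
    }
    return 0;
}

/* Plays the role of virStreamSourceFunc: copy up to nbytes of the current data section. */
static int src_read(struct sparse_src *s, char *buf, size_t nbytes)
{
    if (s->cur >= s->nsecs)
        return 0;
    unsigned long long left = s->secs[s->cur].len - s->off;
    size_t n = nbytes < left ? nbytes : (size_t)left;
    memset(buf, 'x', n);        /* payload content is irrelevant here */
    s->off += n;
    if (s->off == s->secs[s->cur].len) {
        s->cur++;
        s->off = 0;
    }
    return (int)n;
}

static int sparse_send_all(struct sparse_src *s, struct send_stats *st)
{
    char buf[4];                /* tiny buffer to force several reads per section */
    unsigned long long dataLen = 0;

    for (;;) {
        if (dataLen == 0) {
            int inData;
            unsigned long long sectionLen;

            if (src_hole(s, &inData, &sectionLen) < 0)
                return -1;
            if (!inData) {
                if (sectionLen == 0)
                    break;              /* implicit trailing hole: done */
                st->skipped += sectionLen;
                continue;
            }
            dataLen = sectionLen;
        }

        size_t want = sizeof(buf);
        if (want > dataLen)
            want = (size_t)dataLen;

        int got = src_read(s, buf, want);
        if (got < 0)
            return -1;
        if (got == 0)
            break;
        assert((unsigned long long)got <= dataLen);
        st->sent += (unsigned long long)got;
        dataLen -= (unsigned long long)got;
    }
    return 0;
}
```

With the sections {10 data, 100 hole, 6 data, 50 hole}, the loop sends 16 bytes and announces 150 bytes of holes without ever reading the hole bytes.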

This is just an internal API that calls the corresponding function in the stream driver. This function will set @data=1 if the underlying file is in a data section, or @data=0 if it is in a hole. In either case, @length is set to the number of bytes remaining in the section the file is currently in. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 43 +++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 3 +++ src/libvirt_private.syms | 1 + 4 files changed, 53 insertions(+) diff --git a/src/driver-stream.h b/src/driver-stream.h index e196b6d..5d84b9a 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -50,6 +50,11 @@ typedef int unsigned long long *length); typedef int +(*virDrvStreamInData)(virStreamPtr st, + int *data, + unsigned long long *length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -78,6 +83,7 @@ struct _virStreamDriver { virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSkip streamSkip; virDrvStreamHoleSize streamHoleSize; + virDrvStreamInData streamInData; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 3a9edcd..3a3e91e 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -473,6 +473,49 @@ virStreamHoleSize(virStreamPtr stream, /** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @length: length to next section + * + * This function checks the underlying stream (typically a file) + * to learn whether the current stream position lies within a + * data section or a hole. Upon return @data is set to a nonzero + * value if the former is the case, or to zero if @stream is in a + * hole.
 Moreover, @length is updated to tell the caller how many + * bytes can be read from @stream until the current section changes + * (from data to a hole or vice versa). + * + * As a special case, there's an implicit hole at EOF. In this + * situation this function should set @data = 0, @length = 0 + * and return 0. + * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *length) +{ + VIR_DEBUG("stream=%p, data=%p, length=%p", stream, data, length); + + /* No checks of arguments or error resetting. This is an + * internal function that just happens to live next to some + * public functions of ours. */ + + if (stream->driver->streamInData) { + int ret; + ret = (stream->driver->streamInData)(stream, data, length); + return ret; + } + + virReportUnsupportedError(); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 96439d8..0e945aa 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -294,4 +294,7 @@ virTypedParameterValidateSet(virConnectPtr conn, virTypedParameterPtr params, int nparams); +int virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *length); #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 3ff4a8b..58eb643 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1110,6 +1110,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamInData; # locking/domain_lock.h -- 2.10.2
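For a file-backed stream, a driver's streamInData implementation could answer this question with lseek(2) and SEEK_DATA/SEEK_HOLE. The sketch below assumes Linux/glibc. `file_in_data()` is a hypothetical name, not the helper this series adds. It follows the contract documented above, including the implicit hole at EOF (data = 0, length = 0), and restores the file offset before returning. On a filesystem without hole support the kernel simply reports the whole file as data, which is still a valid answer.

```c
#define _GNU_SOURCE             /* SEEK_DATA / SEEK_HOLE on glibc */
#include <assert.h>
#include <errno.h>
#include <stdio.h>              /* tmpfile()/fileno() in the usage check */
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: is the current offset of @fd in data or in a hole,
 * and how many bytes remain until that changes? The file offset is
 * restored before returning. Returns 0 on success, -1 on error. */
static int file_in_data(int fd, int *data, unsigned long long *length)
{
    off_t cur, next, end;
    int ret = -1;

    if ((cur = lseek(fd, 0, SEEK_CUR)) < 0)
        return -1;

    if ((next = lseek(fd, cur, SEEK_DATA)) < 0) {
        /* ENXIO: no data at or after cur, i.e. trailing hole or EOF. */
        if (errno != ENXIO)
            goto restore;
        if ((end = lseek(fd, 0, SEEK_END)) < 0)
            goto restore;
        *data = 0;
        *length = end > cur ? (unsigned long long)(end - cur) : 0;
    } else if (next > cur) {
        /* In a hole that ends where the next data section starts. */
        *data = 0;
        *length = (unsigned long long)(next - cur);
    } else {
        /* In data; SEEK_HOLE finds the next hole, or the implicit one at EOF. */
        if ((next = lseek(fd, cur, SEEK_HOLE)) < 0)
            goto restore;
        *data = 1;
        *length = (unsigned long long)(next - cur);
    }
    ret = 0;

 restore:
    if (lseek(fd, cur, SEEK_SET) < 0)
        ret = -1;
    return ret;
}
```

For a fully written 5-byte file, offset 0 reports data with 5 bytes left, while offset 5 (EOF) reports the implicit hole with length 0.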

This has no real added value right now, but is going to be very helpful later. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 2 +- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 718e322..7024464 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -6170,7 +6170,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, memset(&args, 0, sizeof(args)); memset(&ret, 0, sizeof(ret)); - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, priv->counter))) goto done; @@ -7094,7 +7095,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, goto cleanup; } - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, priv->counter))) goto cleanup; diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 173189c..e608812 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 2105bd0..01761cf 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { virObjectLockable parent; + 
virStreamPtr stream; /* Reverse pointer to parent stream */ + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) } -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial) { @@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL; + st->stream = stream; st->prog = virObjectRef(prog); st->proc = proc; st->serial = serial; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a0d2be9..e278dab 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -32,7 +32,8 @@ typedef virNetClientStream *virNetClientStreamPtr; typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial); -- 2.10.2

Even though there is no way to make a stream skippable right now, there will be soon. We need to track this info so that we don't send virStreamSkip to a client that did not ask for it, or vice versa. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/daemon/remote.c b/daemon/remote.c index 1610fea..fbd5ba2 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -5373,7 +5373,7 @@ remoteDispatchDomainMigratePrepareTunnel3Params(virNetServerPtr server ATTRIBUTE if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) || !(stream = daemonCreateClientStream(client, st, remoteProgram, - &msg->header))) + &msg->header, false))) goto cleanup; if (virDomainMigratePrepareTunnel3Params(priv->conn, st, params, nparams, diff --git a/daemon/stream.c b/daemon/stream.c index 11c0a46..624a626 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -52,6 +52,8 @@ struct daemonClientStream { virNetMessagePtr rx; bool tx; + bool skippable; + daemonClientStreamPtr next; }; @@ -321,7 +323,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr header) + virNetMessageHeaderPtr header, + bool skippable) { daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -339,6 +342,7 @@ daemonCreateClientStream(virNetServerClientPtr client, stream->serial = header->serial; stream->filterID = -1; stream->st = st; + stream->skippable = skippable; return stream; } diff --git a/daemon/stream.h b/daemon/stream.h index cf76e71..bf5dc24 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -30,7 +30,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client,
 virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr hdr); + virNetMessageHeaderPtr hdr, + bool skippable); int daemonFreeClientStream(virNetServerClientPtr client, daemonClientStream *stream); diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 7024464..6a2c6f6 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -6173,7 +6173,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, - priv->counter))) + priv->counter, + false))) goto done; if (virNetClientAddStream(priv->client, netst) < 0) { @@ -7098,7 +7099,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, - priv->counter))) + priv->counter, + false))) goto cleanup; if (virNetClientAddStream(priv->client, netst) < 0) { diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index e608812..9862598 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1024,7 +1024,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header)))\n"; + print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git
a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 01761cf..42619bf 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -54,6 +54,8 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; + bool skippable; /* User requested skippable stream */ + virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -138,7 +140,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial) + unsigned serial, + bool skippable) { virNetClientStreamPtr st; @@ -152,6 +155,7 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, st->prog = virObjectRef(prog); st->proc = proc; st->serial = serial; + st->skippable = skippable; return st; } diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index e278dab..0a5aafd 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -35,7 +35,8 @@ typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial); + unsigned serial, + bool skippable); bool virNetClientStreamRaiseError(virNetClientStreamPtr st); -- 2.10.2

This is going to be the RPC representation of virStreamSkip. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetprotocol.x | 4 ++++ src/virnetprotocol-structs | 3 +++ 2 files changed, 7 insertions(+) diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index 9ce33b0..3623588 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -236,3 +236,7 @@ struct virNetMessageError { int int2; virNetMessageNetwork net; /* unused */ }; + +struct virNetStreamSkip { + unsigned hyper length; +}; diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index af4526c..a8cc603 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -42,3 +42,6 @@ struct virNetMessageError { int int2; virNetMessageNetwork net; }; +struct virNetStreamSkip { + uint64_t length; +}; -- 2.10.2
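Because virNetStreamSkip holds a single `unsigned hyper`, its XDR encoding (RFC 4506) is just eight bytes, most significant byte first. Real code uses the rpcgen-generated xdr_virNetStreamSkip(). The hand-rolled skip_encode()/skip_decode() below are hypothetical helpers that only illustrate the wire format of the payload.

```c
#include <assert.h>
#include <stdint.h>

/* Encode virNetStreamSkip.length as an XDR unsigned hyper (big-endian). */
static void skip_encode(uint64_t length, unsigned char out[8])
{
    for (int i = 0; i < 8; i++)
        out[i] = (unsigned char)(length >> (56 - 8 * i));
}

/* Decode the 8-byte payload back into the hole length. */
static uint64_t skip_decode(const unsigned char in[8])
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | in[i];
    return v;
}
```

A 1 TiB hole (2^40 bytes) therefore costs the same eight payload bytes as a 4 KiB one, which is the whole point of sending holes instead of zeroes.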

This is a special type of stream packet that is bidirectional and will contain information on how many bytes both sides are skipping in the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 3 ++- src/rpc/virnetclient.c | 1 + src/rpc/virnetprotocol.x | 12 +++++++++++- src/virnetprotocol-structs | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 624a626..6899c18 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -287,7 +287,8 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED, virMutexLock(&stream->priv->lock); - if (msg->header.type != VIR_NET_STREAM) + if (msg->header.type != VIR_NET_STREAM && + msg->header.type != VIR_NET_STREAM_SKIP) goto cleanup; if (!virNetServerProgramMatches(stream->prog, msg)) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 5174614..3563ef1 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1284,6 +1284,7 @@ virNetClientCallDispatch(virNetClientPtr client) return virNetClientCallDispatchMessage(client); case VIR_NET_STREAM: /* Stream protocol */ + case VIR_NET_STREAM_SKIP: /* Stream skip protocol */ return virNetClientCallDispatchStream(client); default: diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index 3623588..3530694 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -143,6 +143,14 @@ const VIR_NET_MESSAGE_NUM_FDS_MAX = 32; * * status == VIR_NET_ERROR * remote_error Error information * + * - type == VIR_NET_STREAM_SKIP + * * status == VIR_NET_CONTINUE + * byte[] skip data + * * status == VIR_NET_ERROR + * remote_error error information + * * status == VIR_NET_OK + * <empty> + * */ enum virNetMessageType { /* client -> server. args from a method call */ @@ -156,7 +164,9 @@ enum virNetMessageType { /* client -> server. args from a method call, with passed FDs */ VIR_NET_CALL_WITH_FDS = 4, /* server -> client.
reply/error from a method call, with passed FDs */ - VIR_NET_REPLY_WITH_FDS = 5 + VIR_NET_REPLY_WITH_FDS = 5, + /* either direction, stream skip data packet */ + VIR_NET_STREAM_SKIP = 6 }; enum virNetMessageStatus { diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index a8cc603..c31683f 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -6,6 +6,7 @@ enum virNetMessageType { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum virNetMessageStatus { VIR_NET_OK = 0, -- 2.10.2

Ideally, this would be generated, but to achieve that, the corresponding XDR definitions would need to go into a different .x file, whereas they really belong in the one they are in right now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 48 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 50 insertions(+) diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index 260161e..be82a23 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -50,8 +50,11 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_skip_length = -1; +static int hf_libvirt_stream_skip = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_skip = -1; #define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -326,6 +329,36 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); } +static gboolean +dissect_xdr_stream_skip(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) +{ + goffset start; + proto_item *ti; + + start = xdr_getpos(xdrs); + if (hf == -1) { + ti = proto_tree_add_item(tree, hf_libvirt_stream_skip, tvb, start, -1, ENC_NA); + } else { + header_field_info *hfinfo; + hfinfo = proto_registrar_get_nth(hf_libvirt_stream_skip); + ti = proto_tree_add_item(tree, hf, tvb, start, -1, ENC_NA); + proto_item_append_text(ti, " :: %s", hfinfo->name); + } + tree = proto_item_add_subtree(ti, ett_libvirt_stream_skip); + + hf = hf_libvirt_stream_skip_length; + if (!dissect_xdr_u_hyper(tvb, tree, xdrs, hf)) return FALSE; + proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + + +static void +dissect_libvirt_stream_skip(tvbuff_t *tvb, proto_tree *tree, gint
payload_length, guint32 status) +{ + proto_tree_add_item(tree, hf_libvirt_stream_skip_length, tvb, VIR_HEADER_LEN, -1, ENC_NA); +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) @@ -346,6 +379,8 @@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_SKIP) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_skip); } else { goto unknown; } @@ -525,6 +560,18 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_skip, + { "stream_skip", "libvirt.stream_skip", + FT_BYTES, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_skip_length, + { "length", "libvirt.stream_skip.length", + FT_UINT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -535,6 +582,7 @@ proto_register_libvirt(void) static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_skip, &ett_libvirt }; diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdf..006aa6d 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_SKIP, "STREAM_SKIP" }, { -1, NULL } }; -- 2.10.2

This is just a helper function that takes in a length value, encodes it into XDR and sends it to the client. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 33 +++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 7 +++++++ 3 files changed, 41 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index ca1f3ac..29dceab 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -178,6 +178,7 @@ virNetServerProgramNew; virNetServerProgramSendReplyError; virNetServerProgramSendStreamData; virNetServerProgramSendStreamError; +virNetServerProgramSendStreamSkip; virNetServerProgramUnknownError; diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c index d1597f4..6d84056 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -548,6 +548,39 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, } +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length) +{ + virNetStreamSkip data; + + VIR_DEBUG("client=%p msg=%p length=%llu", client, msg, length); + + memset(&data, 0, sizeof(data)); + data.length = length; + + msg->header.prog = prog->program; + msg->header.vers = prog->version; + msg->header.proc = procedure; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = serial; + msg->header.status = VIR_NET_CONTINUE; + + if (virNetMessageEncodeHeader(msg) < 0) + return -1; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + return virNetServerClientSendMessage(client, msg); +} + + void virNetServerProgramDispose(void *obj ATTRIBUTE_UNUSED) { } diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 531fca0..eba2168 100644 --- a/src/rpc/virnetserverprogram.h +++
b/src/rpc/virnetserverprogram.h @@ -104,4 +104,11 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, const char *data, size_t len); +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length); + #endif /* __VIR_NET_SERVER_PROGRAM_H__ */ -- 2.10.2
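Putting the header fields that virNetServerProgramSendStreamSkip() sets together with the 8-byte payload, a complete STREAM_SKIP packet can be sketched as below. This assumes the usual libvirt RPC framing: a 4-byte length word counting the whole packet, then prog, vers, proc, type, serial and status as 4-byte XDR integers, then the payload. build_skip_frame() and the constants are hypothetical. Only the type (6) and status (VIR_NET_CONTINUE = 2) values come from virnetprotocol.x, and the program/procedure numbers in the usage check are placeholders.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Enum values as declared in virnetprotocol.x. */
enum { SKIP_TYPE_STREAM_SKIP = 6, SKIP_STATUS_CONTINUE = 2 };

/* length word + 6-word header + 8-byte unsigned hyper = 36 bytes */
#define SKIP_FRAME_LEN (4 + 6 * 4 + 8)

/* Store a 32-bit value big-endian, as XDR does. */
static void put_u32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char)(v >> 24);
    p[1] = (unsigned char)(v >> 16);
    p[2] = (unsigned char)(v >> 8);
    p[3] = (unsigned char)v;
}

static size_t build_skip_frame(unsigned char out[SKIP_FRAME_LEN],
                               uint32_t prog, uint32_t vers, int32_t proc,
                               uint32_t serial, uint64_t length)
{
    put_u32(out + 0, SKIP_FRAME_LEN);             /* total length, incl. itself */
    put_u32(out + 4, prog);                       /* header.prog */
    put_u32(out + 8, vers);                       /* header.vers */
    put_u32(out + 12, (uint32_t)proc);            /* header.proc */
    put_u32(out + 16, SKIP_TYPE_STREAM_SKIP);     /* header.type */
    put_u32(out + 20, serial);                    /* header.serial */
    put_u32(out + 24, SKIP_STATUS_CONTINUE);      /* header.status */
    put_u32(out + 28, (uint32_t)(length >> 32));  /* hyper: high word first */
    put_u32(out + 32, (uint32_t)length);          /* hyper: low word */
    return SKIP_FRAME_LEN;
}
```

The serial and proc copied from the originating stream are what let the peer route the packet to the right stream, exactly as for ordinary VIR_NET_STREAM data.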

While the previous commit implemented a helper for sending a STREAM_SKIP packet from the daemon, this is the client's counterpart. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 57 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 29dceab..6093613 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,7 @@ virNetClientStreamQueuePacket; virNetClientStreamRaiseError; virNetClientStreamRecvPacket; virNetClientStreamSendPacket; +virNetClientStreamSendSkip; virNetClientStreamSetError; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 42619bf..1e30080 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -429,6 +429,58 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } +int +virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length) +{ + virNetMessagePtr msg = NULL; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("st=%p length=%llu", st, length); + + if (!st->skippable) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Skipping is not supported with this stream")); + return -1; + } + + memset(&data, 0, sizeof(data)); + data.length = length; + + if (!(msg = virNetMessageNew(false))) + return -1; + + virObjectLock(st); + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = VIR_NET_CONTINUE; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + virObjectUnlock(st); + + if (virNetMessageEncodeHeader(msg) < 0) + goto cleanup; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + goto cleanup; + + if (virNetClientSendNoReply(client, msg) <
0) + goto cleanup; + + ret = 0; + cleanup: + virNetMessageFree(msg); + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 0a5aafd..a648b7c 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -61,6 +61,10 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock); +int virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.10.2

Whenever the new type of stream packet arrives at the daemon, call this function, which decodes it and calls virStreamSkip(). Otherwise, a regular data stream packet has arrived, so continue processing it as before. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 11 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 6899c18..84709da 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -653,6 +654,52 @@ daemonStreamHandleAbort(virNetServerClientPtr client, } +static int +daemonStreamHandleSkip(virNetServerClientPtr client, + daemonClientStream *stream, + virNetMessagePtr msg) +{ + int ret; + virNetStreamSkip data; + + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", + client, stream, msg->header.proc, msg->header.serial); + + /* Let's check if client plays nicely and advertised usage of + * sparse stream upfront.
*/ + if (!stream->skippable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream skip")); + return -1; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + ret = virStreamSkip(stream->st, data.length); + + if (ret < 0) { + virNetMessageError rerr; + + memset(&rerr, 0, sizeof(rerr)); + + VIR_INFO("Stream skip failed"); + stream->closed = true; + virStreamEventRemoveCallback(stream->st); + virStreamAbort(stream->st); + + return virNetServerProgramSendReplyError(stream->prog, + client, + msg, + &rerr, + &msg->header); + } + + return 0; +} + /* * Called when the stream is signalled has being able to accept @@ -671,19 +718,31 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetMessagePtr msg = stream->rx; int ret; - switch (msg->header.status) { - case VIR_NET_OK: - ret = daemonStreamHandleFinish(client, stream, msg); - break; + if (msg->header.type == VIR_NET_STREAM_SKIP) { + /* Handle special case when client sent us skip. + * Otherwise just carry on with processing stream + * data. */ + ret = daemonStreamHandleSkip(client, stream, msg); + } else if (msg->header.type == VIR_NET_STREAM) { + switch (msg->header.status) { + case VIR_NET_OK: + ret = daemonStreamHandleFinish(client, stream, msg); + break; - case VIR_NET_CONTINUE: - ret = daemonStreamHandleWriteData(client, stream, msg); - break; + case VIR_NET_CONTINUE: + ret = daemonStreamHandleWriteData(client, stream, msg); + break; - case VIR_NET_ERROR: - default: - ret = daemonStreamHandleAbort(client, stream, msg); - break; + case VIR_NET_ERROR: + default: + ret = daemonStreamHandleAbort(client, stream, msg); + break; + } + } else { + virReportError(VIR_ERR_RPC, + _("Unexpected message type: %d"), + msg->header.type); + ret = -1; } if (ret > 0) -- 2.10.2
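The branching that daemonStreamHandleWrite() now performs, together with the skippable check in daemonStreamHandleSkip(), can be summarised as a pure decision function. This is a simplified model: classify() and the ACT_* values are hypothetical stand-ins for the handlers in daemon/stream.c, and only the numeric type/status values come from virnetprotocol.x.

```c
#include <assert.h>
#include <stdbool.h>

enum msg_type { MSG_STREAM = 3, MSG_STREAM_SKIP = 6 };        /* virNetMessageType */
enum msg_status { ST_OK = 0, ST_ERROR = 1, ST_CONTINUE = 2 };  /* virNetMessageStatus */

/* Hypothetical labels for the handler each message ends up in. */
enum action { ACT_FINISH, ACT_WRITE_DATA, ACT_ABORT, ACT_SKIP, ACT_REJECT };

static enum action classify(int type, int status, bool skippable)
{
    if (type == MSG_STREAM_SKIP)
        return skippable ? ACT_SKIP : ACT_REJECT;  /* "Unexpected stream skip" */
    if (type != MSG_STREAM)
        return ACT_REJECT;                         /* "Unexpected message type" */

    switch (status) {
    case ST_OK:
        return ACT_FINISH;       /* daemonStreamHandleFinish() */
    case ST_CONTINUE:
        return ACT_WRITE_DATA;   /* daemonStreamHandleWriteData() */
    case ST_ERROR:
    default:
        return ACT_ABORT;        /* daemonStreamHandleAbort() */
    }
}
```

Note that the message type is checked before the status, so a skip packet never falls into the data/finish/abort switch.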

This is a function that handles an incoming STREAM_SKIP packet. Even though it is not wired up yet, it will be soon. At the beginning, it does a couple of checks to verify that the server plays nicely and sent us a STREAM_SKIP packet only after we've enabled sparse streams. Then it decodes the message payload to see how big the hole is and adds it to the stream's pending skipLength. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 63 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 1e30080..027ffde 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -28,6 +28,7 @@ #include "virerror.h" #include "virlog.h" #include "virthread.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_RPC @@ -55,6 +56,7 @@ struct _virNetClientStream { bool incomingEOF; bool skippable; /* User requested skippable stream */ + unsigned long long skipLength; /* Size of incoming hole in stream. */ virNetClientStreamEventCallback cb; void *cbOpaque; @@ -356,6 +358,67 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, return -1; } + +static int ATTRIBUTE_UNUSED +virNetClientStreamHandleSkip(virNetClientPtr client, + virNetClientStreamPtr st) +{ + virNetMessagePtr msg; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("client=%p st=%p", client, st); + + msg = st->rx; + memset(&data, 0, sizeof(data)); + + /* We should not be called unless there's VIR_NET_STREAM_SKIP + * message at the head of the list.
But doesn't hurt to check */ + if (!msg) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("No message in the queue")); + goto cleanup; + } + + if (msg->header.type != VIR_NET_STREAM_SKIP) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Invalid message prog=%d type=%d serial=%u proc=%d"), + msg->header.prog, + msg->header.type, + msg->header.serial, + msg->header.proc); + goto cleanup; + } + + /* Server should not send us VIR_NET_STREAM_SKIP unless we + * have requested so. But does not hurt to check ... */ + if (!st->skippable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream skip")); + goto cleanup; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Malformed stream skip packet")); + goto cleanup; + } + + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + st->skipLength += data.length; + + ret = 0; + cleanup: + if (ret < 0) { + /* Abort stream? */ + } + return ret; +} + + int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, -- 2.10.2
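Without the XDR decoding and message-queue handling, the bookkeeping above comes down to two steps. First, refuse a skip on a stream that was not opened as skippable. Otherwise, add the hole to a running total. Below is a minimal sketch in plain C. `struct sketch_client_stream` and `sketch_handle_skip()` are hypothetical names, not libvirt code:

```c
#include <stdbool.h>

/* Hypothetical stand-in for the two fields of the client stream
 * that matter here; this is not libvirt's real structure. */
struct sketch_client_stream {
    bool skippable;                  /* client asked for a sparse stream */
    unsigned long long skip_length;  /* hole bytes not yet handed to the reader */
};

/* Account for one decoded STREAM_SKIP packet announcing a hole of
 * @length bytes. Returns 0 on success, -1 if the peer sent a skip
 * on a stream that was never marked skippable. */
int sketch_handle_skip(struct sketch_client_stream *st,
                       unsigned long long length)
{
    if (!st->skippable)
        return -1;

    /* Back-to-back skips simply add up into one larger hole. */
    st->skip_length += length;
    return 0;
}
```

The patch accumulates into @st->skipLength with `+=` rather than assigning. This means that a second STREAM_SKIP arriving before the reader has consumed the first one is not lost.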

Now that we have RPC wrappers over VIR_NET_STREAM_SKIP, we can start wiring them up. This commit wires up the situation when a client wants to send a hole to the daemon. To keep stream offsets in sync, after a successful call on the daemon, skip the same hole in the local part of the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 6a2c6f6..6037e08 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5683,6 +5683,34 @@ remoteStreamRecv(virStreamPtr st, return remoteStreamRecvFlags(st, data, nbytes, 0); } + +static int +remoteStreamSkip(virStreamPtr st, + unsigned long long length) +{ + VIR_DEBUG("st=%p length=%llu", st, length); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamSendSkip(privst, + priv->client, + length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5857,6 +5885,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, + .streamSkip = remoteStreamSkip, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.10.2

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 3 ++- src/rpc/virnetclientstream.c | 10 +++++++--- src/rpc/virnetclientstream.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 6037e08..0512f14 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5665,7 +5665,8 @@ remoteStreamRecvFlags(virStreamPtr st, priv->client, data, nbytes, - (st->flags & VIR_STREAM_NONBLOCK)); + (st->flags & VIR_STREAM_NONBLOCK), + flags); VIR_DEBUG("Done %d", rv); diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 027ffde..f687bfa 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -423,13 +423,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock) + bool nonblock, + unsigned int flags) { int rv = -1; size_t want; - VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", - st, client, data, nbytes, nonblock); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", + st, client, data, nbytes, nonblock, flags); + + virCheckFlags(0, -1); + virObjectLock(st); if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a648b7c..2835066 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -59,7 +59,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock); + bool nonblock, + unsigned int flags); int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, -- 2.10.2

This function will fetch previously processed stream holes and return their sum. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 15 +++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 6093613..f546647 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -48,6 +48,7 @@ virNetClientStreamEOF; virNetClientStreamEventAddCallback; virNetClientStreamEventRemoveCallback; virNetClientStreamEventUpdateCallback; +virNetClientStreamHoleSize; virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index f687bfa..c773524 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -548,6 +548,21 @@ virNetClientStreamSendSkip(virNetClientStreamPtr st, } +int virNetClientStreamHoleSize(virNetClientPtr client ATTRIBUTE_UNUSED, + virNetClientStreamPtr st, + unsigned long long *length) +{ + int ret = st->skipLength; + + if (length) { + *length = st->skipLength; + st->skipLength = 0; + } + + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 2835066..9caa091 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -66,6 +66,10 @@ int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, unsigned long long length); +int virNetClientStreamHoleSize(virNetClientPtr client, + virNetClientStreamPtr st, + unsigned long long *length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.10.2
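The accessor above has fetch-and-reset semantics. It hands out the accumulated hole size once and then sets the counter back to zero.

Note that the patch copies @st->skipLength into an `int` and returns that value. Holes larger than INT_MAX bytes (about 2 GiB) would therefore not survive the conversion.

Below is a hedged sketch of an alternative shape. It uses a hypothetical name and a standalone counter instead of the stream struct. It returns only a status and delivers the 64-bit length through the pointer:

```c
#include <stddef.h>

/* Hypothetical fetch-and-reset accessor for an accumulated hole size.
 * On success, stores the pending hole length in *length and clears the
 * counter so the same bytes are not reported twice, then returns 0.
 * Returns -1 if @length is NULL. */
int sketch_hole_size(unsigned long long *pending,
                     unsigned long long *length)
{
    if (!length)
        return -1;

    *length = *pending;
    *pending = 0;
    return 0;
}
```

The patch's version also accepts a NULL @length and then returns the size without resetting it. This sketch rejects NULL instead, which keeps one way to read the value.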

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 0512f14..376e9ba 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5712,6 +5712,29 @@ remoteStreamSkip(virStreamPtr st, } +static int +remoteStreamHoleSize(virStreamPtr st, + unsigned long long *length) +{ + VIR_DEBUG("st=%p length=%p", st, length); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamHoleSize(priv->client, privst, length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5887,6 +5910,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamSkip = remoteStreamSkip, + .streamHoleSize = remoteStreamHoleSize, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.10.2

Whenever the server sends a client a stream packet (either a regular one with actual data or a stream skip one), it is queued on @st->rx. So the list is a mixture of both types of stream packets. Now that we have all the helpers needed, we can wire their processing up. But since virNetClientStreamRecvPacket doesn't support the VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn every received skip into as many zeroes as the skip length says. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index c773524..ff35137 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virObjectLock(st); + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg); virNetClientStreamEventTimerUpdate(st); @@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, } -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleSkip(virNetClientPtr client, virNetClientStreamPtr st) { @@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virCheckFlags(0, -1); virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } VIR_DEBUG("After IO rx=%p", st->rx); + + while (st->rx && + st->rx->header.type == VIR_NET_STREAM_SKIP) { + /* Handle skip sent to us by server.
*/ + + if (virNetClientStreamHandleSkip(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF && !st->skipLength) { + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + + if (st->skipLength) { + /* Pretend skipLength zeroes was read from stream. */ + size_t len = want; + + if (len > st->skipLength) + len = st->skipLength; + + memset(data, 0, len); + st->skipLength -= len; + want -= len; + } + + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want; -- 2.10.2
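The "pretend zeroes were read" step can be isolated into a small helper, sketched below with a hypothetical name. It serves at most @want bytes of a read from the pending hole and leaves the rest of the request to queued data packets. This mirrors the `memset()` and `skipLength -= len` logic in the hunk above:

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: satisfy up to @want bytes of a read from a
 * pending hole of *skip_length bytes by writing zeroes into @buf.
 * Returns the number of bytes produced and shrinks *skip_length. */
size_t sketch_fill_from_hole(char *buf, size_t want,
                             unsigned long long *skip_length)
{
    size_t len = want;

    /* Never hand out more zeroes than the hole actually has. */
    if (len > *skip_length)
        len = (size_t) *skip_length;

    memset(buf, 0, len);
    *skip_length -= len;
    return len;
}
```

The trade-off is that the reader gets a dense byte stream back. Sparseness is preserved on the wire but not in what virStreamRecv() returns, which is the gap VIR_STREAM_RECV_STOP_AT_HOLE is meant to close.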

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 2 +- src/rpc/virnetclientstream.c | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 376e9ba..fd76811 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5652,7 +5652,7 @@ remoteStreamRecvFlags(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); if (virNetClientStreamRaiseError(privst)) return -1; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index ff35137..c668d64 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -434,7 +434,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", st, client, data, nbytes, nonblock, flags); - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); virObjectLock(st); @@ -497,6 +497,14 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, /* Pretend skipLength zeroes was read from stream. */ size_t len = want; + /* Yes, pretend unless we are asked not to. */ + if (flags & VIR_STREAM_RECV_STOP_AT_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Stream is in a hole")); + rv = -3; + goto cleanup; + } + if (len > st->skipLength) len = st->skipLength; -- 2.10.2

Whenever the client is able to receive some data from the stream, daemonStreamHandleRead is called. But now the behaviour of this function needs to be changed a bit. Previously it just read data from the underlying file (or chardev or whatever) and sent it through the stream to the client. This model will not work any longer because it does not differentiate whether the underlying file is in a data or a hole section. Therefore, at the beginning of this function add code that checks this situation and acts accordingly. So after this, when wanting to send some data, we always check whether we are in a hole and, if so, skip it and inform the client about its size. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/daemon/stream.c b/daemon/stream.c index 84709da..a21f1bf 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -797,6 +797,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + unsigned long long length = 0; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -821,6 +823,54 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; + if (stream->skippable) { + /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on.
*/ + + rv = virStreamInData(stream->st, &inData, &length); + VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && length) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamSkip(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + length) < 0) + goto cleanup; + + msg = NULL; + + /* We have successfully sent stream skip to the other side. + * To keep streams in sync seek locally too. */ + virStreamSkip(stream->st, length); + /* We're done with this call */ + goto done; + } + } + + if (length < bufferLen) + bufferLen = length; + } + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -852,6 +902,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; } + done: ret = 0; cleanup: VIR_FREE(buffer); -- 2.10.2
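For a local file, virStreamInData() ends up calling virFileInData(), which was added earlier in the series but whose body is not part of this excerpt. That function reports where the current data or hole section ends.

One common way to answer that question on Linux is lseek() with SEEK_DATA and SEEK_HOLE. The sketch below assumes that approach and uses a hypothetical name. It restores the file offset before returning:

```c
#define _GNU_SOURCE             /* SEEK_DATA / SEEK_HOLE on glibc */
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Report whether the current offset of @fd is in data (*in_data = 1)
 * or in a hole (*in_data = 0), and how many bytes remain until the
 * section changes. At EOF this yields *in_data = 0, *length = 0.
 * Returns 0 on success, -1 on error (errno set by lseek()). */
int sketch_in_data(int fd, int *in_data, unsigned long long *length)
{
    off_t cur, next, end;
    int saved_errno;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    if ((next = lseek(fd, cur, SEEK_DATA)) == (off_t) -1) {
        if (errno != ENXIO)
            goto error;
        /* ENXIO: no data past @cur, i.e. a trailing hole or EOF. */
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto error;
        *in_data = 0;
        *length = end - cur;
    } else if (next > cur) {
        /* In a hole that ends where the next data section starts. */
        *in_data = 0;
        *length = next - cur;
    } else {
        /* In data; every file has an implicit hole at EOF. */
        if ((next = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            goto error;
        *in_data = 1;
        *length = next - cur;
    }

    /* The probes above moved the offset; put it back. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        return -1;
    return 0;

 error:
    saved_errno = errno;
    lseek(fd, cur, SEEK_SET);
    errno = saved_errno;
    return -1;
}
```

Some filesystems do not track holes. On those, the kernel reports the whole file as one data section followed by the implicit hole at EOF. The query then degrades to "all data" rather than failing.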

While virStreamInData() should be a small and quick function, in our implementation it seeks multiple times. Moreover, it is called even if we know that we are in data. Well, quite. If we track its return values and do some basic math with them, we can end up calling virStreamInData right at the boundaries of a data section or a hole and nowhere else. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index a21f1bf..0901d82 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -54,6 +54,7 @@ struct daemonClientStream { bool tx; bool skippable; + size_t dataLen; /* How much data is there remaining until we see a hole */ daemonClientStreamPtr next; }; @@ -823,7 +824,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; - if (stream->skippable) { + if (stream->skippable && !stream->dataLen) { /* Handle skip. We want to send some data to the client. But we might * be in a hole. Seek to next data. But if we are in data already, just * carry on. */ @@ -867,10 +868,13 @@ daemonStreamHandleRead(virNetServerClientPtr client, } } - if (length < bufferLen) - bufferLen = length; + stream->dataLen = length; } + if (stream->skippable && + bufferLen > stream->dataLen) + bufferLen = stream->dataLen; + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -885,6 +889,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, goto cleanup; msg = NULL; } else { + stream->dataLen -= rv; + stream->tx = false; if (rv == 0) stream->recvEOF = true; -- 2.10.2
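The "basic math" amounts to a countdown:

1. After a virStreamInData() query reports data, remember how many data bytes remain (the new `dataLen` field).
2. Clamp each read to that number.
3. Query again only once the counter reaches zero.

Below is a self-contained simulation with hypothetical names. An in-memory list of sections stands in for the file, and `in_data_calls` counts the queries:

```c
#include <stddef.h>

/* Hypothetical in-memory model of a sparse file. */
struct sketch_section { int is_data; size_t len; };

struct sketch_reader {
    const struct sketch_section *sections;
    size_t nsections;
    size_t idx;              /* current section */
    size_t pos;              /* offset inside the current section */
    size_t data_left;        /* plays the role of stream->dataLen */
    unsigned in_data_calls;  /* number of (expensive) queries made */
};

/* Stand-in for virStreamInData(): where are we, and for how long? */
static void sketch_query(struct sketch_reader *r, int *in_data, size_t *length)
{
    r->in_data_calls++;
    if (r->idx >= r->nsections) {        /* EOF */
        *in_data = 0;
        *length = 0;
        return;
    }
    *in_data = r->sections[r->idx].is_data;
    *length = r->sections[r->idx].len - r->pos;
}

/* Move forward @n bytes; callers never cross a section boundary. */
static void sketch_advance(struct sketch_reader *r, size_t n)
{
    r->pos += n;
    if (r->idx < r->nsections && r->pos == r->sections[r->idx].len) {
        r->idx++;
        r->pos = 0;
    }
}

/* One call of the read handler. Returns the number of data bytes
 * sent and stores the size of a skipped hole in *skipped.
 * Both zero means EOF. */
size_t sketch_read_step(struct sketch_reader *r, size_t buffer_len,
                        size_t *skipped)
{
    *skipped = 0;

    if (r->data_left == 0) {
        int in_data;
        size_t length;

        sketch_query(r, &in_data, &length);
        if (!in_data) {
            *skipped = length;           /* one skip for the whole hole */
            sketch_advance(r, length);
            return 0;
        }
        r->data_left = length;
    }

    if (buffer_len > r->data_left)
        buffer_len = r->data_left;
    sketch_advance(r, buffer_len);
    r->data_left -= buffer_len;
    return buffer_len;
}
```

Consider a 10-byte data section, a 5-byte hole and a 3-byte data section, read with a 4-byte buffer. The handler runs 5 times before EOF and makes 4 queries, the last of which detects EOF. Querying on every call would take 6.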

Implement virStreamSkip and virStreamInData callbacks. These callbacks do no magic: they just skip a hole, or detect whether we are in a data section of a file or in a hole and how many bytes we can read until the section changes. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/storage/storage_util.c | 4 +- src/util/virfdstream.c | 234 +++++++++++++++++++++++++++++++++++++++++---- src/util/virfdstream.h | 1 + 3 files changed, 216 insertions(+), 23 deletions(-) diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index a2d89af..3576435 100644 --- a/src/storage/storage_util.c +++ b/src/storage/storage_util.c @@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_WRONLY); + offset, len, false, O_WRONLY); cleanup: VIR_FREE(path); @@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_RDONLY); + offset, len, false, O_RDONLY); cleanup: VIR_FREE(path); diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index efd9199..e9b5962 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream"); typedef enum { VIR_FDSTREAM_MSG_TYPE_DATA, + VIR_FDSTREAM_MSG_TYPE_SKIP, } virFDStreamMsgType; typedef struct _virFDStreamMsg virFDStreamMsg; @@ -66,6 +67,9 @@ struct _virFDStreamMsg { size_t len; size_t offset; } data; + struct { + size_t len; + } skip; } stream; }; @@ -175,6 +179,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg) case VIR_FDSTREAM_MSG_TYPE_DATA: VIR_FREE(msg->stream.data.buf); break; + case VIR_FDSTREAM_MSG_TYPE_SKIP: + /* nada */ + break; } VIR_FREE(msg); @@ -361,6 +368,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr; struct _virFDStreamThreadData { virStreamPtr st; size_t length; + bool
sparse; int fdin; char *fdinname; int fdout; @@ -383,32 +391,66 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const char *fdinname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + unsigned long long sectionLen = 0; char *buf = NULL; ssize_t got; + if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, &sectionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error; - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; - - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP; + msg->stream.skip.len = sectionLen; + got = sectionLen; + + /* HACK. The message queue is one directional. So caller + * cannot make us skip the hole. Do that for them instead.
*/ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + } + } else { + if (sparse && + buflen > *dataLen) + buflen = *dataLen; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = got; + buf = NULL; + if (sparse) + *dataLen -= got; } - msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; - msg->stream.data.buf = buf; - msg->stream.data.len = got; - buf = NULL; - virFDStreamMsgQueuePush(fdst, msg); msg = NULL; @@ -423,11 +465,13 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst, static ssize_t virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + bool sparse, const int fdout, const char *fdoutname) { ssize_t got; virFDStreamMsgPtr msg = fdst->msg; + off_t off; bool pop = false; switch (msg->type) { @@ -446,6 +490,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, pop = msg->stream.data.offset == msg->stream.data.len; break; + + case VIR_FDSTREAM_MSG_TYPE_SKIP: + if (!sparse) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected stream skip")); + return -1; + } + + got = msg->stream.skip.len; + off = lseek(fdout, got, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdoutname); + return -1; + } + + if (ftruncate(fdout, off) < 0) { + virReportSystemError(errno, + _("unable to truncate %s"), + fdoutname); + return -1; + } + + pop = true; + break; } if (pop) { @@ -463,6 +533,7 @@ virFDStreamThread(void *opaque) virFDStreamThreadDataPtr data = opaque; virStreamPtr st = data->st; size_t length = data->length; + bool sparse = data->sparse; int fdin = data->fdin; char *fdinname = data->fdinname; int fdout = data->fdout; @@ -471,6 +542,7 @@ virFDStreamThread(void *opaque) bool doRead = 
fdst->threadDoRead; size_t buflen = 256 * 1024; size_t total = 0; + size_t dataLen = 0; virObjectRef(fdst); virObjectLock(fdst); @@ -505,9 +577,9 @@ virFDStreamThread(void *opaque) } if (doRead) - got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen); + got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdinname, &dataLen, buflen); else - got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname); + got = virFDStreamThreadDoWrite(fdst, sparse, fdout, fdoutname); if (got < 0) goto error; @@ -773,6 +845,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } } + /* Shortcut, if the stream is in the trailing hole, + * return 0 immediately. */ + if (msg->type == VIR_FDSTREAM_MSG_TYPE_SKIP && + msg->stream.skip.len == 0) { + ret = 0; + goto cleanup; + } + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { /* Nope, nope, I'm outta here */ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -823,11 +903,120 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSkip(virStreamPtr st, + unsigned long long length) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. But bear with me. If FDStream is + * in a read mode, then if the message at the queue head is SKIP, just + * pop it. The thread has lseek()-ed anyway. If however, the FDStream + * is in write mode, then tell the thread to do the lseek() for us. + * Under no circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. 
*/ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_SKIP) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream skip")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP; + msg->stream.skip.len = length; + virFDStreamMsgQueuePush(fdst, msg); + msg = NULL; + } + } else { + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + + if (ftruncate(fdst->fd, off) < 0) { + virReportSystemError(errno, "%s", + _("unable to truncate")); + goto cleanup; + } + } + + ret = 0; + cleanup: + virObjectUnlock(fdst); + virFDStreamMsgFree(msg); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + unsigned long long *length) +{ + virFDStreamDataPtr fdst = st->privateData; + int ret = -1; + + virObjectLock(fdst); + + if (fdst->thread) { + virFDStreamMsgPtr msg; + + while (!(msg = fdst->msg)) { + if (fdst->threadQuit) { + *inData = *length = 0; + ret = 0; + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) { + *inData = 1; + *length = msg->stream.data.len - msg->stream.data.offset; + } else { + *inData = 0; + *length = msg->stream.skip.len; + } + ret = 0; + } else { + ret = virFileInData(fdst->fd, inData, length); + } + + cleanup: + virObjectUnlock(fdst); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSkip = virFDStreamSkip, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = virFDStreamRemoveCallback @@ -969,7 +1158,8 @@ 
virFDStreamOpenFileInternal(virStreamPtr st, unsigned long long length, int oflags, int mode, - bool forceIOHelper) + bool forceIOHelper, + bool sparse) { int fd = -1; struct stat sb; @@ -1026,6 +1216,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, threadData->st = virObjectRef(st); threadData->length = length; + threadData->sparse = sparse; if ((oflags & O_ACCMODE) == O_RDONLY) { threadData->fdin = fd; @@ -1067,7 +1258,7 @@ int virFDStreamOpenFile(virStreamPtr st, } return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, false); + oflags, 0, false, false); } int virFDStreamCreateFile(virStreamPtr st, @@ -1080,7 +1271,7 @@ int virFDStreamCreateFile(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, mode, - false); + false, false); } #ifdef HAVE_CFMAKERAW @@ -1096,7 +1287,7 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false) < 0) + false, false) < 0) return -1; fdst = st->privateData; @@ -1133,7 +1324,7 @@ int virFDStreamOpenPTY(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false); + false, false); } #endif /* !HAVE_CFMAKERAW */ @@ -1141,11 +1332,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags) { return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, true); + oflags, 0, true, sparse); } int virFDStreamSetInternalCloseCb(virStreamPtr st, diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h index 34c4c3f..887c991 100644 --- a/src/util/virfdstream.h +++ b/src/util/virfdstream.h @@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags); int virFDStreamSetInternalCloseCb(virStreamPtr st, -- 2.10.2
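On the writing side, three places reproduce a hole with lseek(SEEK_CUR) followed by ftruncate() to the new offset:

- virFDStreamThreadDoWrite()
- the thread-less branch of virFDStreamSkip()
- virshStreamSkip() in the virsh patch

ftruncate() is what makes a trailing hole count. Without it, seeking past the end and never writing again leaves the file short.

However, ftruncate() also shrinks a file that is already longer than that offset, which could matter when the target already holds data past that point. On Linux it also fails with EINVAL on anything but a regular file, such as a block device.

Below is a hedged sketch, with a hypothetical name, that only ever grows the file:

```c
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: reproduce a hole of @length bytes at the
 * current offset of @fd. The offset is moved forward; the file is
 * extended to the new offset only if it is currently shorter, so
 * existing data beyond that point is never cut off.
 * Returns 0 on success, -1 on error (errno set). */
int sketch_write_hole(int fd, unsigned long long length)
{
    struct stat sb;
    off_t off;

    if ((off = lseek(fd, (off_t) length, SEEK_CUR)) == (off_t) -1)
        return -1;

    if (fstat(fd, &sb) < 0)
        return -1;

    /* Growing via ftruncate() leaves the new range unallocated on
     * filesystems with sparse file support; it reads back as zeroes. */
    if (sb.st_size < off && ftruncate(fd, off) < 0)
        return -1;

    return 0;
}
```

A download is written strictly front to back, so the file is never longer than the current offset. In that case the patch's unconditional ftruncate() behaves the same as this sketch.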

Now, not all APIs are going to support sparse streams. For some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). For others, we will need a special flag to indicate that the client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand, we can just annotate in our .x files that a certain flag to a certain RPC call enables this feature. For instance: /** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX, Therefore, whenever a client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); the daemon will note that and send stream skips when possible. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 9862598..def88d4 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -281,6 +281,13 @@ while (<PROTOCOL>) { $calls{$name}->{streamflag} = "none"; } + if (exists $opts{sparseflag}) { + die "\@sparseflag requires stream" unless $calls{$name}->{streamflag} ne "none"; + $calls{$name}->{sparseflag} = $opts{sparseflag}; + } else { + $calls{$name}->{sparseflag} = "none"; + } + $calls{$name}->{acl} = $opts{acl}; $calls{$name}->{aclfilter} = $opts{aclfilter}; @@ -982,6 +989,11 @@ elsif ($mode eq "server") { if ($call->{streamflag} ne "none") { print " virStreamPtr st = NULL;\n"; print " daemonClientStreamPtr stream = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = args->flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1024,7 +1036,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; + print " if
(!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, sparse)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1727,6 +1739,11 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print " virNetClientStreamPtr netst = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1738,7 +1755,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; -- 2.10.2

These API flags tell whether the caller wants to use a sparse stream for a storage transfer. At the same time, it's safe to enable them in the storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for the filesystem backend but not for the iSCSI backend. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- src/storage/storage_util.c | 10 ++++++---- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index 45ec720..4517f71 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -346,11 +346,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; + int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 05eec8a..6420299 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * *
Download the content of the volume as a stream. If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 87b2bd3..25e62a1 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -4896,6 +4896,7 @@ enum remote_procedure { /** * @generate: both * @writestream: 1 + * @sparseflag: VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM * @acl: storage_vol:data_write */ REMOTE_PROC_STORAGE_VOL_UPLOAD = 208, @@ -4903,6 +4904,7 @@ enum remote_procedure { /** * @generate: both * @readstream: 1 + * @sparseflag: VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_STORAGE_VOL_DOWNLOAD = 209, diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index 2103ed1..1b0d776 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -2117,7 +2117,7 @@ storageVolDownload(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; @@ -2285,7 +2285,7 @@ storageVolUpload(virStorageVolPtr obj, virStorageVolStreamInfoPtr cbdata = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index 3576435..b393795 100644 --- a/src/storage/storage_util.c +++ 
b/src/storage/storage_util.c @@ -2401,8 +2401,9 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); /* if volume has target format VIR_STORAGE_FILE_PLOOP * we need to restore DiskDescriptor.xml, according to * new contents of volume. This operation will be perfomed @@ -2427,7 +2428,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, false, O_WRONLY); + offset, len, sparse, O_WRONLY); cleanup: VIR_FREE(path); @@ -2447,8 +2448,9 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (vol->target.format == VIR_STORAGE_FILE_PLOOP) { has_snap = storageBackendPloopHasSnapshots(vol->target.path); if (has_snap < 0) { @@ -2465,7 +2467,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, false, O_RDONLY); + offset, len, sparse, O_RDONLY); cleanup: VIR_FREE(path); -- 2.10.2
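The frontend/backend split described in the commit message (the driver frontend lets the sparse flag through, and each backend decides whether it actually supports it) can be modelled in a few lines of plain C. This is a hypothetical sketch, not libvirt code: `check_flags()`, `struct backend`, `vol_download()` and the two backend tables are invented for illustration. Only the flag value `1 << 0` is taken from the patch, and `check_flags()` merely imitates what `virCheckFlags()` does, namely rejecting any bit outside the supported mask.

```c
#include <stdio.h>

/* Hypothetical local copy of the new flag; the value matches the patch. */
enum {
    DOWNLOAD_SPARSE_STREAM = 1 << 0,
};

/* Imitates virCheckFlags(): fail if @flags has any bit outside @supported. */
static int
check_flags(const char *who, unsigned int flags, unsigned int supported)
{
    unsigned int unknown = flags & ~supported;

    if (unknown) {
        fprintf(stderr, "%s: unsupported flags (0x%x)\n", who, unknown);
        return -1;
    }
    return 0;
}

/* Hypothetical backend descriptor: a name plus the flags it can handle. */
struct backend {
    const char *name;
    unsigned int supported;
};

static const struct backend fs_backend = { "fs", DOWNLOAD_SPARSE_STREAM };
static const struct backend iscsi_backend = { "iscsi", 0 };

/* The frontend accepts every flag known to the public API and then lets
 * the backend reject whatever it cannot do. */
static int
vol_download(const struct backend *be, unsigned int flags)
{
    if (check_flags("frontend", flags, DOWNLOAD_SPARSE_STREAM) < 0)
        return -1;
    return check_flags(be->name, flags, be->supported);
}
```

With this arrangement a backend that has not been taught about sparse streams keeps rejecting the flag, so the caller gets an error rather than a transfer that silently ignores the request.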

The command grew a new --sparse switch that does nothing more than
enable the sparse streams feature for this command. Along with the
switch, a new helper function is introduced: virshStreamSkip(). This
is the callback that is called whenever the daemon sends us a hole. In
the callback we reflect the hole in the underlying file by seeking
forward as many bytes as told and then extending the file to the new
offset with ftruncate().

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 18 ++++++++++++++++++
 tools/virsh-util.h   |  5 +++++
 tools/virsh-volume.c | 12 ++++++++++--
 tools/virsh.pod      |  3 ++-
 4 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/tools/virsh-util.c b/tools/virsh-util.c
index 4b86e29..4119d7c 100644
--- a/tools/virsh-util.c
+++ b/tools/virsh-util.c
@@ -153,6 +153,24 @@ virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 
 
+int
+virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
+                unsigned long long offset,
+                void *opaque)
+{
+    int *fd = opaque;
+    off_t cur;
+
+    if ((cur = lseek(*fd, offset, SEEK_CUR)) == (off_t) -1)
+        return -1;
+
+    if (ftruncate(*fd, cur) < 0)
+        return -1;
+
+    return 0;
+}
+
+
 void
 virshDomainFree(virDomainPtr dom)
 {
diff --git a/tools/virsh-util.h b/tools/virsh-util.h
index 64cef23..756546a 100644
--- a/tools/virsh-util.h
+++ b/tools/virsh-util.h
@@ -58,6 +58,11 @@ virshStreamSink(virStreamPtr st,
                 void *opaque);
 
 int
+virshStreamSkip(virStreamPtr st,
+                unsigned long long offset,
+                void *opaque);
+
+int
 virshDomainGetXMLFromDom(vshControl *ctl,
                          virDomainPtr dom,
                          unsigned int flags,
diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c
index 66fe70e..3d19b74 100644
--- a/tools/virsh-volume.c
+++ b/tools/virsh-volume.c
@@ -763,6 +763,10 @@ static const vshCmdOptDef opts_vol_download[] = {
      .type = VSH_OT_INT,
      .help = N_("amount of data to download")
     },
+    {.name = "sparse",
+     .type = VSH_OT_BOOL,
+     .help = N_("preserve sparseness of volume")
+    },
     {.name = NULL}
 };
 
@@ -778,6 +782,7 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
     unsigned long long offset = 0, length = 0;
     bool created = false;
     virshControlPtr priv = ctl->privData;
+    unsigned int flags = 0;
 
     if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0)
         return false;
@@ -791,6 +796,9 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
     if (vshCommandOptStringReq(ctl, cmd, "file", &file) < 0)
         goto cleanup;
 
+    if (vshCommandOptBool(cmd, "sparse"))
+        flags |= VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM;
+
     if ((fd = open(file, O_WRONLY|O_CREAT|O_EXCL, 0666)) < 0) {
         if (errno != EEXIST ||
             (fd = open(file, O_WRONLY|O_TRUNC, 0666)) < 0) {
@@ -806,12 +814,12 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd)
         goto cleanup;
     }
 
-    if (virStorageVolDownload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolDownload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot download from volume %s"), name);
         goto cleanup;
     }
 
-    if (virStreamRecvAll(st, virshStreamSink, &fd) < 0) {
+    if (virStreamSparseRecvAll(st, virshStreamSink, virshStreamSkip, &fd) < 0) {
         vshError(ctl, _("cannot receive data from volume %s"), name);
         goto cleanup;
     }
diff --git a/tools/virsh.pod b/tools/virsh.pod
index e16f62f..ce87c19 100644
--- a/tools/virsh.pod
+++ b/tools/virsh.pod
@@ -3916,12 +3916,13 @@ regarding possible target volume and pool changes as a result of the
 pool refresh when the upload is attempted.
 
 =item B<vol-download> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>]
-[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file>
+[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file>
 
 Download the contents of a storage volume to I<local-file>.
 I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume
 is in.
 I<vol-name-or-key-or-path> is the name or key or path of the volume to download.
+If I<--sparse> is specified, this command will preserve volume sparseness.
 I<--offset> is the position in the storage volume at which to start reading
 the data. The value must be 0 or larger. I<--length> is an upper bound of
 the amount of data to be downloaded. A negative value is interpreted as
-- 
2.10.2
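Why does virshStreamSkip() call ftruncate() after lseek()? Seeking past the end of a file does not change its size; only a later write (or an explicit ftruncate()) does. If the volume ends with a hole, nothing is written after the last skip, and without the truncate the local copy would come out short. The sketch below replays a hypothetical sequence of data and hole records into a temporary file using the same lseek()+ftruncate() pattern. `struct chunk`, `sink()`, `skip()`, `replay()` and `replay_to_tmpfile()` are illustrative names, not libvirt API; in the patch, the dispatching between the sink and skip callbacks is done by virStreamSparseRecvAll().

```c
#define _POSIX_C_SOURCE 200809L   /* mkstemp(), ftruncate() */
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical record: either @len bytes of @data, or a hole of @len bytes. */
struct chunk {
    int is_hole;
    const char *data;
    size_t len;
};

/* Same idea as virshStreamSink(): write data at the current offset.
 * A short write is treated as an error in this sketch. */
static int
sink(int fd, const char *bytes, size_t nbytes)
{
    return write(fd, bytes, nbytes) == (ssize_t) nbytes ? 0 : -1;
}

/* Same idea as virshStreamSkip(): move forward @length bytes, then extend
 * the file so that a trailing hole is reflected in its size. */
static int
skip(int fd, unsigned long long length)
{
    off_t cur;

    if ((cur = lseek(fd, (off_t) length, SEEK_CUR)) == (off_t) -1)
        return -1;
    if (ftruncate(fd, cur) < 0)
        return -1;
    return 0;
}

/* Feed a sequence of chunks to sink()/skip(), like a sparse receive loop. */
static int
replay(int fd, const struct chunk *chunks, size_t n)
{
    size_t i;

    for (i = 0; i < n; i++) {
        int rc = chunks[i].is_hole ? skip(fd, chunks[i].len)
                                   : sink(fd, chunks[i].data, chunks[i].len);
        if (rc < 0)
            return -1;
    }
    return 0;
}

/* Demo helper: replay into a fresh temporary file and return its size,
 * or -1 on error. */
static long long
replay_to_tmpfile(const struct chunk *chunks, size_t n)
{
    char path[] = "/tmp/sparse-demo-XXXXXX";
    struct stat sb;
    long long size = -1;
    int fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);               /* the file disappears once fd is closed */
    if (replay(fd, chunks, n) == 0 && fstat(fd, &sb) == 0)
        size = (long long) sb.st_size;
    close(fd);
    return size;
}
```

For example, replaying 3 bytes of data followed by a 4096-byte hole leaves a 4099-byte file; with the ftruncate() call removed, the same replay would leave only 3 bytes.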

Similarly to the previous commit, implement the sparse streams feature
for vol-upload. This is, however, a slightly different approach,
because we must implement a function that tells us whether we are in
a data section or in a hole. But there's no magic hidden in here:
virshStreamInData() is a thin wrapper around virFileInData().

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tools/virsh-util.c   | 31 +++++++++++++++++++++++++++++++
 tools/virsh-util.h   | 19 +++++++++++++++++++
 tools/virsh-volume.c | 37 ++++++++++++++++++++++++-------------
 tools/virsh.pod      |  3 ++-
 4 files changed, 76 insertions(+), 14 deletions(-)

diff --git a/tools/virsh-util.c b/tools/virsh-util.c
index 4119d7c..b590b8f 100644
--- a/tools/virsh-util.c
+++ b/tools/virsh-util.c
@@ -154,6 +154,19 @@ virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED,
 
 
 int
+virshStreamSource(virStreamPtr st ATTRIBUTE_UNUSED,
+                  char *bytes,
+                  size_t nbytes,
+                  void *opaque)
+{
+    virshStreamCallbackDataPtr cbData = opaque;
+    int fd = cbData->fd;
+
+    return saferead(fd, bytes, nbytes);
+}
+
+
+int
 virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
                 unsigned long long offset,
                 void *opaque)
@@ -171,6 +184,24 @@ virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED,
 }
 
 
+int
+virshStreamInData(virStreamPtr st ATTRIBUTE_UNUSED,
+                  int *inData,
+                  unsigned long long *offset,
+                  void *opaque)
+{
+    virshStreamCallbackDataPtr cbData = opaque;
+    vshControl *ctl = cbData->ctl;
+    int fd = cbData->fd;
+    int ret;
+
+    if ((ret = virFileInData(fd, inData, offset)) < 0)
+        vshError(ctl, "%s", _("Unable to get current position in stream"));
+
+    return ret;
+}
+
+
 void
 virshDomainFree(virDomainPtr dom)
 {
diff --git a/tools/virsh-util.h b/tools/virsh-util.h
index 756546a..69ff143 100644
--- a/tools/virsh-util.h
+++ b/tools/virsh-util.h
@@ -57,12 +57,31 @@ virshStreamSink(virStreamPtr st,
                 size_t nbytes,
                 void *opaque);
 
+typedef struct _virshStreamCallbackData virshStreamCallbackData;
+typedef virshStreamCallbackData *virshStreamCallbackDataPtr;
+struct _virshStreamCallbackData {
+    vshControl *ctl;
+    int fd;
+};
+
+int
+virshStreamSource(virStreamPtr st,
+                  char *bytes,
+                  size_t nbytes,
+                  void *opaque);
+
 int
 virshStreamSkip(virStreamPtr st,
                 unsigned long long offset,
                 void *opaque);
 
 int
+virshStreamInData(virStreamPtr st,
+                  int *inData,
+                  unsigned long long *offset,
+                  void *opaque);
+
+int
 virshDomainGetXMLFromDom(vshControl *ctl,
                          virDomainPtr dom,
                          unsigned int flags,
diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c
index 3d19b74..a2be0b9 100644
--- a/tools/virsh-volume.c
+++ b/tools/virsh-volume.c
@@ -660,18 +660,13 @@ static const vshCmdOptDef opts_vol_upload[] = {
      .type = VSH_OT_INT,
      .help = N_("amount of data to upload")
     },
+    {.name = "sparse",
+     .type = VSH_OT_BOOL,
+     .help = N_("preserve sparseness of volume")
+    },
     {.name = NULL}
 };
 
-static int
-cmdVolUploadSource(virStreamPtr st ATTRIBUTE_UNUSED,
-                   char *bytes, size_t nbytes, void *opaque)
-{
-    int *fd = opaque;
-
-    return saferead(*fd, bytes, nbytes);
-}
-
 static bool
 cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
 {
@@ -683,6 +678,8 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
     const char *name = NULL;
     unsigned long long offset = 0, length = 0;
     virshControlPtr priv = ctl->privData;
+    unsigned int flags = 0;
+    virshStreamCallbackData cbData;
 
     if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0)
         return false;
@@ -701,19 +698,33 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd)
         goto cleanup;
     }
 
+    cbData.ctl = ctl;
+    cbData.fd = fd;
+
+    if (vshCommandOptBool(cmd, "sparse"))
+        flags |= VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM;
+
     if (!(st = virStreamNew(priv->conn, 0))) {
         vshError(ctl, _("cannot create a new stream"));
         goto cleanup;
     }
 
-    if (virStorageVolUpload(vol, st, offset, length, 0) < 0) {
+    if (virStorageVolUpload(vol, st, offset, length, flags) < 0) {
         vshError(ctl, _("cannot upload to volume %s"), name);
         goto cleanup;
     }
 
-    if (virStreamSendAll(st, cmdVolUploadSource, &fd) < 0) {
-        vshError(ctl, _("cannot send data to volume %s"), name);
-        goto cleanup;
+    if (flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM) {
+        if (virStreamSparseSendAll(st, virshStreamSource,
+                                   virshStreamInData, &cbData) < 0) {
+            vshError(ctl, _("cannot send data to volume %s"), name);
+            goto cleanup;
+        }
+    } else {
+        if (virStreamSendAll(st, virshStreamSource, &cbData) < 0) {
+            vshError(ctl, _("cannot send data to volume %s"), name);
+            goto cleanup;
+        }
     }
 
     if (VIR_CLOSE(fd) < 0) {
diff --git a/tools/virsh.pod b/tools/virsh.pod
index ce87c19..82ae843 100644
--- a/tools/virsh.pod
+++ b/tools/virsh.pod
@@ -3898,13 +3898,14 @@ the storage volume should be deleted as well. Not all storage drivers
 support this option, presently only rbd.
 
 =item B<vol-upload> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>]
-[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file>
+[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file>
 
 Upload the contents of I<local-file> to a storage volume.
 I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume
 is in.
 I<vol-name-or-key-or-path> is the name or key or path of the volume where the
 file will be uploaded.
+If I<--sparse> is specified, this command will preserve volume sparseness.
 I<--offset> is the position in the storage volume at which to start writing
 the data. The value must be 0 or larger. I<--length> is an upper bound of
 the amount of data to be uploaded. A negative value is interpreted
-- 
2.10.2
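virshStreamInData() delegates to virFileInData(), which comes from an earlier patch in the series and whose body is not shown here. One common way to answer "am I in data or in a hole, and for how many bytes?" on Linux is lseek() with SEEK_DATA and SEEK_HOLE, and the sketch below assumes that approach. `in_data()` and `count_sections()` are hypothetical helpers modelled on the `int *inData` / `unsigned long long *` out-parameters used above; they are not the actual virFileInData() implementation. `in_data()` restores the file offset before returning, so that a following read() in the source callback still starts at the right place.

```c
#define _GNU_SOURCE          /* SEEK_DATA / SEEK_HOLE on glibc */
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Report whether the current offset of @fd lies in data (*inData = 1) or
 * in a hole (*inData = 0), and how many bytes remain in that section.
 * The file offset is left unchanged. Returns 0 on success, -1 on error. */
static int
in_data(int fd, int *inData, unsigned long long *length)
{
    off_t cur, data, hole, end;
    int ret = -1;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    if ((data = lseek(fd, cur, SEEK_DATA)) == (off_t) -1) {
        if (errno != ENXIO)
            goto restore;
        /* No data at or after @cur: trailing hole, or exactly at EOF. */
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto restore;
        *inData = 0;
        *length = end > cur ? (unsigned long long) (end - cur) : 0;
    } else if (data > cur) {
        /* In a hole that ends where the next data section starts. */
        *inData = 0;
        *length = data - cur;
    } else {
        /* In data; the section ends at the next hole (EOF counts as one). */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            goto restore;
        *inData = 1;
        *length = hole - cur;
    }
    ret = 0;

 restore:
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        ret = -1;
    return ret;
}

/* Walk @fd from its current offset to EOF and add up data and hole bytes,
 * roughly the way a sparse sender would. A real sender reads the data;
 * this sketch just seeks over it. */
static int
count_sections(int fd, unsigned long long *dataBytes,
               unsigned long long *holeBytes)
{
    int inData;
    unsigned long long len;

    *dataBytes = *holeBytes = 0;
    for (;;) {
        if (in_data(fd, &inData, &len) < 0)
            return -1;
        if (len == 0)
            return 0;                   /* reached EOF */
        if (inData)
            *dataBytes += len;
        else
            *holeBytes += len;
        if (lseek(fd, (off_t) len, SEEK_CUR) == (off_t) -1)
            return -1;
    }
}
```

On filesystems without hole tracking, the kernel's generic fallback reports the whole file as data, so the sketch degrades to a plain, non-sparse transfer instead of failing.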
participants (2)
- Daniel P. Berrange
- Michal Privoznik