[libvirt] [PATCH RFC 00/48] Sparse streams

So, after some time spent working on this, I think I finally have something that works. We now have formatted messages between iohelper and the daemon.

BUT there's a corner case where this does not work 100%. If there's a hole at EOF, it is either not transferred to the other side or not fully seeked. Therefore, a hole at EOF does not end up in the copy. I guess it is some silly mistake somewhere.

Anyway, any comments are appreciated.

v2 can be found here:

https://www.redhat.com/archives/libvir-list/2016-May/msg01691.html

Michal Privoznik (48):
  util: Introduce virFileInData
  src: Move iohelper out from utils/ to a separate dir
  fdstream: s/struct virFDStreamData */virFDStreamDataPtr/
  virFDStreamData: Turn into virObjectLockable
  Introduce virStreamRecvFlags
  Implement virStreamRecvFlags to some drivers
  Introduce virStreamSkip
  Introduce virStreamHoleSize
  Introduce VIR_STREAM_RECV_STOP_AT_HOLE flag
  Introduce virStreamSparseRecvAll
  Introduce virStreamSparseSendAll
  Introduce virStreamInData
  virNetClientStreamNew: Track origin stream
  Track if stream is skippable
  RPC: Introduce virNetStreamSkip
  Introduce VIR_NET_STREAM_SKIP message type
  Teach wireshark plugin about VIR_NET_STREAM_SKIP
  daemon: Introduce virNetServerProgramSendStreamSkip
  virnetclientstream: Introduce virNetClientStreamSendSkip
  daemon: Implement VIR_NET_STREAM_SKIP handling
  virnetclientstream: Introduce virNetClientStreamHandleSkip
  remote_driver: Implement virStreamSkip
  virNetClientStreamRecvPacket: Introduce @flags argument
  Introduce virNetClientStreamHoleSize
  remote: Implement virStreamHoleSize
  virNetClientStream: Wire up VIR_NET_STREAM_SKIP
  remote_driver: Implement VIR_STREAM_RECV_STOP_AT_HOLE
  daemonStreamHandleRead: Wire up seekable stream
  fdstream: Implement seek
  gendispatch: Introduce @sparseflag for our calls
  Introduce virStorageVol{Download,Upload}Flags
  virsh: Implement sparse stream to vol-download
  virsh: Implement sparse stream to vol-upload
  daemon: Don't call virStreamInData so often
  storage: Enable sparse streams for virStorageVol{Download,Upload}
  fdstream: Track formatted message
  fdstream: Handle formatted messages separately
  iohelper: Introduce iohelper_message
  iohelper_message: Introduce API stubs
  iohelper_message: Implement formatted read
  iohelper_message: Implement formatted write
  tests: Introduce iohelpermessagetest
  iohelpermessagetest: test non-blocking read & write
  iohelper_message: Add support for sparse streams
  iohelper: Move runIO to runIOBasic
  iohelper: Introduce setupFDs
  iohelper: Teach command line 'sparse' argument
  iohelper: Wire up formatted messages

 daemon/remote.c                      |   2 +-
 daemon/stream.c                      | 152 +++++++++++-
 daemon/stream.h                      |   3 +-
 include/libvirt/libvirt-storage.h    |   9 +
 include/libvirt/libvirt-stream.h     |  86 ++++++-
 po/POTFILES.in                       |   2 +-
 src/Makefile.am                      |  33 ++-
 src/driver-stream.h                  |  23 ++
 src/esx/esx_stream.c                 |  16 +-
 src/fdstream.c                       | 267 +++++++++++++++-----
 src/fdstream.h                       |   3 +-
 src/{util => iohelper}/iohelper.c    | 234 +++++++++++++++---
 src/iohelper/iohelper_message.c      | 429 ++++++++++++++++++++++++++++++++
 src/iohelper/iohelper_message.h      |  53 ++++
 src/libvirt-storage.c                |   4 +-
 src/libvirt-stream.c                 | 455 ++++++++++++++++++++++++++++++++++
 src/libvirt_internal.h               |   3 +
 src/libvirt_private.syms             |   2 +
 src/libvirt_public.syms              |   8 +-
 src/libvirt_remote.syms              |   3 +
 src/remote/remote_driver.c           |  89 ++++++-
 src/remote/remote_protocol.x         |   2 +
 src/rpc/gendispatch.pl               |  21 +-
 src/rpc/virnetclient.c               |   1 +
 src/rpc/virnetclientstream.c         | 203 ++++++++++++++-
 src/rpc/virnetclientstream.h         |  17 +-
 src/rpc/virnetprotocol.x             |  16 +-
 src/rpc/virnetserverprogram.c        |  33 +++
 src/rpc/virnetserverprogram.h        |   7 +
 src/storage/storage_backend.c        |  12 +-
 src/storage/storage_driver.c         |   4 +-
 src/util/virfile.c                   |  70 ++++++
 src/util/virfile.h                   |   4 +
 src/virnetprotocol-structs           |   4 +
 tests/Makefile.am                    |  10 +-
 tests/iohelpermessagetest.c          | 466 +++++++++++++++++++++++++++++++++++
 tools/virsh-volume.c                 |  49 ++--
 tools/virsh.c                        |  36 +++
 tools/virsh.h                        |  17 ++
 tools/virsh.pod                      |   6 +-
 tools/wireshark/src/packet-libvirt.c |  40 +++
 tools/wireshark/src/packet-libvirt.h |   2 +
 42 files changed, 2716 insertions(+), 180 deletions(-)
 rename src/{util => iohelper}/iohelper.c (58%)
 create mode 100644 src/iohelper/iohelper_message.c
 create mode 100644 src/iohelper/iohelper_message.h
 create mode 100644 tests/iohelpermessagetest.c

--
2.8.4

This function takes an FD and determines whether the current position is in a data section or in a hole. In addition, it determines how many bytes remain until the current section ends.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/libvirt_private.syms |  1 +
 src/util/virfile.c       | 70 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/util/virfile.h       |  4 +++
 3 files changed, 75 insertions(+)

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 501c23e..f476eae 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -1513,6 +1513,7 @@ virFileGetHugepageSize;
 virFileGetMountReverseSubtree;
 virFileGetMountSubtree;
 virFileHasSuffix;
+virFileInData;
 virFileIsAbsPath;
 virFileIsDir;
 virFileIsExecutable;
diff --git a/src/util/virfile.c b/src/util/virfile.c
index f47bf39..05b709a 100644
--- a/src/util/virfile.c
+++ b/src/util/virfile.c
@@ -3443,3 +3443,73 @@ int virFileIsSharedFS(const char *path)
                                  VIR_FILE_SHFS_SMB |
                                  VIR_FILE_SHFS_CIFS);
 }
+
+
+int virFileInData(int fd,
+                  int *inData,
+                  unsigned long long *length)
+{
+    int ret = -1;
+    off_t cur, data, hole;
+
+    /* Get current position */
+    cur = lseek(fd, 0, SEEK_CUR);
+    if (cur == (off_t) -1) {
+        virReportSystemError(errno, "%s",
+                             _("Unable to get current position in file"));
+        goto cleanup;
+    }
+
+    /* Now try to get data and hole offsets */
+    data = lseek(fd, cur, SEEK_DATA);
+
+    /* There are four options:
+     * 1) data == cur; @cur is in data
+     * 2) data > cur; @cur is in a hole, next data at @data
+     * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF.
+     * 4) data < 0, errno != ENXIO; we learned nothing
+     */
+
+    if (data == (off_t) -1) {
+        /* cases 3 and 4 */
+        if (errno != ENXIO) {
+            virReportSystemError(errno, "%s",
+                                 _("Unable to seek to data"));
+            goto cleanup;
+        }
+        *inData = 0;
+        *length = 0;
+    } else if (data > cur) {
+        /* case 2 */
+        *inData = 0;
+        *length = data - cur;
+    } else {
+        /* case 1 */
+        *inData = 1;
+
+        /* We don't know where does the next hole start. Let's
+         * find out. Here we get the same 4 possibilities as
+         * described above.*/
+        hole = lseek(fd, data, SEEK_HOLE);
+        if (hole == (off_t) -1 || hole == data) {
+            /* cases 1, 3 and 4 */
+            /* Wait a second. The reason why we are here is
+             * because we are in data. But at the same time we
+             * are in a trailing hole? Wut!? Do the best what we
+             * can do here. */
+            virReportSystemError(errno, "%s",
+                                 _("unable to seek to hole"));
+            goto cleanup;
+        } else {
+            /* case 2 */
+            *length = (hole - data);
+        }
+    }
+
+    ret = 0;
+ cleanup:
+    /* At any rate, reposition back to where we started. */
+    if (cur != (off_t) -1)
+        ignore_value(lseek(fd, cur, SEEK_SET));
+    return ret;
+}
diff --git a/src/util/virfile.h b/src/util/virfile.h
index dae234e..5222b31 100644
--- a/src/util/virfile.h
+++ b/src/util/virfile.h
@@ -298,4 +298,8 @@ int virFileGetHugepageSize(const char *path,
                            unsigned long long *size);
 int virFileFindHugeTLBFS(virHugeTLBFSPtr *ret_fs,
                          size_t *ret_nfs);
+
+int virFileInData(int fd,
+                  int *inData,
+                  unsigned long long *length);
 #endif /* __VIR_FILE_H */
--
2.8.4

On Wed, Jun 22, 2016 at 04:43:18PM +0200, Michal Privoznik wrote:
This function takes an FD and determines whether the current position is in a data section or in a hole. In addition, it determines how many bytes remain until the current section ends.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_private.syms | 1 + src/util/virfile.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++++ src/util/virfile.h | 4 +++ 3 files changed, 75 insertions(+)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 501c23e..f476eae 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1513,6 +1513,7 @@ virFileGetHugepageSize; virFileGetMountReverseSubtree; virFileGetMountSubtree; virFileHasSuffix; +virFileInData; virFileIsAbsPath; virFileIsDir; virFileIsExecutable; diff --git a/src/util/virfile.c b/src/util/virfile.c index f47bf39..05b709a 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -3443,3 +3443,73 @@ int virFileIsSharedFS(const char *path) VIR_FILE_SHFS_SMB | VIR_FILE_SHFS_CIFS); } + + +int virFileInData(int fd, + int *inData, + unsigned long long *length) +{ + int ret = -1; + off_t cur, data, hole; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to get current position in file")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF. + * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + virReportSystemError(errno, "%s", + _("Unable to seek to data")); + goto cleanup; + } + *inData = 0; + *length = 0; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *length = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* Wait a second. The reason why we are here is + * because we are in data. But at the same time we + * are in a trailing hole? Wut!? Do the best what we + * can do here. 
*/ + virReportSystemError(errno, "%s", + _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *length = (hole - data); + } + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. */ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET));
Is it really safe to ignore the value here? IIUC, callers of this function would be justified in thinking it would *not* have a side effect on file position. IOW, I think we'd probably want to treat this error as fatal too.

I think it'd be desirable to have a unit test written explicitly for this function, since there are a few fun edge cases to worry about here.

Regards,
Daniel
--
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|
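
To make the two review points concrete, here is a minimal standalone sketch, not the libvirt code: file_in_data() is a hypothetical stand-in for virFileInData that treats a failed restore of the file offset as fatal, and file_in_data_selftest() is a hypothetical test helper. It assumes a platform where lseek() supports SEEK_DATA and SEEK_HOLE (e.g. Linux with glibc), and it reports errors only through the return value instead of virReportSystemError.

```c
#define _GNU_SOURCE            /* exposes SEEK_DATA / SEEK_HOLE on glibc */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for virFileInData. Reports whether the current
 * offset of @fd is in data (*in_data = 1) or in a hole (*in_data = 0) and
 * how many bytes remain in that section. Returns 0 on success, -1 on
 * error. Unlike the posted patch, failing to restore the offset is fatal. */
static int
file_in_data(int fd, int *in_data, unsigned long long *length)
{
    int ret = -1;
    off_t cur, data, hole;

    assert(in_data != NULL && length != NULL);

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        if (errno != ENXIO)
            goto restore;
        /* Trailing hole, or at/after EOF: both end up with length 0. */
        *in_data = 0;
        *length = 0;
    } else if (data > cur) {
        /* In a hole; the next data starts at @data. */
        *in_data = 0;
        *length = data - cur;
    } else {
        /* In data; find where the next hole (or EOF) begins. */
        hole = lseek(fd, data, SEEK_HOLE);
        if (hole == (off_t) -1 || hole <= data)
            goto restore;
        *in_data = 1;
        *length = hole - data;
    }
    ret = 0;

 restore:
    /* The caller expects no side effect on the file position. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        ret = -1;
    return ret;
}

/* Hypothetical test helper: 4096 bytes of data followed by a 4096-byte
 * tail created with ftruncate(). Probes offset 0 and EOF, and checks that
 * the offset is unchanged after each probe. Returns 0 if all checks pass. */
static int
file_in_data_selftest(void)
{
    char path[] = "/tmp/indataXXXXXX";
    char buf[4096];
    int in_data = -1;
    unsigned long long len = 0;
    off_t end;
    int ret = -1;
    int fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);

    memset(buf, 'x', sizeof(buf));
    if (write(fd, buf, sizeof(buf)) != (ssize_t) sizeof(buf) ||
        ftruncate(fd, 2 * sizeof(buf)) < 0)
        goto done;

    /* Offset 0 is data. A filesystem without hole support reports the
     * whole file as data, so only a lower bound on the length is checked. */
    if (lseek(fd, 0, SEEK_SET) != 0 ||
        file_in_data(fd, &in_data, &len) < 0 ||
        in_data != 1 || len < sizeof(buf) ||
        lseek(fd, 0, SEEK_CUR) != 0)
        goto done;

    /* At EOF, SEEK_DATA fails with ENXIO: reported as a hole of length 0. */
    if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1 ||
        file_in_data(fd, &in_data, &len) < 0 ||
        in_data != 0 || len != 0 ||
        lseek(fd, 0, SEEK_CUR) != end)
        goto done;

    ret = 0;
 done:
    close(fd);
    return ret;
}
```

Calling file_in_data_selftest() from a test's main() and checking for 0 is enough to exercise it. One thing such a test makes visible is that a position inside a trailing hole is reported with length 0, exactly like a position at EOF; whether that has anything to do with the hole-at-EOF corner case from the cover letter is only a guess.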

This helper binary will grow in the future (e.g. it will gain a so library which will be shared with daemon too) therefore its sources should live somewhere else than utils/. I've chosen src/iohelper. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- po/POTFILES.in | 2 +- src/Makefile.am | 6 +++--- src/{util => iohelper}/iohelper.c | 0 3 files changed, 4 insertions(+), 4 deletions(-) rename src/{util => iohelper}/iohelper.c (100%) diff --git a/po/POTFILES.in b/po/POTFILES.in index 822cfbc..9f4866c 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -63,6 +63,7 @@ src/hyperv/hyperv_wmi.c src/interface/interface_backend_netcf.c src/interface/interface_backend_udev.c src/internal.h +src/iohelper/iohelper.c src/libvirt-admin.c src/libvirt-domain-snapshot.c src/libvirt-domain.c @@ -176,7 +177,6 @@ src/storage/storage_driver.c src/test/test_driver.c src/uml/uml_conf.c src/uml/uml_driver.c -src/util/iohelper.c src/util/viralloc.c src/util/viraudit.c src/util/virauth.c diff --git a/src/Makefile.am b/src/Makefile.am index a14cb3f..3409631 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -991,8 +991,8 @@ STORAGE_DRIVER_ZFS_SOURCES = \ STORAGE_HELPER_DISK_SOURCES = \ storage/parthelper.c -UTIL_IO_HELPER_SOURCES = \ - util/iohelper.c +IO_HELPER_SOURCES = \ + iohelper/iohelper.c NETWORK_LEASES_HELPER_SOURCES = \ network/leaseshelper.c @@ -2843,7 +2843,7 @@ libexec_PROGRAMS = if WITH_LIBVIRTD libexec_PROGRAMS += libvirt_iohelper -libvirt_iohelper_SOURCES = $(UTIL_IO_HELPER_SOURCES) +libvirt_iohelper_SOURCES = $(IO_HELPER_SOURCES) libvirt_iohelper_LDFLAGS = \ $(AM_LDFLAGS) \ $(PIE_LDFLAGS) \ diff --git a/src/util/iohelper.c b/src/iohelper/iohelper.c similarity index 100% rename from src/util/iohelper.c rename to src/iohelper/iohelper.c -- 2.8.4

On Wed, Jun 22, 2016 at 04:43:19PM +0200, Michal Privoznik wrote:
This helper binary will grow in the future (e.g. it will gain a so library which will be shared with daemon too) therefore its sources should live somewhere else than utils/. I've chosen src/iohelper.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- po/POTFILES.in | 2 +- src/Makefile.am | 6 +++--- src/{util => iohelper}/iohelper.c | 0 3 files changed, 4 insertions(+), 4 deletions(-) rename src/{util => iohelper}/iohelper.c (100%)
ACK

Regards,
Daniel

There is really no reason why we should have to have 'struct' everywhere. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index 3e92577..aedda74 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -50,6 +50,8 @@ VIR_LOG_INIT("fdstream"); /* Tunnelled migration stream support */ +typedef struct virFDStreamData virFDStreamData; +typedef virFDStreamData *virFDStreamDataPtr; struct virFDStreamData { int fd; int errfd; @@ -82,7 +84,7 @@ struct virFDStreamData { static int virFDStreamRemoveCallback(virStreamPtr stream) { - struct virFDStreamData *fdst = stream->privateData; + virFDStreamDataPtr fdst = stream->privateData; int ret = -1; if (!fdst) { @@ -119,7 +121,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream) static int virFDStreamUpdateCallback(virStreamPtr stream, int events) { - struct virFDStreamData *fdst = stream->privateData; + virFDStreamDataPtr fdst = stream->privateData; int ret = -1; if (!fdst) { @@ -151,7 +153,7 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED, void *opaque) { virStreamPtr stream = opaque; - struct virFDStreamData *fdst = stream->privateData; + virFDStreamDataPtr fdst = stream->privateData; virStreamEventCallback cb; void *cbopaque; virFreeCallback ff; @@ -200,7 +202,7 @@ virFDStreamAddCallback(virStreamPtr st, void *opaque, virFreeCallback ff) { - struct virFDStreamData *fdst = st->privateData; + virFDStreamDataPtr fdst = st->privateData; int ret = -1; if (!fdst) { @@ -242,7 +244,7 @@ virFDStreamAddCallback(virStreamPtr st, } static int -virFDStreamCloseCommand(struct virFDStreamData *fdst, bool streamAbort) +virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort) { char buf[1024]; ssize_t len; @@ -295,7 +297,7 @@ virFDStreamCloseCommand(struct virFDStreamData *fdst, bool streamAbort) static int virFDStreamCloseInt(virStreamPtr st, bool streamAbort) { - 
struct virFDStreamData *fdst; + virFDStreamDataPtr fdst; virStreamEventCallback cb; void *opaque; int ret; @@ -378,7 +380,7 @@ virFDStreamAbort(virStreamPtr st) static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) { - struct virFDStreamData *fdst = st->privateData; + virFDStreamDataPtr fdst = st->privateData; int ret; if (nbytes > INT_MAX) { @@ -432,7 +434,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) { - struct virFDStreamData *fdst = st->privateData; + virFDStreamDataPtr fdst = st->privateData; int ret; if (nbytes > INT_MAX) { @@ -498,7 +500,7 @@ static int virFDStreamOpenInternal(virStreamPtr st, int errfd, unsigned long long length) { - struct virFDStreamData *fdst; + virFDStreamDataPtr fdst; VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu", st, fd, cmd, errfd, length); @@ -754,7 +756,7 @@ int virFDStreamOpenPTY(virStreamPtr st, unsigned long long length, int oflags) { - struct virFDStreamData *fdst = NULL; + virFDStreamDataPtr fdst = NULL; struct termios rawattr; if (virFDStreamOpenFileInternal(st, path, @@ -817,7 +819,7 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st, void *opaque, virFDStreamInternalCloseCbFreeOpaque fcb) { - struct virFDStreamData *fdst = st->privateData; + virFDStreamDataPtr fdst = st->privateData; virMutexLock(&fdst->lock); -- 2.8.4

On Wed, Jun 22, 2016 at 04:43:20PM +0200, Michal Privoznik wrote:
There is really no reason why we should have to have 'struct' everywhere.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-)
ACK

Regards,
Daniel

While this is no functional change, it makes the code look a bit nicer. Moreover, it prepares ground for future work. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 97 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index aedda74..bebeac3 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -53,6 +53,8 @@ VIR_LOG_INIT("fdstream"); typedef struct virFDStreamData virFDStreamData; typedef virFDStreamData *virFDStreamDataPtr; struct virFDStreamData { + virObjectLockable parent; + int fd; int errfd; virCommandPtr cmd; @@ -77,10 +79,31 @@ struct virFDStreamData { virFDStreamInternalCloseCb icbCb; virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque; void *icbOpaque; - - virMutex lock; }; +static virClassPtr virFDStreamDataClass; + +static void +virFDStreamDataDispose(void *obj) +{ + virFDStreamDataPtr fdst = obj; + + VIR_DEBUG("obj=%p", fdst); +} + +static int virFDStreamDataOnceInit(void) +{ + if (!(virFDStreamDataClass = virClassNew(virClassForObjectLockable(), + "virFDStreamData", + sizeof(virFDStreamData), + virFDStreamDataDispose))) + return -1; + + return 0; +} + +VIR_ONCE_GLOBAL_INIT(virFDStreamData) + static int virFDStreamRemoveCallback(virStreamPtr stream) { @@ -93,7 +116,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream) return -1; } - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->watch == 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("stream does not have a callback registered")); @@ -115,7 +138,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream) ret = 0; cleanup: - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return ret; } @@ -130,7 +153,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events) return -1; } - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->watch == 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("stream does not have a callback 
registered")); @@ -143,7 +166,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events) ret = 0; cleanup: - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return ret; } @@ -162,9 +185,9 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED, if (!fdst) return; - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (!fdst->cb) { - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return; } @@ -172,21 +195,19 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED, cbopaque = fdst->opaque; ff = fdst->ff; fdst->dispatching = true; - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); cb(stream, events, cbopaque); - virMutexLock(&fdst->lock); + virObjectLock(fdst); fdst->dispatching = false; if (fdst->cbRemoved && ff) (ff)(cbopaque); closed = fdst->closed; - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); - if (closed) { - virMutexDestroy(&fdst->lock); - VIR_FREE(fdst); - } + if (closed) + virObjectUnref(fdst); } static void virFDStreamCallbackFree(void *opaque) @@ -211,7 +232,7 @@ virFDStreamAddCallback(virStreamPtr st, return -1; } - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->watch != 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("stream already has a callback registered")); @@ -239,7 +260,7 @@ virFDStreamAddCallback(virStreamPtr st, ret = 0; cleanup: - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return ret; } @@ -307,7 +328,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching) return 0; - virMutexLock(&fdst->lock); + virObjectLock(fdst); /* aborting the stream, ensure the callback is called if it's * registered for stream error event */ @@ -317,7 +338,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) VIR_STREAM_EVENT_WRITABLE))) { /* don't enter this function accidentally from the callback again */ if (fdst->abortCallbackCalled) { - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return 0; } @@ 
-327,12 +348,12 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) /* cache the pointers */ cb = fdst->cb; opaque = fdst->opaque; - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); /* call failure callback, poll reports nothing on closed fd */ (cb)(st, VIR_STREAM_EVENT_ERROR, opaque); - virMutexLock(&fdst->lock); + virObjectLock(fdst); fdst->abortCallbackDispatching = false; } @@ -356,11 +377,10 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort) if (fdst->dispatching) { fdst->closed = true; - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); } else { - virMutexUnlock(&fdst->lock); - virMutexDestroy(&fdst->lock); - VIR_FREE(fdst); + virObjectUnlock(fdst); + virObjectUnref(fdst); } return ret; @@ -395,13 +415,13 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) return -1; } - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->length) { if (fdst->length == fdst->offset) { virReportSystemError(ENOSPC, "%s", _("cannot write to stream")); - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return -1; } @@ -427,7 +447,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) fdst->offset += ret; } - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return ret; } @@ -449,11 +469,11 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) return -1; } - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->length) { if (fdst->length == fdst->offset) { - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return 0; } @@ -479,7 +499,7 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) fdst->offset += ret; } - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return ret; } @@ -505,25 +525,22 @@ static int virFDStreamOpenInternal(virStreamPtr st, VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu", st, fd, cmd, errfd, length); + if (virFDStreamDataInitialize() < 0) + return -1; + if ((st->flags & VIR_STREAM_NONBLOCK) && 
virSetNonBlock(fd) < 0) { virReportSystemError(errno, "%s", _("Unable to set non-blocking mode")); return -1; } - if (VIR_ALLOC(fdst) < 0) + if (!(fdst = virObjectLockableNew(virFDStreamDataClass))) return -1; fdst->fd = fd; fdst->cmd = cmd; fdst->errfd = errfd; fdst->length = length; - if (virMutexInit(&fdst->lock) < 0) { - VIR_FREE(fdst); - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("Unable to initialize mutex")); - return -1; - } st->driver = &virFDStreamDrv; st->privateData = fdst; @@ -821,7 +838,7 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st, { virFDStreamDataPtr fdst = st->privateData; - virMutexLock(&fdst->lock); + virObjectLock(fdst); if (fdst->icbFreeOpaque) (fdst->icbFreeOpaque)(fdst->icbOpaque); @@ -830,6 +847,6 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st, fdst->icbOpaque = opaque; fdst->icbFreeOpaque = fcb; - virMutexUnlock(&fdst->lock); + virObjectUnlock(fdst); return 0; } -- 2.8.4

On Wed, Jun 22, 2016 at 04:43:21PM +0200, Michal Privoznik wrote:
While this is no functional change, it makes the code look a bit nicer. Moreover, it prepares ground for future work.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 97 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 40 deletions(-)
ACK

Regards,
Daniel

Although we already have virStreamRecv, just like some other older APIs it is missing @flags argument. This means, the function is not that flexible and therefore we need virStreamRecvFlags. The latter is going to be needed when we will want it to stop at a hole in stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 60 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 4 ++- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 831640d..bee2516 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,11 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +int virStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 85b4e3b..d4b0480 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -36,6 +36,12 @@ typedef int size_t nbytes); typedef int +(*virDrvStreamRecvFlags)(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -61,6 +67,7 @@ typedef virStreamDriver *virStreamDriverPtr; struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; + virDrvStreamRecvFlags streamRecvFlags; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 8384b37..80b2d47 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -286,6 +286,66 @@ virStreamRecv(virStreamPtr stream, /** + * virStreamRecvFlags: + * @stream: pointer to the stream object + * @data: buffer to read 
into from stream + * @nbytes: size of @data buffer + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Reads a series of bytes from the stream. This method may + * block the calling application for an arbitrary amount + * of time. + * + * This is just like virStreamRecv except this one has extra + * @flags. In fact, calling virStreamRecvFlags(stream, data, + * nbytes, 0) is equivalent to calling virStreamRecv(stream, + * data, nbytes) and vice versa. + * + * Returns 0 when the end of the stream is reached, at + * which time the caller should invoke virStreamFinish() + * to get confirmation of stream completion. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if there is no data pending to be read & the + * stream is marked as non-blocking. + */ +int +virStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, data=%p, nbytes=%zi flags=%x", + stream, data, nbytes, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(data, error); + + if (stream->driver && + stream->driver->streamRecvFlags) { + int ret; + ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); + if (ret == -2) + return -2; + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index b6d2dfd..d013d9f 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -734,10 +734,12 @@ LIBVIRT_1.3.3 { LIBVIRT_2.0.0 { global: - virConnectStoragePoolEventRegisterAny; virConnectStoragePoolEventDeregisterAny; virDomainGetGuestVcpus; virDomainSetGuestVcpus; + 
virConnectStoragePoolEventRegisterAny; + virStreamRecvFlags; } LIBVIRT_1.3.3; + # .... define new API here using predicted next version number .... -- 2.8.4

On Wed, Jun 22, 2016 at 04:43:22PM +0200, Michal Privoznik wrote:
Although we already have virStreamRecv, just like some other older APIs it is missing @flags argument. This means, the function is not that flexible and therefore we need virStreamRecvFlags. The latter is going to be needed when we will want it to stop at a hole in stream.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 60 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 4 ++- 4 files changed, 75 insertions(+), 1 deletion(-)
ACK

Regards,
Daniel

We have three virStreamDriver-s currently in our tree: virFDStream, the remote driver and the ESX driver. For now, support for the remote driver and the ESX driver is sufficient, because the implementation for virFDStream is going to be supplied later as it needs to be slightly different.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/esx/esx_stream.c       | 16 +++++++++++++++-
 src/remote/remote_driver.c | 21 +++++++++++++++++----
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c
index fb9abbc..b820b38 100644
--- a/src/esx/esx_stream.c
+++ b/src/esx/esx_stream.c
@@ -252,12 +252,17 @@ esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes)
 }
 
 static int
-esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
+esxStreamRecvFlags(virStreamPtr stream,
+                   char *data,
+                   size_t nbytes,
+                   unsigned int flags)
 {
     int result = -1;
     esxStreamPrivate *priv = stream->privateData;
     int status;
 
+    virCheckFlags(0, -1);
+
     if (nbytes == 0)
         return 0;
 
@@ -317,6 +322,14 @@ esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
     return result;
 }
 
+static int
+esxStreamRecv(virStreamPtr stream,
+              char *data,
+              size_t nbytes)
+{
+    return esxStreamRecvFlags(stream, data, nbytes, 0);
+}
+
 static void
 esxFreeStreamPrivate(esxStreamPrivate **priv)
 {
@@ -369,6 +382,7 @@ esxStreamAbort(virStreamPtr stream)
 virStreamDriver esxStreamDriver = {
     .streamSend = esxStreamSend,
     .streamRecv = esxStreamRecv,
+    .streamRecvFlags = esxStreamRecvFlags,
     /* FIXME: streamAddCallback missing */
     /* FIXME: streamUpdateCallback missing */
     /* FIXME: streamRemoveCallback missing */
diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 3f9d812..bfd8e8e 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -5231,15 +5231,19 @@ remoteStreamSend(virStreamPtr st,
 
 
 static int
-remoteStreamRecv(virStreamPtr st,
-                 char *data,
-                 size_t nbytes)
+remoteStreamRecvFlags(virStreamPtr st,
+                      char *data,
+                      size_t nbytes,
+                      unsigned int flags)
 {
-    VIR_DEBUG("st=%p data=%p nbytes=%zu", st, data, nbytes);
+    VIR_DEBUG("st=%p data=%p nbytes=%zu flags=%x",
+              st, data, nbytes, flags);
     struct private_data *priv = st->conn->privateData;
     virNetClientStreamPtr privst = st->privateData;
     int rv;
 
+    virCheckFlags(0, -1);
+
     if (virNetClientStreamRaiseError(privst))
         return -1;
 
@@ -5261,6 +5265,14 @@ remoteStreamRecv(virStreamPtr st,
     return rv;
 }
 
+static int
+remoteStreamRecv(virStreamPtr st,
+                 char *data,
+                 size_t nbytes)
+{
+    return remoteStreamRecv(st, data, nbytes);
+}
+
 struct remoteStreamCallbackData {
     virStreamPtr st;
     virStreamEventCallback cb;
@@ -5433,6 +5445,7 @@ remoteStreamAbort(virStreamPtr st)
 
 static virStreamDriver remoteStreamDrv = {
     .streamRecv = remoteStreamRecv,
+    .streamRecvFlags = remoteStreamRecvFlags,
     .streamSend = remoteStreamSend,
     .streamFinish = remoteStreamFinish,
     .streamAbort = remoteStreamAbort,
--
2.8.4

On Wed, Jun 22, 2016 at 04:43:23PM +0200, Michal Privoznik wrote:
We have three virStreamDriver-s currently in our tree: virFDStream, the remote driver and the ESX driver. For now, support for the remote driver and the ESX driver is sufficient, because the implementation for virFDStream is going to be supplied later as it needs to be slightly different.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/esx/esx_stream.c       | 16 +++++++++++++++-
 src/remote/remote_driver.c | 21 +++++++++++++++++----
 2 files changed, 32 insertions(+), 5 deletions(-)
ACK
+static int
+remoteStreamRecv(virStreamPtr st,
+                 char *data,
+                 size_t nbytes)
+{
+    return remoteStreamRecv(st, data, nbytes);

Err, infinite loop ?

+}
+
Regards, Daniel
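The recursion flagged above comes from the compatibility wrapper calling itself rather than the new flags-taking function. A minimal sketch of the intended delegation, mirroring what the ESX half of the same patch does, is shown below. The names demoStream, demoStreamRecvFlags and demoStreamRecv are hypothetical stand-ins for illustration only, not libvirt code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a stream: a source buffer plus a read cursor. */
typedef struct {
    const char *src;
    size_t len;
    size_t pos;
} demoStream;

/* Flags-taking variant: rejects unknown flags, then copies up to nbytes. */
int
demoStreamRecvFlags(demoStream *st, char *data, size_t nbytes,
                    unsigned int flags)
{
    size_t avail;

    assert(st != NULL && data != NULL);

    if (flags != 0)             /* plays the role of virCheckFlags(0, -1) */
        return -1;

    avail = st->len - st->pos;
    if (nbytes > avail)
        nbytes = avail;
    memcpy(data, st->src + st->pos, nbytes);
    st->pos += nbytes;
    return (int) nbytes;
}

/* Legacy entry point: delegates to the flags variant with flags == 0
 * instead of calling itself. */
int
demoStreamRecv(demoStream *st, char *data, size_t nbytes)
{
    return demoStreamRecvFlags(st, data, nbytes, 0);
}
```

Applied to the remote driver, the equivalent change would presumably be for remoteStreamRecv() to return remoteStreamRecvFlags(st, data, nbytes, 0).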

This API can be used to tell the other side of the stream to skip
some bytes in the stream. This can be used to create a sparse file
on the receiving side of a stream.

It takes just one argument @length, which says how big the hole is.
Since our streams are not rewindable like regular files, we don't
need @whence argument like seek(2) has.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  3 +++
 src/driver-stream.h              |  5 ++++
 src/libvirt-stream.c             | 57 ++++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |  1 +
 4 files changed, 66 insertions(+)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index bee2516..4e0a599 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -50,6 +50,9 @@ int virStreamRecvFlags(virStreamPtr st,
                        size_t nbytes,
                        unsigned int flags);

+int virStreamSkip(virStreamPtr st,
+                  unsigned long long length);
+

 /**
  * virStreamSourceFunc:
diff --git a/src/driver-stream.h b/src/driver-stream.h
index d4b0480..20ea13f 100644
--- a/src/driver-stream.h
+++ b/src/driver-stream.h
@@ -42,6 +42,10 @@ typedef int
                          unsigned int flags);

 typedef int
+(*virDrvStreamSkip)(virStreamPtr st,
+                    unsigned long long length);
+
+typedef int
 (*virDrvStreamEventAddCallback)(virStreamPtr stream,
                                 int events,
                                 virStreamEventCallback cb,
@@ -68,6 +72,7 @@ struct _virStreamDriver {
     virDrvStreamSend streamSend;
     virDrvStreamRecv streamRecv;
     virDrvStreamRecvFlags streamRecvFlags;
+    virDrvStreamSkip streamSkip;
     virDrvStreamEventAddCallback streamEventAddCallback;
     virDrvStreamEventUpdateCallback streamEventUpdateCallback;
     virDrvStreamEventRemoveCallback streamEventRemoveCallback;
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 80b2d47..55f3ef5 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -346,6 +346,63 @@ virStreamRecvFlags(virStreamPtr stream,


 /**
+ * virStreamSkip:
+ * @stream: pointer to the stream object
+ * @length: number of bytes to skip
+ *
+ * Skip @length bytes in the stream. This is useful when there's
+ * no actual data in the stream, just a hole. If that's the case,
+ * this API can be used to skip the hole properly instead of
+ * transmitting zeroes to the other side.
+ *
+ * An example using this with a hypothetical file upload API
+ * looks like:
+ *
+ *   virStream st;
+ *
+ *   while (1) {
+ *     char buf[4096];
+ *     size_t len;
+ *     if (..in hole...) {
+ *       ..get hole size...
+ *       virStreamSkip(st, len);
+ *     } else {
+ *       ...read len bytes...
+ *       virStreamSend(st, buf, len);
+ *     }
+ *   }
+ *
+ * Returns 0 on success,
+ *        -1 error
+ */
+int
+virStreamSkip(virStreamPtr stream,
+              unsigned long long length)
+{
+    VIR_DEBUG("stream=%p, length=%llu", stream, length);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+
+    if (stream->driver &&
+        stream->driver->streamSkip) {
+        int ret;
+        ret = (stream->driver->streamSkip)(stream, length);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(stream->conn);
+    return -1;
+}
+
+
+/**
  * virStreamSendAll:
  * @stream: pointer to the stream object
  * @handler: source callback for reading data from application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index d013d9f..5b536eb 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -739,6 +739,7 @@ LIBVIRT_2.0.0 {
         virDomainSetGuestVcpus;
         virConnectStoragePoolEventRegisterAny;
         virStreamRecvFlags;
+        virStreamSkip;
 } LIBVIRT_1.3.3;

-- 
2.8.4

On Wed, Jun 22, 2016 at 04:43:24PM +0200, Michal Privoznik wrote:
This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.
It takes just one argument @length, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  3 +++
 src/driver-stream.h              |  5 ++++
 src/libvirt-stream.c             | 57 ++++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |  1 +
 4 files changed, 66 insertions(+)
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index d013d9f..5b536eb 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -739,6 +739,7 @@ LIBVIRT_2.0.0 {
         virDomainSetGuestVcpus;
         virConnectStoragePoolEventRegisterAny;
         virStreamRecvFlags;
+        virStreamSkip;
 } LIBVIRT_1.3.3;
We can put this in a 2.1.0 block now

ACK

Regards, Daniel
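The upload loop from the virStreamSkip() documentation comment can be sketched without a live connection. The code below is only an illustration under stated assumptions: demoSection, demoWire, demoSend, demoSkip and demoUpload are hypothetical names, with demoSend and demoSkip standing in for virStreamSend() and virStreamSkip() by merely counting bytes. It shows that a hole costs one skip call instead of a run of zero-filled send calls.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical description of one contiguous region of the source file. */
typedef struct {
    int isHole;                 /* non-zero: the region holds no data */
    unsigned long long length;  /* region size in bytes */
} demoSection;

/* Hypothetical tally of what would have gone over the wire. */
typedef struct {
    unsigned long long sentBytes;    /* payload handed to the send step */
    unsigned long long skippedBytes; /* bytes announced via the skip step */
    unsigned int skipCalls;
} demoWire;

/* Stand-in for virStreamSend(): only counts the payload. */
int
demoSend(demoWire *w, unsigned long long nbytes)
{
    w->sentBytes += nbytes;
    return 0;
}

/* Stand-in for virStreamSkip(): only records the hole. */
int
demoSkip(demoWire *w, unsigned long long length)
{
    w->skippedBytes += length;
    w->skipCalls++;
    return 0;
}

/* Walk the sections: holes are skipped in one call, data goes out
 * in chunks of at most 4096 bytes, as in the doc comment's loop. */
int
demoUpload(demoWire *w, const demoSection *sections, size_t nsections)
{
    size_t i;

    assert(w != NULL);

    for (i = 0; i < nsections; i++) {
        unsigned long long left = sections[i].length;

        if (sections[i].isHole) {
            if (demoSkip(w, left) < 0)
                return -1;
            continue;
        }

        while (left > 0) {
            unsigned long long chunk = left < 4096 ? left : 4096;

            if (demoSend(w, chunk) < 0)
                return -1;
            left -= chunk;
        }
    }
    return 0;
}
```

For a source made of 5000 data bytes, a 1 MiB hole and 100 more data bytes, this sends 5100 bytes of payload and issues a single skip.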

This function is basically a counterpart for virStreamSkip. If one
side of a stream called virStreamSkip() the other should call
virStreamHoleSize() to get the size of the hole.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  3 +++
 src/driver-stream.h              |  5 +++++
 src/libvirt-stream.c             | 42 ++++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |  1 +
 4 files changed, 51 insertions(+)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 4e0a599..2ebda74 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -53,6 +53,9 @@ int virStreamRecvFlags(virStreamPtr st,
 int virStreamSkip(virStreamPtr st,
                   unsigned long long length);

+int virStreamHoleSize(virStreamPtr,
+                      unsigned long long *length);
+

 /**
  * virStreamSourceFunc:
diff --git a/src/driver-stream.h b/src/driver-stream.h
index 20ea13f..e196b6d 100644
--- a/src/driver-stream.h
+++ b/src/driver-stream.h
@@ -46,6 +46,10 @@ typedef int
                     unsigned long long length);

 typedef int
+(*virDrvStreamHoleSize)(virStreamPtr st,
+                        unsigned long long *length);
+
+typedef int
 (*virDrvStreamEventAddCallback)(virStreamPtr stream,
                                 int events,
                                 virStreamEventCallback cb,
@@ -73,6 +77,7 @@ struct _virStreamDriver {
     virDrvStreamRecv streamRecv;
     virDrvStreamRecvFlags streamRecvFlags;
     virDrvStreamSkip streamSkip;
+    virDrvStreamHoleSize streamHoleSize;
     virDrvStreamEventAddCallback streamEventAddCallback;
     virDrvStreamEventUpdateCallback streamEventUpdateCallback;
     virDrvStreamEventRemoveCallback streamEventRemoveCallback;
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 55f3ef5..3ac9e0d 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -403,6 +403,48 @@ virStreamSkip(virStreamPtr stream,


 /**
+ * virStreamHoleSize:
+ * @stream: pointer to the stream object
+ * @length: number of bytes to skip
+ *
+ * This function is a counterpart to virStreamSkip(). That is, if
+ * one side of a stream has called virStreamSkip() the other side
+ * of the stream should call virStreamHoleSize() to retrieve the
+ * size of hole. If there's currently no hole in the stream, -1
+ * is returned.
+ *
+ * Returns 0 on success,
+ *        -1 on error
+ */
+int
+virStreamHoleSize(virStreamPtr stream,
+                  unsigned long long *length)
+{
+    VIR_DEBUG("stream=%p, length=%p", stream, length);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgReturn(length, -1);
+
+    if (stream->driver &&
+        stream->driver->streamHoleSize) {
+        int ret;
+        ret = (stream->driver->streamHoleSize)(stream, length);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(stream->conn);
+    return -1;
+}
+
+
+/**
  * virStreamSendAll:
  * @stream: pointer to the stream object
  * @handler: source callback for reading data from application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 5b536eb..0439434 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -738,6 +738,7 @@ LIBVIRT_2.0.0 {
         virDomainGetGuestVcpus;
         virDomainSetGuestVcpus;
         virConnectStoragePoolEventRegisterAny;
+        virStreamHoleSize;
         virStreamRecvFlags;
         virStreamSkip;
 } LIBVIRT_1.3.3;
-- 
2.8.4

On Wed, Jun 22, 2016 at 04:43:25PM +0200, Michal Privoznik wrote:
This function is basically a counterpart for virStreamSkip. If one side of a stream called virStreamSkip() the other should call virStreamHoleSize() to get the size of the hole.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  3 +++
 src/driver-stream.h              |  5 +++++
 src/libvirt-stream.c             | 42 ++++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |  1 +
 4 files changed, 51 insertions(+)
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 5b536eb..0439434 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -738,6 +738,7 @@ LIBVIRT_2.0.0 {
         virDomainGetGuestVcpus;
         virDomainSetGuestVcpus;
         virConnectStoragePoolEventRegisterAny;
+        virStreamHoleSize;
         virStreamRecvFlags;
         virStreamSkip;
 } LIBVIRT_1.3.3;
Use 2.1.0 now

ACK

Regards, Daniel

This flag is for the virStreamRecvFlags API. Its purpose is to stop
reading from the stream when a hole is encountered, as holes are to
be treated separately.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  4 ++++
 src/libvirt-stream.c             | 30 +++++++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 2ebda74..23fcc26 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -45,6 +45,10 @@ int virStreamRecv(virStreamPtr st,
                   char *data,
                   size_t nbytes);

+typedef enum {
+    VIR_STREAM_RECV_STOP_AT_HOLE = (1 << 0),
+} virStreamRecvFlagsValues;
+
 int virStreamRecvFlags(virStreamPtr st,
                        char *data,
                        size_t nbytes,
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 3ac9e0d..1162d33 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -290,7 +290,7 @@ virStreamRecv(virStreamPtr stream,
  * @stream: pointer to the stream object
  * @data: buffer to read into from stream
  * @nbytes: size of @data buffer
- * @flags: extra flags; not used yet, so callers should always pass 0
+ * @flags: bitwise-OR of virStreamRecvFlagsValues
  *
  * Reads a series of bytes from the stream. This method may
  * block the calling application for an arbitrary amount
@@ -301,6 +301,29 @@ virStreamRecv(virStreamPtr stream,
  * nbytes, 0) is equivalent to calling virStreamRecv(stream,
  * data, nbytes) and vice versa.
  *
+ * If flag VIR_STREAM_RECV_STOP_AT_HOLE is set, this function
+ * will stop reading from stream if it has reached a hole. In
+ * that case, -3 is returned and virStreamHoleSize() should be
+ * called to get the hole size. An example using this flag might
+ * look like this:
+ *
+ *   while (1) {
+ *       char buf[4096];
+ *
+ *       int ret = virStreamRecvFlags(st, buf, len, VIR_STREAM_STOP_AT_HOLE);
+ *       if (ret < 0) {
+ *           if (ret == -3) {
+ *               unsigned long long len;
+ *               ret = virStreamHoleSize(st, &len);
+ *               ...seek len bytes in target...
+ *           } else {
+ *               return -1;
+ *           }
+ *       } else {
+ *           ...write buf to target...
+ *       }
+ *   }
+ *
  * Returns 0 when the end of the stream is reached, at
  * which time the caller should invoke virStreamFinish()
  * to get confirmation of stream completion.
@@ -311,6 +334,9 @@ virStreamRecv(virStreamPtr stream,
  *
  * Returns -2 if there is no data pending to be read & the
  * stream is marked as non-blocking.
+ *
+ * Returns -3 if there is a hole in stream and caller requested
+ * to stop at a hole.
  */
 int
 virStreamRecvFlags(virStreamPtr stream,
@@ -332,6 +358,8 @@ virStreamRecvFlags(virStreamPtr stream,
         ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags);
         if (ret == -2)
             return -2;
+        if (ret == -3)
+            return -3;
         if (ret < 0)
             goto error;
         return ret;
-- 
2.8.4

On Wed, Jun 22, 2016 at 04:43:26PM +0200, Michal Privoznik wrote:
This flag is for the virStreamRecvFlags API. Its purpose is to stop reading from the stream when a hole is encountered, as holes are to be treated separately.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  4 ++++
 src/libvirt-stream.c             | 30 +++++++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 1 deletion(-)
ACK

Regards, Daniel
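The receive loop from the documentation comment can be exercised against a mock stream. This is only a sketch under stated assumptions: demoChunk, demoStream, demoRecvFlags, demoHoleSize and demoDownload are hypothetical names, DEMO_RECV_STOP_AT_HOLE mirrors VIR_STREAM_RECV_STOP_AT_HOLE, and the mock assumes that querying the hole size also consumes the hole, which the patch does not spell out. Unlike the comment's example, the buffer size is passed as sizeof(buf) and the flag carries the RECV part of its name.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DEMO_RECV_STOP_AT_HOLE (1u << 0)

/* Hypothetical stream contents: data == NULL marks a hole.
 * Data chunks are assumed to be non-empty. */
typedef struct {
    const char *data;
    unsigned long long length;
} demoChunk;

typedef struct {
    const demoChunk *chunks;
    size_t nchunks;
    size_t cur;                 /* chunk currently being consumed */
    unsigned long long off;     /* offset inside that chunk */
} demoStream;

/* Mock receive: >0 bytes read, 0 at end of stream, -3 when positioned
 * at a hole and the caller asked to stop there, -1 otherwise. */
int
demoRecvFlags(demoStream *st, char *buf, size_t nbytes, unsigned int flags)
{
    const demoChunk *c;
    unsigned long long left;

    assert(st != NULL && buf != NULL);

    if (st->cur == st->nchunks)
        return 0;
    c = &st->chunks[st->cur];
    if (!c->data)
        return (flags & DEMO_RECV_STOP_AT_HOLE) ? -3 : -1;

    left = c->length - st->off;
    if (nbytes > left)
        nbytes = (size_t) left;
    memcpy(buf, c->data + st->off, nbytes);
    st->off += nbytes;
    if (st->off == c->length) {
        st->cur++;
        st->off = 0;
    }
    return (int) nbytes;
}

/* Mock hole query: reports the hole length and moves past the hole. */
int
demoHoleSize(demoStream *st, unsigned long long *length)
{
    if (st->cur == st->nchunks || st->chunks[st->cur].data)
        return -1;              /* not positioned at a hole */
    *length = st->chunks[st->cur].length;
    st->cur++;
    return 0;
}

/* Receive into target[0..cap). Holes only advance the write position;
 * a file-backed target would seek instead. Returns bytes covered or -1. */
long long
demoDownload(demoStream *st, char *target, size_t cap)
{
    unsigned long long pos = 0;

    for (;;) {
        char buf[4096];
        int got = demoRecvFlags(st, buf, sizeof(buf), DEMO_RECV_STOP_AT_HOLE);

        if (got == -3) {
            unsigned long long hole;

            if (demoHoleSize(st, &hole) < 0 || hole > cap - pos)
                return -1;
            pos += hole;
            continue;
        }
        if (got < 0)
            return -1;
        if (got == 0)
            break;
        if ((unsigned long long) got > cap - pos)
            return -1;
        memcpy(target + pos, buf, (size_t) got);
        pos += (unsigned long long) got;
    }
    return (long long) pos;
}
```

With a zero-initialised target, the hole region simply stays zero, which is what a reader of a sparse file would see.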

This is just a wrapper over new functions that have been just
introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very
similar to virStreamRecvAll() except it handles sparse streams well.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  28 ++++++++-
 src/libvirt-stream.c             | 120 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 146 insertions(+), 3 deletions(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 23fcc26..e5f5126 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st,
  * @nbytes: size of the data array
  * @opaque: optional application provided data
  *
- * The virStreamSinkFunc callback is used together
- * with the virStreamRecvAll function for libvirt to
- * provide the data that has been received.
+ * The virStreamSinkFunc callback is used together with the
+ * virStreamRecvAll or virStreamSparseRecvAll functions for
+ * libvirt to provide the data that has been received.
  *
  * The callback will be invoked multiple times,
  * providing data in small chunks. The application
@@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st,
                      virStreamSinkFunc handler,
                      void *opaque);

+/**
+ * virStreamSinkHoleFunc:
+ * @st: the stream object
+ * @length: stream hole size
+ * @opaque: optional application provided data
+ *
+ * This callback is used together with the virStreamSparseRecvAll
+ * function for libvirt to provide the size of a hole that
+ * occurred in the stream.
+ *
+ * Returns 0 on success,
+ *        -1 upon error
+ */
+typedef int (*virStreamSinkHoleFunc)(virStreamPtr st,
+                                     unsigned long long length,
+                                     void *opaque);
+
+int virStreamSparseRecvAll(virStreamPtr stream,
+                           virStreamSinkFunc handler,
+                           virStreamSinkHoleFunc holeHandler,
+                           void *opaque);
+
 typedef enum {
     VIR_STREAM_EVENT_READABLE = (1 << 0),
     VIR_STREAM_EVENT_WRITABLE = (1 << 1),
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 1162d33..2e3e319 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -660,6 +660,126 @@ virStreamRecvAll(virStreamPtr stream,


 /**
+ * virStreamSparseRecvAll:
+ * @stream: pointer to the stream object
+ * @handler: sink callback for writing data to application
+ * @holeHandler: stream hole callback for skipping holes
+ * @opaque: application defined data
+ *
+ * Receive the entire data stream, sending the data to the
+ * requested data sink. This is simply a convenient alternative
+ * to virStreamRecv, for apps that do blocking-I/O.
+ *
+ * An example using this with a hypothetical file download
+ * API looks like:
+ *
+ *   int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return write(*fd, buf, nbytes);
+ *   }
+ *
+ *   int myskip(virStreamPtr st, unsigned long long offset, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
+ *   }
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_WRONLY);
+ *
+ *   virConnectDownloadSparseFile(conn, st);
+ *   if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) {
+ *      ...report an error ...
+ *      goto done;
+ *   }
+ *   if (virStreamFinish(st) < 0)
+ *      ...report an error...
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Note that @opaque data is shared between both @handler and
+ * @holeHandler callbacks.
+ *
+ * Returns 0 if all the data was successfully received. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called, so the caller need only call
+ * virStreamFree()
+ */
+int
+virStreamSparseRecvAll(virStreamPtr stream,
+                       virStreamSinkFunc handler,
+                       virStreamSinkHoleFunc holeHandler,
+                       void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE;
+    int ret = -1;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sinks cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    for (;;) {
+        int got, offset = 0;
+        unsigned long long holeLen;
+        got = virStreamRecvFlags(stream, bytes, want, flags);
+        if (got == -3) {
+            if (virStreamHoleSize(stream, &holeLen) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+
+            if (holeHandler(stream, holeLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+        } else {
+            if (got < 0)
+                goto cleanup;
+        }
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = (handler)(stream, bytes + offset, got - offset, opaque);
+            if (done < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            offset += done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}
+
+/**
  * virStreamEventAddCallback:
  * @stream: pointer to the stream object
  * @events: set of events to monitor
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 0439434..4e8db51 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -741,6 +741,7 @@ LIBVIRT_2.0.0 {
         virStreamHoleSize;
         virStreamRecvFlags;
         virStreamSkip;
+        virStreamSparseRecvAll;
 } LIBVIRT_1.3.3;

-- 
2.8.4

On Wed, Jun 22, 2016 at 04:43:27PM +0200, Michal Privoznik wrote:
This is just a wrapper over new functions that have been just introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very similar to virStreamRecvAll() except it handles sparse streams well.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  28 ++++++++-
 src/libvirt-stream.c             | 120 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 146 insertions(+), 3 deletions(-)
ACK
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 0439434..4e8db51 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -741,6 +741,7 @@ LIBVIRT_2.0.0 {
         virStreamHoleSize;
         virStreamRecvFlags;
         virStreamSkip;
+        virStreamSparseRecvAll;
 } LIBVIRT_1.3.3;
2.1.0

Regards, Daniel
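One detail of a seek-based hole callback such as myskip() in the documentation example: per POSIX, lseek() past the end of a file does not change the file size until something is written there, so a hole that ends the stream leaves the file shorter than the source. The sketch below extends the file explicitly with ftruncate(). The names demoSink, demoSinkHole and demoTrailingHoleSize are hypothetical, the callbacks omit the virStreamPtr argument, and whether this is related to the hole-at-EOF corner case mentioned in the cover letter is not established here.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Data callback in the shape of virStreamSinkFunc, minus the stream. */
int
demoSink(const char *buf, size_t nbytes, void *opaque)
{
    int *fd = opaque;

    assert(fd != NULL);
    return (int) write(*fd, buf, nbytes);
}

/* Hole callback: seek forward, then grow the file to the new position
 * so that a hole at EOF is reflected in the file size. */
int
demoSinkHole(unsigned long long length, void *opaque)
{
    int *fd = opaque;
    struct stat sb;
    off_t pos;

    assert(fd != NULL);

    pos = lseek(*fd, (off_t) length, SEEK_CUR);
    if (pos == (off_t) -1)
        return -1;
    if (fstat(*fd, &sb) < 0)
        return -1;
    if (sb.st_size < pos && ftruncate(*fd, pos) < 0)
        return -1;
    return 0;
}

/* Self-check helper: 3 data bytes followed by a 4096 byte trailing
 * hole, written to a temporary file. Returns the file size or -1. */
long long
demoTrailingHoleSize(void)
{
    char path[] = "/tmp/sparse-sink-XXXXXX";
    int fd = mkstemp(path);
    struct stat sb;
    long long size = -1;

    if (fd < 0)
        return -1;
    unlink(path);
    if (demoSink("abc", 3, &fd) == 3 &&
        demoSinkHole(4096, &fd) == 0 &&
        fstat(fd, &sb) == 0)
        size = (long long) sb.st_size;
    close(fd);
    return size;
}
```

Growing the file on every hole is redundant for holes in the middle of the stream, since the next write extends the file anyway; an alternative would be a single ftruncate() to the final offset once the stream has finished.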

This is just a wrapper over a new function that has just been
introduced: virStreamSkip(). It's very similar to virStreamSendAll()
except it handles sparse streams well.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  43 ++++++++++++++--
 src/libvirt-stream.c             | 104 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 145 insertions(+), 3 deletions(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index e5f5126..9aa728e 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -69,9 +69,9 @@ int virStreamHoleSize(virStreamPtr,
  * @nbytes: size of the data array
  * @opaque: optional application provided data
  *
- * The virStreamSourceFunc callback is used together
- * with the virStreamSendAll function for libvirt to
- * obtain the data that is to be sent.
+ * The virStreamSourceFunc callback is used together with
+ * the virStreamSendAll and virStreamSparseSendAll functions
+ * for libvirt to obtain the data that is to be sent.
  *
  * The callback will be invoked multiple times,
  * fetching data in small chunks. The application
@@ -95,6 +95,43 @@ int virStreamSendAll(virStreamPtr st,
                      void *opaque);

 /**
+ * virStreamSourceHoleFunc:
+ * @st: the stream object
+ * @inData: are we in data section
+ * @length: how long is the section we are currently in
+ * @opaque: optional application provided data
+ *
+ * The virStreamSourceHoleFunc callback is used together
+ * with the virStreamSparseSendAll function for libvirt to
+ * obtain the length of section stream is currently in.
+ *
+ * Moreover, upon successful return, @length should be
+ * updated with how much bytes are there left until current
+ * section ends (be it data section or hole section) and if
+ * the stream is currently in data section, @inData should
+ * be set to a non-zero value and vice versa.
+ * As a corner case, there's an implicit hole at the end of
+ * each file. If that's the case, @inData should be set to 0
+ * as well as @length.
+ * Moreover, this function should always leave the stream in
+ * data section. Either the one that we have been to prior
+ * calling this function, or the one that follows the hole
+ * we are in.
+ *
+ * Returns 0 on success,
+ *        -1 upon error
+ */
+typedef int (*virStreamSourceHoleFunc)(virStreamPtr st,
+                                       int *inData,
+                                       unsigned long long *length,
+                                       void *opaque);
+
+int virStreamSparseSendAll(virStreamPtr st,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           void *opaque);
+
+/**
  * virStreamSinkFunc:
  *
  * @st: the stream object
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 2e3e319..707c0ed 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -565,7 +565,111 @@ virStreamSendAll(virStreamPtr stream,
 }


+
 /**
+ * virStreamSparseSendAll:
+ * @stream: pointer to the stream object
+ * @handler: source callback for reading data from application
+ * @holeHandler: source callback for determining holes
+ * @opaque: application defined data
+ *
+ * Some dummy description here.
+ *
+ * Opaque data in @opaque are shared between @handler and @holeHandler.
+ *
+ * Returns 0 if all the data was successfully sent. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called, so the caller need only call
+ * virStreamFree()
+ */
+int virStreamSparseSendAll(virStreamPtr stream,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    int ret = -1;
+    unsigned long long dataLen = 0;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sources cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    for (;;) {
+        int inData, got, offset = 0;
+        unsigned long long sectionLen;
+
+        if (!dataLen) {
+            if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+
+            if (!inData) {
+                if (sectionLen && virStreamSkip(stream, sectionLen) < 0) {
+                    virStreamAbort(stream);
+                    goto cleanup;
+                }
+
+                if (sectionLen)
+                    continue;
+
+                /* Here inData == 0 and sectionLen == 0 as well.
+                 * This means, we are in the trailing hole. Don't
+                 * start new iteration but let virStreamSend()
+                 * close the stream gracefully. */
+            } else {
+                dataLen = sectionLen;
+            }
+        }
+
+        if (want > dataLen)
+            want = dataLen;
+
+        got = (handler)(stream, bytes, want, opaque);
+        if (got < 0) {
+            virStreamAbort(stream);
+            goto cleanup;
+        }
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = virStreamSend(stream, bytes + offset, got - offset);
+            if (done < 0)
+                goto cleanup;
+            offset += done;
+            dataLen -= done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}/**
  * virStreamRecvAll:
  * @stream: pointer to the stream object
  * @handler: sink callback for writing data to application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 4e8db51..2da61c8 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -742,6 +742,7 @@ LIBVIRT_2.0.0 {
         virStreamRecvFlags;
         virStreamSkip;
         virStreamSparseRecvAll;
+        virStreamSparseSendAll;
 } LIBVIRT_1.3.3;

-- 
2.8.4

On Wed, Jun 22, 2016 at 04:43:28PM +0200, Michal Privoznik wrote:
This is just a wrapper over the newly introduced function virStreamSkip(). It is very similar to virStreamSendAll(), except that it handles sparse streams well.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 43 ++++++++++++++-- src/libvirt-stream.c | 104 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 145 insertions(+), 3 deletions(-)
ACK

Regards,
Daniel

This is just an internal API, that calls corresponding function in stream driver. This function will set @data=1 if the underlying file is in data section, or @data=0 if it is in a hole. At any rate, @length is set to number of bytes remaining in the section the file currently is. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 43 +++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 3 +++ src/libvirt_private.syms | 1 + 4 files changed, 53 insertions(+) diff --git a/src/driver-stream.h b/src/driver-stream.h index e196b6d..5d84b9a 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -50,6 +50,11 @@ typedef int unsigned long long *length); typedef int +(*virDrvStreamInData)(virStreamPtr st, + int *data, + unsigned long long *length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -78,6 +83,7 @@ struct _virStreamDriver { virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSkip streamSkip; virDrvStreamHoleSize streamHoleSize; + virDrvStreamInData streamInData; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 707c0ed..2632d55 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -473,6 +473,49 @@ virStreamHoleSize(virStreamPtr stream, /** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @length: length to next section + * + * This function checks the underlying stream (typically a file) + * to learn whether the current stream position lies within a + * data section or a hold. Upon return @data is set to a nonzero + * value if former is the case, or to zero if @stream is in a + * hole. 
Moreover, @length is updated to tell caller how many + * bytes can be read from @stream until current section changes + * (from data to a hole or vice versa). + * + * As a special case, there's an implicit hole at EOF. In this + * situation this function should set @data = false, @length = 0 + * and return 0. + * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *length) +{ + VIR_DEBUG("stream=%p, data=%p, length=%p", stream, data, length); + + /* No checks of arguments or error resetting. This is an + * internal function that just happen to live next to some + * public functions of ours. */ + + if (stream->driver->streamInData) { + int ret; + ret = (stream->driver->streamInData)(stream, data, length); + return ret; + } + + virReportUnsupportedError(); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 96439d8..0e945aa 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -294,4 +294,7 @@ virTypedParameterValidateSet(virConnectPtr conn, virTypedParameterPtr params, int nparams); +int virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *lengtht); #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index f476eae..f2983e0 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1030,6 +1030,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamInData; # locking/domain_lock.h -- 2.8.4
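To make the contract documented above concrete, including the special case at EOF, here is a small model of it. This is only a sketch under stated assumptions: the file is described by a hypothetical sorted list of data extents (`struct extent`), and `model_in_data()` is an illustrative name, not a libvirt function. A real file-backed driver would presumably query the filesystem instead of walking a list like this.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical description of one data extent in a sparse file. */
struct extent {
    unsigned long long start;   /* offset of the first data byte */
    unsigned long long len;     /* number of data bytes */
};

/*
 * Model of the virStreamInData() contract. The extents must be sorted
 * by start offset, must not overlap and must lie within the file.
 *
 * On success *data is 1 if pos is inside a data section and 0 if it is
 * in a hole, and *length is the number of bytes until the section
 * changes. At EOF this yields *data = 0 and *length = 0.
 *
 * Returns 0 on success, -1 if pos lies beyond EOF.
 */
static int
model_in_data(const struct extent *ext, size_t nextents,
              unsigned long long file_size, unsigned long long pos,
              int *data, unsigned long long *length)
{
    size_t i;

    assert(data && length);

    if (pos > file_size)
        return -1;

    /* Default: a hole that lasts until EOF (zero bytes long at EOF). */
    *data = 0;
    *length = file_size - pos;

    for (i = 0; i < nextents; i++) {
        unsigned long long end = ext[i].start + ext[i].len;

        if (pos < ext[i].start) {
            /* In a hole that ends where this extent begins. */
            *length = ext[i].start - pos;
            break;
        }
        if (pos < end) {
            /* Inside this extent; data runs until its end. */
            *data = 1;
            *length = end - pos;
            break;
        }
    }

    return 0;
}
```

For a 16384 byte file with data at [0, 4096) and [8192, 12288), position 4096 reports a 4096 byte hole, position 10000 reports 2288 bytes of data, and position 16384 reports the zero-length hole at EOF.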

This has no real added value right now, but is going to be very helpful later. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 2 +- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index bfd8e8e..e80ec58 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5760,7 +5760,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, memset(&args, 0, sizeof(args)); memset(&ret, 0, sizeof(ret)); - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, priv->counter))) goto done; @@ -6684,7 +6685,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, goto cleanup; } - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, priv->counter))) goto cleanup; diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 173189c..e608812 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 34989a9..8920395 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { virObjectLockable parent; + 
virStreamPtr stream; /* Reverse pointer to parent stream */ + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) } -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial) { @@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL; + st->stream = stream; st->prog = prog; st->proc = proc; st->serial = serial; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a0d2be9..e278dab 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -32,7 +32,8 @@ typedef virNetClientStream *virNetClientStreamPtr; typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial); -- 2.8.4

Even though there's no way to make stream skippable right now, it is going to be soon. We need to track this info so that we don't send virStreamSkip to a client that did not want it or vice versa. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/daemon/remote.c b/daemon/remote.c index 4e2aff8..6610196 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -5038,7 +5038,7 @@ remoteDispatchDomainMigratePrepareTunnel3Params(virNetServerPtr server ATTRIBUTE if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) || !(stream = daemonCreateClientStream(client, st, remoteProgram, - &msg->header))) + &msg->header, false))) goto cleanup; if (virDomainMigratePrepareTunnel3Params(priv->conn, st, params, nparams, diff --git a/daemon/stream.c b/daemon/stream.c index bd0b5d2..e07a245 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -52,6 +52,8 @@ struct daemonClientStream { virNetMessagePtr rx; bool tx; + bool skippable; + daemonClientStreamPtr next; }; @@ -321,7 +323,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr header) + virNetMessageHeaderPtr header, + bool skippable) { daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -339,6 +342,7 @@ daemonCreateClientStream(virNetServerClientPtr client, stream->serial = header->serial; stream->filterID = -1; stream->st = st; + stream->skippable = skippable; return stream; } diff --git a/daemon/stream.h b/daemon/stream.h index cf76e71..bf5dc24 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -30,7 +30,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, 
virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr hdr); + virNetMessageHeaderPtr hdr, + bool seekable); int daemonFreeClientStream(virNetServerClientPtr client, daemonClientStream *stream); diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index e80ec58..9f084a1 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5763,7 +5763,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, - priv->counter))) + priv->counter, + false))) goto done; if (virNetClientAddStream(priv->client, netst) < 0) { @@ -6688,7 +6689,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, - priv->counter))) + priv->counter, + false))) goto cleanup; if (virNetClientAddStream(priv->client, netst) < 0) { diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index e608812..9862598 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1024,7 +1024,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header)))\n"; + print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1738,7 +1738,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git 
a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 8920395..fb83693 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -54,6 +54,8 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; + bool skippable; /* User requested skippable stream */ + virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -138,7 +140,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial) + unsigned serial, + bool skippable) { virNetClientStreamPtr st; @@ -152,6 +155,7 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, st->prog = prog; st->proc = proc; st->serial = serial; + st->skippable = skippable; virObjectRef(prog); diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index e278dab..0a5aafd 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -35,7 +35,8 @@ typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial); + unsigned serial, + bool seekable); bool virNetClientStreamRaiseError(virNetClientStreamPtr st); -- 2.8.4

This is going to be the RPC representation of virStreamSkip.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetprotocol.x   | 4 ++++
 src/virnetprotocol-structs | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x
index 9ce33b0..3623588 100644
--- a/src/rpc/virnetprotocol.x
+++ b/src/rpc/virnetprotocol.x
@@ -236,3 +236,7 @@ struct virNetMessageError {
     int int2;
     virNetMessageNetwork net; /* unused */
 };
+
+struct virNetStreamSkip {
+    unsigned hyper length;
+};
diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs
index af4526c..a8cc603 100644
--- a/src/virnetprotocol-structs
+++ b/src/virnetprotocol-structs
@@ -42,3 +42,6 @@ struct virNetMessageError {
         int int2;
         virNetMessageNetwork net;
 };
+struct virNetStreamSkip {
+        uint64_t length;
+};
-- 
2.8.4
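For readers unfamiliar with XDR: an `unsigned hyper` is a 64-bit quantity that goes on the wire as eight bytes, most significant byte first, so the payload of a skip message is exactly eight bytes. The sketch below shows that encoding by hand. The helper names `xdr_put_u_hyper()` and `xdr_get_u_hyper()` are made up for illustration; the patches themselves rely on the generated `xdr_virNetStreamSkip` routine instead.

```c
#include <assert.h>
#include <stdint.h>

/* Encode v as an XDR unsigned hyper: 8 bytes, big-endian. */
static void
xdr_put_u_hyper(unsigned char out[8], uint64_t v)
{
    int i;

    assert(out);

    for (i = 0; i < 8; i++)
        out[i] = (unsigned char)(v >> (56 - 8 * i));
}

/* Decode an XDR unsigned hyper from 8 big-endian bytes. */
static uint64_t
xdr_get_u_hyper(const unsigned char in[8])
{
    uint64_t v = 0;
    int i;

    assert(in);

    for (i = 0; i < 8; i++)
        v = (v << 8) | in[i];

    return v;
}
```

A 1 MiB hole (1048576 = 0x100000) is therefore sent as the bytes 00 00 00 00 00 10 00 00.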

This is a special type of stream packet that is bidirectional and carries the number of bytes that both sides are to skip in the stream.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 daemon/stream.c            |  3 ++-
 src/rpc/virnetclient.c     |  1 +
 src/rpc/virnetprotocol.x   | 12 +++++++++++-
 src/virnetprotocol-structs |  1 +
 4 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/daemon/stream.c b/daemon/stream.c
index e07a245..82a99e4 100644
--- a/daemon/stream.c
+++ b/daemon/stream.c
@@ -287,7 +287,8 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED,
 
     virMutexLock(&stream->priv->lock);
 
-    if (msg->header.type != VIR_NET_STREAM)
+    if (msg->header.type != VIR_NET_STREAM &&
+        msg->header.type != VIR_NET_STREAM_SKIP)
         goto cleanup;
 
     if (!virNetServerProgramMatches(stream->prog, msg))
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 3d59990..9c87ce0 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1157,6 +1157,7 @@ virNetClientCallDispatch(virNetClientPtr client)
         return virNetClientCallDispatchMessage(client);
 
     case VIR_NET_STREAM: /* Stream protocol */
+    case VIR_NET_STREAM_SKIP: /* Stream skip protocol */
         return virNetClientCallDispatchStream(client);
 
     default:
diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x
index 3623588..3530694 100644
--- a/src/rpc/virnetprotocol.x
+++ b/src/rpc/virnetprotocol.x
@@ -143,6 +143,14 @@ const VIR_NET_MESSAGE_NUM_FDS_MAX = 32;
  *    * status == VIR_NET_ERROR
  *         remote_error Error information
  *
+ *  - type == VIR_NET_STREAM_SKIP
+ *    * status == VIR_NET_CONTINUE
+ *         byte[] skip data
+ *    * status == VIR_NET_ERROR
+ *         remote_error error information
+ *    * status == VIR_NET_OK
+ *         <empty>
+ *
  */
 enum virNetMessageType {
     /* client -> server. args from a method call */
@@ -156,7 +164,9 @@ enum virNetMessageType {
     /* client -> server. args from a method call, with passed FDs */
     VIR_NET_CALL_WITH_FDS = 4,
     /* server -> client. reply/error from a method call, with passed FDs */
-    VIR_NET_REPLY_WITH_FDS = 5
+    VIR_NET_REPLY_WITH_FDS = 5,
+    /* either direction, stream skip data packet */
+    VIR_NET_STREAM_SKIP = 6
 };
 
 enum virNetMessageStatus {
diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs
index a8cc603..c31683f 100644
--- a/src/virnetprotocol-structs
+++ b/src/virnetprotocol-structs
@@ -6,6 +6,7 @@ enum virNetMessageType {
         VIR_NET_STREAM = 3,
         VIR_NET_CALL_WITH_FDS = 4,
         VIR_NET_REPLY_WITH_FDS = 5,
+        VIR_NET_STREAM_SKIP = 6,
 };
 enum virNetMessageStatus {
         VIR_NET_OK = 0,
-- 
2.8.4

Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 40 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 42 insertions(+) diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index aa1c323..aa49559 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -52,8 +52,11 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_skip_length = -1; +static int hf_libvirt_stream_skip = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_skip = -1; #define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -328,6 +331,28 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); } +static gboolean +dissect_xdr_stream_skip(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) +{ + goffset start = VIR_HEADER_LEN; + proto_item *ti; + + ti = proto_tree_add_item(tree, hf_libvirt_stream_skip, tvb, start, -1, ENC_NA); + tree = proto_item_add_subtree(ti, ett_libvirt_stream_skip); + + hf = hf_libvirt_stream_skip_length; + if (!dissect_xdr_u_hyper(tvb, tree, xdrs, hf)) return FALSE; + proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + + +static void +dissect_libvirt_stream_skip(tvbuff_t *tvb, proto_tree *tree, gint payload_length, guint32 status) +{ + proto_tree_add_item(tree, hf_libvirt_stream_skip_length, tvb, VIR_HEADER_LEN, -1, ENC_NA); +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) 
@@ -348,6 +373,8 @@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_SKIP) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_skip); } else { goto unknown; } @@ -517,6 +544,18 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_skip, + { "stream_skip", "libvirt.stream_skip", + FT_BYTES, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_skip_length, + { "length", "libvirt.stream_skip.length", + FT_UINT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -527,6 +566,7 @@ proto_register_libvirt(void) static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_skip, &ett_libvirt }; diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdf..006aa6d 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_SKIP, "STREAM_SKIP" }, { -1, NULL } }; -- 2.8.4

This is just a helper function that takes in a length value, encodes it into XDR and sends to client. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 33 +++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 7 +++++++ 3 files changed, 41 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 1a88fff..b97b9b1 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -177,6 +177,7 @@ virNetServerProgramNew; virNetServerProgramSendReplyError; virNetServerProgramSendStreamData; virNetServerProgramSendStreamError; +virNetServerProgramSendStreamSkip; virNetServerProgramUnknownError; diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c index d1597f4..6d84056 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -548,6 +548,39 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, } +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length) +{ + virNetStreamSkip data; + + VIR_DEBUG("client=%p msg=%p length=%llu", client, msg, length); + + memset(&data, 0, sizeof(data)); + data.length = length; + + msg->header.prog = prog->program; + msg->header.vers = prog->version; + msg->header.proc = procedure; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = serial; + msg->header.status = VIR_NET_CONTINUE; + + if (virNetMessageEncodeHeader(msg) < 0) + return -1; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + return virNetServerClientSendMessage(client, msg); +} + + void virNetServerProgramDispose(void *obj ATTRIBUTE_UNUSED) { } diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 531fca0..eba2168 100644 --- a/src/rpc/virnetserverprogram.h +++ 
b/src/rpc/virnetserverprogram.h @@ -104,4 +104,11 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, const char *data, size_t len); +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length); + #endif /* __VIR_NET_SERVER_PROGRAM_H__ */ -- 2.8.4

While the previous commit implemented a helper for sending a STREAM_SKIP packet for daemon, this is a client's counterpart. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 57 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index b97b9b1..16fde79 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,7 @@ virNetClientStreamQueuePacket; virNetClientStreamRaiseError; virNetClientStreamRecvPacket; virNetClientStreamSendPacket; +virNetClientStreamSendSkip; virNetClientStreamSetError; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index fb83693..d7d2b0e 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -431,6 +431,58 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } +int +virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length) +{ + virNetMessagePtr msg = NULL; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("st=%p length=%llu", st, length); + + if (!st->skippable) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Skipping is not supported with this stream")); + return -1; + } + + memset(&data, 0, sizeof(data)); + data.length = length; + + if (!(msg = virNetMessageNew(false))) + return -1; + + virObjectLock(st); + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = VIR_NET_CONTINUE; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + virObjectUnlock(st); + + if (virNetMessageEncodeHeader(msg) < 0) + goto cleanup; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + goto cleanup; + + if (virNetClientSendNoReply(client, msg) < 
0) + goto cleanup; + + ret = 0; + cleanup: + virNetMessageFree(msg); + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 0a5aafd..a648b7c 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -61,6 +61,10 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock); +int virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.8.4

Basically, whenever the new type of stream packet arrives to the daemon call this function that decodes it and calls virStreamSkipCallback(). Otherwise a regular data stream packet has arrived and therefore continue its processing. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 11 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 82a99e4..f9c0ba1 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -653,6 +654,52 @@ daemonStreamHandleAbort(virNetServerClientPtr client, } +static int +daemonStreamHandleSkip(virNetServerClientPtr client, + daemonClientStream *stream, + virNetMessagePtr msg) +{ + int ret; + virNetStreamSkip data; + + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", + client, stream, msg->header.proc, msg->header.serial); + + /* Let's check if client plays nicely and advertised usage of + * sparse stream upfront. 
*/ + if (!stream->skippable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream skip")); + return -1; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + ret = virStreamSkip(stream->st, data.length); + + if (ret < 0) { + virNetMessageError rerr; + + memset(&rerr, 0, sizeof(rerr)); + + VIR_INFO("Stream skip failed"); + stream->closed = true; + virStreamEventRemoveCallback(stream->st); + virStreamAbort(stream->st); + + return virNetServerProgramSendReplyError(stream->prog, + client, + msg, + &rerr, + &msg->header); + } + + return 0; +} + /* * Called when the stream is signalled has being able to accept @@ -671,19 +718,31 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetMessagePtr msg = stream->rx; int ret; - switch (msg->header.status) { - case VIR_NET_OK: - ret = daemonStreamHandleFinish(client, stream, msg); - break; + if (msg->header.type == VIR_NET_STREAM_SKIP) { + /* Handle special case when client sent us skip. + * Otherwise just carry on with processing stream + * data. */ + ret = daemonStreamHandleSkip(client, stream, msg); + } else if (msg->header.type == VIR_NET_STREAM) { + switch (msg->header.status) { + case VIR_NET_OK: + ret = daemonStreamHandleFinish(client, stream, msg); + break; - case VIR_NET_CONTINUE: - ret = daemonStreamHandleWriteData(client, stream, msg); - break; + case VIR_NET_CONTINUE: + ret = daemonStreamHandleWriteData(client, stream, msg); + break; - case VIR_NET_ERROR: - default: - ret = daemonStreamHandleAbort(client, stream, msg); - break; + case VIR_NET_ERROR: + default: + ret = daemonStreamHandleAbort(client, stream, msg); + break; + } + } else { + virReportError(VIR_ERR_RPC, + _("Unexpected message type: %d"), + msg->header.type); + ret = -1; } if (ret > 0) -- 2.8.4
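The dispatch introduced by this patch can be summarised as a small decision function. The sketch below is an illustration only: the enum and function names are hypothetical, only the two message type values (3 and 6) are taken from the protocol definition, and the skippable check that the patch performs inside daemonStreamHandleSkip() is folded into the classifier for brevity.

```c
#include <assert.h>

/* Message types; the numeric values match virNetMessageType. */
enum msg_type { MSG_STREAM = 3, MSG_STREAM_SKIP = 6 };

/* Hypothetical local stand-ins for the message status values. */
enum msg_status { STATUS_OK, STATUS_ERROR, STATUS_CONTINUE };

/* What the daemon would do with an incoming stream message. */
enum action {
    ACT_FINISH,      /* regular stream, status OK: finish the stream */
    ACT_WRITE_DATA,  /* regular stream, status CONTINUE: write payload */
    ACT_ABORT,       /* regular stream, error or unknown status */
    ACT_SKIP,        /* skip packet on a skippable stream */
    ACT_REJECT       /* unexpected skip or unknown message type */
};

static enum action
classify(int type, int status, int skippable)
{
    /* A skip is only acceptable if the client asked for sparse streams. */
    if (type == MSG_STREAM_SKIP)
        return skippable ? ACT_SKIP : ACT_REJECT;

    if (type != MSG_STREAM)
        return ACT_REJECT;

    switch (status) {
    case STATUS_OK:
        return ACT_FINISH;
    case STATUS_CONTINUE:
        return ACT_WRITE_DATA;
    case STATUS_ERROR:
    default:
        return ACT_ABORT;
    }
}
```

Checking the message type before the status mirrors the order used in daemonStreamHandleWrite(): a skip packet is never interpreted through the regular status switch.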

This function handles an incoming STREAM_SKIP packet. It is not wired up yet, but it will be soon. It first checks that the server plays nicely and sent us a STREAM_SKIP packet only after we have enabled sparse streams. Then it decodes the message payload to see how big the hole is and adds that size to the stream's skipLength counter.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetclientstream.c | 63 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index d7d2b0e..0e982ba 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -28,6 +28,7 @@
 #include "virerror.h"
 #include "virlog.h"
 #include "virthread.h"
+#include "libvirt_internal.h"
 
 #define VIR_FROM_THIS VIR_FROM_RPC
 
@@ -55,6 +56,7 @@ struct _virNetClientStream {
     bool incomingEOF;
 
     bool skippable; /* User requested skippable stream */
+    unsigned long long skipLength; /* Size of incoming hole in stream. */
 
     virNetClientStreamEventCallback cb;
     void *cbOpaque;
@@ -358,6 +360,67 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
     return -1;
 }
 
+
+static int ATTRIBUTE_UNUSED
+virNetClientStreamHandleSkip(virNetClientPtr client,
+                             virNetClientStreamPtr st)
+{
+    virNetMessagePtr msg;
+    virNetStreamSkip data;
+    int ret = -1;
+
+    VIR_DEBUG("client=%p st=%p", client, st);
+
+    msg = st->rx;
+    memset(&data, 0, sizeof(data));
+
+    /* We should not be called unless there's VIR_NET_STREAM_SKIP
+     * message at the head of the list. But doesn't hurt to check */
+    if (!msg) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("No message in the queue"));
+        goto cleanup;
+    }
+
+    if (msg->header.type != VIR_NET_STREAM_SKIP) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Invalid message prog=%d type=%d serial=%u proc=%d"),
+                       msg->header.prog,
+                       msg->header.type,
+                       msg->header.serial,
+                       msg->header.proc);
+        goto cleanup;
+    }
+
+    /* Server should not send us VIR_NET_STREAM_SKIP unless we
+     * have requested so. But does not hurt to check ... */
+    if (!st->skippable) {
+        virReportError(VIR_ERR_RPC, "%s",
+                       _("Unexpected stream skip"));
+        goto cleanup;
+    }
+
+    if (virNetMessageDecodePayload(msg,
+                                   (xdrproc_t) xdr_virNetStreamSkip,
+                                   &data) < 0) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("Malformed stream skip packet"));
+        goto cleanup;
+    }
+
+    virNetMessageQueueServe(&st->rx);
+    virNetMessageFree(msg);
+    st->skipLength += data.length;
+
+    ret = 0;
+ cleanup:
+    if (ret < 0) {
+        /* Abort stream? */
+    }
+    return ret;
+}
+
+
 int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  virNetClientPtr client,
                                  char *data,
-- 
2.8.4

Now that we have RPC wrappers over VIR_NET_STREAM_SKIP, we can start wiring them up. This commit wires up the case where a client wants to send a hole to the daemon. To keep the stream offsets in sync, after a successful call on the daemon the same hole is skipped in the local part of the stream.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/remote/remote_driver.c | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 9f084a1..2dc21d2 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -5273,6 +5273,34 @@ remoteStreamRecv(virStreamPtr st,
     return remoteStreamRecv(st, data, nbytes);
 }
 
+
+static int
+remoteStreamSkip(virStreamPtr st,
+                 unsigned long long length)
+{
+    VIR_DEBUG("st=%p length=%llu", st, length);
+    struct private_data *priv = st->conn->privateData;
+    virNetClientStreamPtr privst = st->privateData;
+    int rv;
+
+    if (virNetClientStreamRaiseError(privst))
+        return -1;
+
+    remoteDriverLock(priv);
+    priv->localUses++;
+    remoteDriverUnlock(priv);
+
+    rv = virNetClientStreamSendSkip(privst,
+                                    priv->client,
+                                    length);
+
+    remoteDriverLock(priv);
+    priv->localUses--;
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+
 struct remoteStreamCallbackData {
     virStreamPtr st;
     virStreamEventCallback cb;
@@ -5447,6 +5475,7 @@ static virStreamDriver remoteStreamDrv = {
     .streamRecv = remoteStreamRecv,
     .streamRecvFlags = remoteStreamRecvFlags,
     .streamSend = remoteStreamSend,
+    .streamSkip = remoteStreamSkip,
     .streamFinish = remoteStreamFinish,
     .streamAbort = remoteStreamAbort,
     .streamEventAddCallback = remoteStreamEventAddCallback,
-- 
2.8.4

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 3 ++- src/rpc/virnetclientstream.c | 10 +++++++--- src/rpc/virnetclientstream.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 2dc21d2..7192f56 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5255,7 +5255,8 @@ remoteStreamRecvFlags(virStreamPtr st, priv->client, data, nbytes, - (st->flags & VIR_STREAM_NONBLOCK)); + (st->flags & VIR_STREAM_NONBLOCK), + flags); VIR_DEBUG("Done %d", rv); diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 0e982ba..c8c89ec 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -425,13 +425,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock) + bool nonblock, + unsigned int flags) { int rv = -1; size_t want; - VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", - st, client, data, nbytes, nonblock); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", + st, client, data, nbytes, nonblock, flags); + + virCheckFlags(0, -1); + virObjectLock(st); if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a648b7c..2835066 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -59,7 +59,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock); + bool nonblock, + unsigned int flags); int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, -- 2.8.4

This function will fetch previously processed stream holes and return their sum. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 15 +++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 16fde79..d843fc6 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -48,6 +48,7 @@ virNetClientStreamEOF; virNetClientStreamEventAddCallback; virNetClientStreamEventRemoveCallback; virNetClientStreamEventUpdateCallback; +virNetClientStreamHoleSize; virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index c8c89ec..37ce257 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -550,6 +550,21 @@ virNetClientStreamSendSkip(virNetClientStreamPtr st, } +int virNetClientStreamHoleSize(virNetClientPtr client ATTRIBUTE_UNUSED, + virNetClientStreamPtr st, + unsigned long long *length) +{ + int ret = st->skipLength; + + if (length) { + *length = st->skipLength; + st->skipLength = 0; + } + + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 2835066..9caa091 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -66,6 +66,10 @@ int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, unsigned long long length); +int virNetClientStreamHoleSize(virNetClientPtr client, + virNetClientStreamPtr st, + unsigned long long *length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.8.4
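A minimal sketch of the fetch-and-reset pattern described in the commit message above, written outside of libvirt. The names struct demo_stream, demo_handle_skip() and demo_hole_size() are hypothetical and not part of the patch. One deliberate difference from the posted code is an assumption of this sketch: the accumulated size is reported only through the out parameter and the return value is just 0 or -1, since an unsigned long long sum may not fit into an int return value.

```c
#include <stddef.h>

/* Hypothetical stand-in for the client stream; only the accumulator matters. */
struct demo_stream {
    unsigned long long skip_length;  /* sum of holes seen but not yet reported */
};

/* Called for every skip packet taken off the receive queue. */
static void
demo_handle_skip(struct demo_stream *st, unsigned long long length)
{
    st->skip_length += length;
}

/* Report the accumulated hole size and reset the accumulator.
 * Returns 0 on success, -1 on invalid arguments. */
static int
demo_hole_size(struct demo_stream *st, unsigned long long *length)
{
    if (!st || !length)
        return -1;

    *length = st->skip_length;
    st->skip_length = 0;
    return 0;
}
```

Consecutive skip packets are summed, and a second query returns zero because the first one consumed the pending hole.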

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 7192f56..445b7ce 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5302,6 +5302,29 @@ remoteStreamSkip(virStreamPtr st, } +static int +remoteStreamHoleSize(virStreamPtr st, + unsigned long long *length) +{ + VIR_DEBUG("st=%p length=%p", st, length); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamHoleSize(priv->client, privst, length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5477,6 +5500,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamSkip = remoteStreamSkip, + .streamHoleSize = remoteStreamHoleSize, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.8.4

Whenever the server sends a stream packet to a client (either a regular one with actual data or a stream skip), it is queued on @st->rx. The list is therefore a mixture of both types of stream packets. Now that we have all the helpers needed, we can wire their processing up. But since virNetClientStreamRecvPacket doesn't support the VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn every received skip into a run of zeroes of the announced length. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 37ce257..f63072e 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -298,6 +298,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virObjectLock(st); + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg); virNetClientStreamEventTimerUpdate(st); @@ -361,7 +363,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, } -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleSkip(virNetClientPtr client, virNetClientStreamPtr st) { @@ -437,6 +439,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virCheckFlags(0, -1); virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -468,8 +472,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } VIR_DEBUG("After IO rx=%p", st->rx); + + while (st->rx && + st->rx->header.type == VIR_NET_STREAM_SKIP) { + /* Handle skip sent to us by server. 
*/ + + if (virNetClientStreamHandleSkip(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF && !st->skipLength) { + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + + if (st->skipLength) { + /* Pretend skipLength zeroes was read from stream. */ + size_t len = want; + + if (len > st->skipLength) + len = st->skipLength; + + memset(data, 0, len); + st->skipLength -= len; + want -= len; + } + + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want; -- 2.8.4
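The idea of turning skips into zeroes while keeping in-order processing of a mixed queue can be sketched in isolation as follows. This is an illustration under assumptions, not the patch's code: struct demo_packet, struct demo_queue and demo_recv() are made-up names, the queue is a plain array, and unlike the patch the sketch handles several holes within one call.

```c
#include <stddef.h>
#include <string.h>

enum demo_type { DEMO_DATA, DEMO_SKIP };

struct demo_packet {
    enum demo_type type;
    const char *payload;         /* DEMO_DATA only */
    unsigned long long length;   /* payload bytes, or size of the hole */
};

struct demo_queue {
    const struct demo_packet *pkts;
    size_t npkts;
    size_t head;                 /* index of the next packet to process */
    unsigned long long consumed; /* bytes already taken from pkts[head] */
};

/* Copy up to nbytes into buf, processing packets strictly in order and
 * expanding every hole into zeroes. Returns the number of bytes produced. */
static size_t
demo_recv(struct demo_queue *q, char *buf, size_t nbytes)
{
    size_t got = 0;

    while (got < nbytes && q->head < q->npkts) {
        const struct demo_packet *p = &q->pkts[q->head];
        unsigned long long left = p->length - q->consumed;
        size_t len = nbytes - got;

        if (len > left)
            len = (size_t) left;

        if (p->type == DEMO_SKIP)
            memset(buf + got, 0, len);      /* pretend zeroes were read */
        else
            memcpy(buf + got, p->payload + q->consumed, len);

        got += len;
        q->consumed += len;
        if (q->consumed == p->length) {     /* packet fully served */
            q->head++;
            q->consumed = 0;
        }
    }

    return got;
}
```

A hole larger than the caller's buffer is served across several calls, which matches the role of the skipLength counter in the patch.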

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 2 +- src/rpc/virnetclientstream.c | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 445b7ce..0206510 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5242,7 +5242,7 @@ remoteStreamRecvFlags(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); if (virNetClientStreamRaiseError(privst)) return -1; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index f63072e..e8ef3a3 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -436,7 +436,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", st, client, data, nbytes, nonblock, flags); - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); virObjectLock(st); @@ -499,6 +499,15 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, /* Pretend skipLength zeroes was read from stream. */ size_t len = want; + /* Yes, pretend unless we are asked not to. */ + if (flags & VIR_STREAM_RECV_STOP_AT_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Stream is in a hole")); + rv = -3; + goto cleanup; + } + + if (len > st->skipLength) len = st->skipLength; -- 2.8.4
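From the caller's point of view, the -3 return value introduced above means "the stream is positioned at a hole; ask for its size instead of reading". A hedged sketch of such a caller loop follows. Everything here is a mock: struct demo_stream, demo_recv_flags(), demo_hole_size() and demo_sparse_recv_all() are hypothetical names that only imitate the contract, and chunks are assumed to be non-empty.

```c
#include <stddef.h>
#include <string.h>

#define DEMO_STOP_AT_HOLE (1u << 0)

struct demo_chunk {
    const char *data;            /* NULL marks a hole */
    unsigned long long length;   /* assumed to be non-zero */
};

struct demo_stream {
    const struct demo_chunk *chunks;
    size_t nchunks;
    size_t head;
    unsigned long long consumed;
};

/* Returns bytes read, 0 at end of stream, or -3 when positioned at a hole
 * and DEMO_STOP_AT_HOLE was requested. nbytes is assumed to fit in int. */
static int
demo_recv_flags(struct demo_stream *st, char *buf, size_t nbytes,
                unsigned int flags)
{
    const struct demo_chunk *c;
    unsigned long long left;

    if (st->head == st->nchunks)
        return 0;

    c = &st->chunks[st->head];
    left = c->length - st->consumed;
    if (nbytes > left)
        nbytes = (size_t) left;

    if (!c->data) {
        if (flags & DEMO_STOP_AT_HOLE)
            return -3;
        memset(buf, 0, nbytes);             /* legacy behaviour: zeroes */
    } else {
        memcpy(buf, c->data + st->consumed, nbytes);
    }

    st->consumed += nbytes;
    if (st->consumed == c->length) {
        st->head++;
        st->consumed = 0;
    }
    return (int) nbytes;
}

/* Consume the hole the stream is positioned at and report its size. */
static int
demo_hole_size(struct demo_stream *st, unsigned long long *length)
{
    if (st->head == st->nchunks || st->chunks[st->head].data)
        return -1;

    *length = st->chunks[st->head].length - st->consumed;
    st->head++;
    st->consumed = 0;
    return 0;
}

/* Receive the whole stream into out. Holes only advance the offset.
 * Returns the logical size of the stream or -1 on error. */
static long long
demo_sparse_recv_all(struct demo_stream *st, char *out, size_t outlen,
                     size_t *written)
{
    size_t off = 0;

    *written = 0;
    for (;;) {
        char buf[4];
        unsigned long long hole;
        int rv = demo_recv_flags(st, buf, sizeof(buf), DEMO_STOP_AT_HOLE);

        if (rv == 0)
            return (long long) off;
        if (rv == -3) {
            if (demo_hole_size(st, &hole) < 0 || hole > outlen - off)
                return -1;
            off += (size_t) hole;           /* skip, do not write */
            continue;
        }
        if (rv < 0 || (size_t) rv > outlen - off)
            return -1;
        memcpy(out + off, buf, (size_t) rv);
        off += (size_t) rv;
        *written += (size_t) rv;
    }
}
```

The loop writes only real data and leaves the bytes covered by a hole untouched, which is what a sink backed by a sparse file would do with a seek.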

Whenever a client is able to receive some data from a stream, daemonStreamHandleRead is called. But now the behaviour of this function needs to be changed a bit. Previously it just read data from the underlying file (or chardev or whatever) and sent it through the stream to the client. This model will not work any longer because it does not differentiate whether the underlying file is in a data or a hole section. Therefore, at the beginning of this function add code that checks this situation and acts accordingly. So after this, when wanting to send some data we always check whether we are in a hole and if so, skip it and inform the client about its size. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/daemon/stream.c b/daemon/stream.c index f9c0ba1..a8ad8f7 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -797,6 +797,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + unsigned long long length; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -821,6 +823,54 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; + if (stream->skippable) { + /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on. 
*/ + + rv = virStreamInData(stream->st, &inData, &length); + VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && length) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamSkip(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + length) < 0) + goto cleanup; + + msg = NULL; + + /* We have successfully sent stream skip to the other side. + * To keep streams in sync seek locally too. */ + virStreamSkip(stream->st, length); + /* We're done with this call */ + goto done; + } + } + + if (length < bufferLen) + bufferLen = length; + } + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -852,6 +902,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; } + done: ret = 0; cleanup: VIR_FREE(buffer); -- 2.8.4
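The decision the read handler makes after querying the stream can be condensed into a small pure function. The sketch below mirrors the two branches of the hunk above (send a skip when in a non-empty hole, otherwise clamp the read to the end of the current section); struct demo_plan and demo_plan_read() are hypothetical names introduced only for illustration.

```c
#include <stddef.h>

enum demo_action { DEMO_SEND_SKIP, DEMO_SEND_DATA };

struct demo_plan {
    enum demo_action action;
    unsigned long long length;  /* hole size to announce, or max bytes to read */
};

/* Decide what to do next given the in-data probe result:
 * in_data     - non-zero if the file position is inside a data section
 * section_len - bytes remaining until the section changes
 * buffer_len  - size of the transfer buffer */
static struct demo_plan
demo_plan_read(int in_data, unsigned long long section_len, size_t buffer_len)
{
    struct demo_plan plan;

    if (!in_data && section_len) {
        /* In a hole: announce it to the peer and seek over it locally. */
        plan.action = DEMO_SEND_SKIP;
        plan.length = section_len;
    } else {
        /* In data (or at EOF): never read past the end of the section. */
        plan.action = DEMO_SEND_DATA;
        plan.length = section_len < buffer_len ? section_len : buffer_len;
    }

    return plan;
}
```

Note the last case in the mirrored logic: a hole of length zero (end of file) falls through to a zero-length read, which the caller then treats as EOF.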

Implement virStreamSkip and virStreamInData callbacks. These callbacks do no magic, just skip a hole or detect whether we are in a data section of a file or in a hole and how many bytes we can read until the section changes. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/fdstream.c b/src/fdstream.c index bebeac3..be94325 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -504,11 +504,59 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSkip(virStreamPtr st, + unsigned long long length) +{ + virFDStreamDataPtr fdst = st->privateData; + off_t off; + int ret = -1; + + virObjectLock(fdst); + if (fdst->length) { + if (fdst->offset + length > fdst->length) { + virReportSystemError(ENOSPC, "%s", + _("cannot write to stream")); + goto cleanup; + } + fdst->offset += length; + } + + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + ret = 0; + cleanup: + virObjectUnlock(fdst); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + unsigned long long *length) +{ + virFDStreamDataPtr fdst = st->privateData; + int ret = -1; + + virObjectLock(fdst); + ret = virFileInData(fdst->fd, inData, length); + virObjectUnlock(fdst); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSkip = virFDStreamSkip, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = virFDStreamRemoveCallback -- 2.8.4
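virFileInData() is introduced in the first patch of the series and is not shown in this part of the thread. For readers who want a feel for what such a helper has to do, here is a sketch of one possible approach using lseek() with SEEK_DATA and SEEK_HOLE. It is an assumption about the technique, not a copy of libvirt's implementation; file_in_data() is a hypothetical name, and SEEK_DATA/SEEK_HOLE are extensions that need _GNU_SOURCE on glibc.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: report whether the current position of fd is in a
 * data section (*in_data = 1) or in a hole (*in_data = 0), and how many
 * bytes remain until the section changes. The file position is restored.
 * Returns 0 on success, -1 on error. */
static int
file_in_data(int fd, int *in_data, unsigned long long *length)
{
    off_t cur, data, hole, end;
    int ret = -1;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        /* ENXIO means there is no data at or after cur: we are in a
         * trailing hole, possibly of zero length if cur is at EOF. */
        if (errno != ENXIO)
            goto restore;
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            goto restore;
        *in_data = 0;
        *length = (unsigned long long) (end - cur);
    } else if (data > cur) {
        /* Next data starts later: we are in a hole right now. */
        *in_data = 0;
        *length = (unsigned long long) (data - cur);
    } else {
        /* We are in data: find where the next hole (or EOF) begins. */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            goto restore;
        *in_data = 1;
        *length = (unsigned long long) (hole - cur);
    }
    ret = 0;

 restore:
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        ret = -1;
    return ret;
}
```

The sketch needs up to four seeks per call, which illustrates why a later patch in the series tries to call the in-data check only at section boundaries.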

Now, not all APIs are going to support sparse streams. To some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). To others, we will need a special flag to indicate that client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand we can just annotate in our .x files that a certain flag to certain RPC call enables this feature. For instance: /** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX, Therefore, whenever client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); daemon will mark that down and send stream skips when possible. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 9862598..def88d4 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -281,6 +281,13 @@ while (<PROTOCOL>) { $calls{$name}->{streamflag} = "none"; } + if (exists $opts{sparseflag}) { + die "\@sparseflag requires stream" unless $calls{$name}->{streamflag} ne "none"; + $calls{$name}->{sparseflag} = $opts{sparseflag}; + } else { + $calls{$name}->{sparseflag} = "none"; + } + $calls{$name}->{acl} = $opts{acl}; $calls{$name}->{aclfilter} = $opts{aclfilter}; @@ -982,6 +989,11 @@ elsif ($mode eq "server") { if ($call->{streamflag} ne "none") { print " virStreamPtr st = NULL;\n"; print " daemonClientStreamPtr stream = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = args->flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1024,7 +1036,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; + print " if 
(!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, sparse)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1727,6 +1739,11 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print " virNetClientStreamPtr netst = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1738,7 +1755,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; -- 2.8.4

These flags to APIs will tell if caller wants to use sparse stream for storage transfer. At the same time, it's safe to enable them in storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for filesystem backend but not iSCSI backend. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index a67a97f..e08af50 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -337,11 +337,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; + int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index cebe02f..5ab719f 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * Download the content of the volume as a stream. 
If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index d11bfdf..65dc2a8 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -4798,6 +4798,7 @@ enum remote_procedure { /** * @generate: both * @writestream: 1 + * @sparseflag: VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM * @acl: storage_vol:data_write */ REMOTE_PROC_STORAGE_VOL_UPLOAD = 208, @@ -4805,6 +4806,7 @@ enum remote_procedure { /** * @generate: both * @readstream: 1 + * @sparseflag: VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_STORAGE_VOL_DOWNLOAD = 209, diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index e2d729f..a153a59 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -2224,7 +2224,7 @@ storageVolDownload(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; @@ -2394,7 +2394,7 @@ storageVolUpload(virStorageVolPtr obj, virStorageVolStreamInfoPtr cbdata = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; -- 2.8.4
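The layering described in the commit message (frontend accepts the flag, each backend decides whether it really supports it) boils down to two mask checks. A minimal sketch, assuming a simplified stand-in for virCheckFlags; demo_check_flags(), demo_frontend_download() and DEMO_DOWNLOAD_SPARSE_STREAM are hypothetical names, not libvirt API.

```c
#include <stdio.h>

#define DEMO_DOWNLOAD_SPARSE_STREAM (1u << 0)

/* Return 0 if flags contains only bits from supported, -1 otherwise. */
static int
demo_check_flags(unsigned int flags, unsigned int supported)
{
    if (flags & ~supported) {
        fprintf(stderr, "unsupported flags (0x%x)\n", flags & ~supported);
        return -1;
    }
    return 0;
}

/* The frontend lets the sparse flag through; the backend, modelled here by
 * its own mask of supported flags, has the final word. */
static int
demo_frontend_download(unsigned int flags, unsigned int backend_supported)
{
    if (demo_check_flags(flags, DEMO_DOWNLOAD_SPARSE_STREAM) < 0)
        return -1;

    return demo_check_flags(flags, backend_supported);
}
```

With this arrangement a backend that passes 0 as its mask keeps rejecting the sparse flag even though the frontend accepts it.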

The command grew a new --sparse switch that does nothing more than enable the sparse streams feature for this command. Along with the switch, a new helper function is introduced: virshStreamSkip(). This is the callback that is called whenever the daemon sends us a hole. In the callback we reflect the hole in the underlying file by seeking as many bytes as told. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 12 ++++++++++-- tools/virsh.c | 8 ++++++++ tools/virsh.h | 3 +++ tools/virsh.pod | 3 ++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index d35fee1..27744c7 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -762,6 +762,10 @@ static const vshCmdOptDef opts_vol_download[] = { .type = VSH_OT_INT, .help = N_("amount of data to download") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; @@ -777,6 +781,7 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) unsigned long long offset = 0, length = 0; bool created = false; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -790,6 +795,9 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) if (vshCommandOptStringReq(ctl, cmd, "file", &file) < 0) goto cleanup; + if (vshCommandOptBool(cmd, "sparse")) + flags |= VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; + if ((fd = open(file, O_WRONLY|O_CREAT|O_EXCL, 0666)) < 0) { if (errno != EEXIST || (fd = open(file, O_WRONLY|O_TRUNC, 0666)) < 0) { @@ -805,12 +813,12 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } - if (virStorageVolDownload(vol, st, offset, length, 0) < 0) { + if (virStorageVolDownload(vol, st, offset, length, flags) < 0) { vshError(ctl, _("cannot download from volume %s"), name); goto cleanup; } - if (virStreamRecvAll(st, virshStreamSink, &fd) < 0) { + if (virStreamSparseRecvAll(st, 
virshStreamSink, virshStreamSkip, &fd) < 0) { vshError(ctl, _("cannot receive data from volume %s"), name); goto cleanup; } diff --git a/tools/virsh.c b/tools/virsh.c index 5dc482d..080d675 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -263,6 +263,14 @@ int virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED, return safewrite(*fd, bytes, nbytes); } +int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, + unsigned long long offset, void *opaque) +{ + int *fd = opaque; + + return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; +} + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index fd552bb..5c382ef 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -150,4 +150,7 @@ int virshDomainState(vshControl *ctl, virDomainPtr dom, int *reason); int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, void *opaque); +int virshStreamSkip(virStreamPtr st, + unsigned long long offset, void *opaque); + #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index 601cb44..e04921d 100644 --- a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3627,12 +3627,13 @@ regarding possible target volume and pool changes as a result of the pool refresh when the upload is attempted. =item B<vol-download> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Download the contents of a storage volume to I<local-file>. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume to download. +If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start reading the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be downloaded. A negative value is interpreted as -- 2.8.4

Similarly to previous commit, implement sparse streams feature for vol-upload. This is, however, slightly different approach, because we must implement a function that will tell us whether we are in a data section or in a hole. But there's no magic hidden in here. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 37 ++++++++++++++++++++++++------------- tools/virsh.c | 28 ++++++++++++++++++++++++++++ tools/virsh.h | 14 ++++++++++++++ tools/virsh.pod | 3 ++- 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index 27744c7..d6434d5 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -659,18 +659,13 @@ static const vshCmdOptDef opts_vol_upload[] = { .type = VSH_OT_INT, .help = N_("amount of data to upload") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; -static int -cmdVolUploadSource(virStreamPtr st ATTRIBUTE_UNUSED, - char *bytes, size_t nbytes, void *opaque) -{ - int *fd = opaque; - - return saferead(*fd, bytes, nbytes); -} - static bool cmdVolUpload(vshControl *ctl, const vshCmd *cmd) { @@ -682,6 +677,8 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) const char *name = NULL; unsigned long long offset = 0, length = 0; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; + virshStreamCallbackData cbData; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -700,19 +697,33 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } + cbData.ctl = ctl; + cbData.fd = fd; + + if (vshCommandOptBool(cmd, "sparse")) + flags |= VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; + if (!(st = virStreamNew(priv->conn, 0))) { vshError(ctl, _("cannot create a new stream")); goto cleanup; } - if (virStorageVolUpload(vol, st, offset, length, 0) < 0) { + if (virStorageVolUpload(vol, st, offset, length, flags) < 0) { vshError(ctl, _("cannot upload to volume %s"), name); goto 
cleanup; } - if (virStreamSendAll(st, cmdVolUploadSource, &fd) < 0) { - vshError(ctl, _("cannot send data to volume %s"), name); - goto cleanup; + if (flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM) { + if (virStreamSparseSendAll(st, virshStreamSource, + virshStreamInData, &cbData) < 0) { + vshError(ctl, _("cannot send data to volume %s"), name); + goto cleanup; + } + } else { + if (virStreamSendAll(st, virshStreamSource, &cbData) < 0) { + vshError(ctl, _("cannot send data to volume %s"), name); + goto cleanup; + } } if (VIR_CLOSE(fd) < 0) { diff --git a/tools/virsh.c b/tools/virsh.c index 080d675..18e899e 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -263,6 +263,17 @@ int virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED, return safewrite(*fd, bytes, nbytes); } +int +virshStreamSource(virStreamPtr st ATTRIBUTE_UNUSED, + char *bytes, size_t nbytes, void *opaque) +{ + virshStreamCallbackDataPtr cbData = opaque; + int fd = cbData->fd; + + return saferead(fd, bytes, nbytes); +} + + int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, unsigned long long offset, void *opaque) { @@ -271,6 +282,23 @@ int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? 
-1 : 0; } +int virshStreamInData(virStreamPtr st ATTRIBUTE_UNUSED, + int *inData, + unsigned long long *offset, + void *opaque) +{ + virshStreamCallbackDataPtr cbData = opaque; + vshControl *ctl = cbData->ctl; + int fd = cbData->fd; + int ret; + + if ((ret = virFileInData(fd, inData, offset)) < 0) + vshError(ctl, "%s", _("Unable to get current position in stream")); + + return ret; +} + + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index 5c382ef..1362819 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -150,7 +150,21 @@ int virshDomainState(vshControl *ctl, virDomainPtr dom, int *reason); int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, void *opaque); +int virshStreamSource(virStreamPtr st, + char *bytes, size_t nbytes, void *opaque); + int virshStreamSkip(virStreamPtr st, unsigned long long offset, void *opaque); +typedef struct _virshStreamCallbackData virshStreamCallbackData; +typedef virshStreamCallbackData *virshStreamCallbackDataPtr; +struct _virshStreamCallbackData { + vshControl *ctl; + int fd; +}; + +int virshStreamInData(virStreamPtr st, + int *data, + unsigned long long *offset, + void *opaque); #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index e04921d..ba5567f 100644 --- a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3609,13 +3609,14 @@ the storage volume should be deleted as well. Not all storage drivers support this option, presently only rbd. =item B<vol-upload> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Upload the contents of I<local-file> to a storage volume. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume where the file will be uploaded. 
+If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start writing the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be uploaded. A negative value is interpreted -- 2.8.4

While virStreamInData() should be a small and quick function, in our implementation it seeks multiple times. Moreover, it is called even if we know that we are in data. Well, quite. If we track its return values and do some basic math with them, we can end up calling virStreamInData right at the boundaries of a data section or a hole and nowhere else. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index a8ad8f7..22d7cf7 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -54,6 +54,7 @@ struct daemonClientStream { bool tx; bool skippable; + size_t dataLen; /* How much data is there remaining until we see a hole */ daemonClientStreamPtr next; }; @@ -823,7 +824,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; - if (stream->skippable) { + if (stream->skippable && !stream->dataLen) { /* Handle skip. We want to send some data to the client. But we might * be in a hole. Seek to next data. But if we are in data already, just * carry on. */ @@ -867,10 +868,13 @@ daemonStreamHandleRead(virNetServerClientPtr client, } } - if (length < bufferLen) - bufferLen = length; + stream->dataLen = length; } + if (stream->skippable && + bufferLen > stream->dataLen) + bufferLen = stream->dataLen; + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -885,6 +889,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, goto cleanup; msg = NULL; } else { + stream->dataLen -= rv; + stream->tx = false; if (rv == 0) stream->recvEOF = true; -- 2.8.4
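The bookkeeping described above can be modelled without any file at all. In the sketch below a file is an array of sections, the expensive in-data probe is replaced by a counter, and the cached remaining data length decides whether a probe is needed. All names (struct demo_reader, demo_read_step(), demo_reader_done()) are hypothetical, and data sections are assumed to be non-empty.

```c
#include <stddef.h>

struct demo_section {
    int in_data;                 /* 1 = data, 0 = hole */
    unsigned long long length;   /* assumed non-zero for data sections */
};

struct demo_reader {
    const struct demo_section *sections;
    size_t nsections;
    size_t cur;                  /* next section to be probed */
    unsigned long long data_len; /* cached bytes left until the next hole */
    unsigned int probes;         /* how many times the probe had to run */
};

static int
demo_reader_done(const struct demo_reader *r)
{
    return r->cur == r->nsections && r->data_len == 0;
}

/* One invocation of the read handler. Returns the number of data bytes
 * sent; 0 means a skip was sent instead (or there is nothing left). */
static unsigned long long
demo_read_step(struct demo_reader *r, size_t buffer_len)
{
    unsigned long long n;

    if (!r->data_len) {
        if (r->cur == r->nsections)
            return 0;

        r->probes++;                        /* stands in for the probe */
        if (!r->sections[r->cur].in_data) {
            r->cur++;                       /* send skip, seek over hole */
            return 0;
        }
        r->data_len = r->sections[r->cur].length;
        r->cur++;
    }

    /* Inside a known data section: no probe, just clamp the read. */
    n = buffer_len < r->data_len ? buffer_len : r->data_len;
    r->data_len -= n;
    return n;
}
```

For a file made of 10 bytes of data, a 100 byte hole and 5 bytes of data, read with a 4 byte buffer, the handler runs six times but probes only three times, once per section boundary.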

Now that we have everything prepared, let's enable the feature for these two APIs. Well, except we can't. Not just yet. Because of the previous commit. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/storage/storage_backend.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c index d041530..19d5cd5 100644 --- a/src/storage/storage_backend.c +++ b/src/storage/storage_backend.c @@ -2030,7 +2030,8 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, int ret = -1; int has_snap = 0; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); + /* if volume has target format VIR_STORAGE_FILE_PLOOP * we need to restore DiskDescriptor.xml, according to * new contents of volume. This operation will be perfomed @@ -2076,7 +2077,8 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, int ret = -1; int has_snap = 0; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); + if (vol->target.format == VIR_STORAGE_FILE_PLOOP) { has_snap = virStorageBackendPloopHasSnapshots(vol->target.path); if (has_snap < 0) { -- 2.8.4

When using iohelper and sparse streams at once, we need a protocol between iohelper and daemon to tell the other side to represent received data in some fashion (e.g. whether it is raw data, or a size of hole). In this patch, virFDStreamDrv tracks whether the assumption from the first line stands. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 32 +++++++++++++++++++++----------- src/fdstream.h | 3 ++- src/storage/storage_backend.c | 6 ++++-- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index be94325..911fac0 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -60,6 +60,7 @@ struct virFDStreamData { virCommandPtr cmd; unsigned long long offset; unsigned long long length; + bool formatted; /* True if formatted messages are read from/written to @fd. */ int watch; int events; /* events the stream callback is subscribed for */ @@ -566,7 +567,8 @@ static int virFDStreamOpenInternal(virStreamPtr st, int fd, virCommandPtr cmd, int errfd, - unsigned long long length) + unsigned long long length, + bool formatted) { virFDStreamDataPtr fdst; @@ -589,6 +591,7 @@ static int virFDStreamOpenInternal(virStreamPtr st, fdst->cmd = cmd; fdst->errfd = errfd; fdst->length = length; + fdst->formatted = formatted; st->driver = &virFDStreamDrv; st->privateData = fdst; @@ -600,7 +603,7 @@ static int virFDStreamOpenInternal(virStreamPtr st, int virFDStreamOpen(virStreamPtr st, int fd) { - return virFDStreamOpenInternal(st, fd, NULL, -1, 0); + return virFDStreamOpenInternal(st, fd, NULL, -1, 0, false); } @@ -646,7 +649,7 @@ int virFDStreamConnectUNIX(virStreamPtr st, goto error; } - if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0) + if (virFDStreamOpenInternal(st, fd, NULL, -1, 0, false) < 0) goto error; return 0; @@ -672,7 +675,8 @@ virFDStreamOpenFileInternal(virStreamPtr st, unsigned long long length, int oflags, int mode, - bool forceIOHelper) + bool forceIOHelper, + bool sparse) { int fd = -1; int 
childfd = -1; @@ -680,6 +684,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, virCommandPtr cmd = NULL; int errfd = -1; char *iohelper_path = NULL; + bool formatted = false; VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o", st, path, oflags, offset, length, mode); @@ -750,6 +755,10 @@ virFDStreamOpenFileInternal(virStreamPtr st, virCommandPassFD(cmd, fd, VIR_COMMAND_PASS_FD_CLOSE_PARENT); virCommandAddArgFormat(cmd, "%d", fd); + if (sparse) { + virCommandAddArg(cmd, "sparse"); + formatted = true; + } if ((oflags & O_ACCMODE) == O_RDONLY) { childfd = fds[1]; @@ -768,7 +777,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, VIR_FORCE_CLOSE(childfd); } - if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0) + if (virFDStreamOpenInternal(st, fd, cmd, errfd, length, formatted) < 0) goto error; return 0; @@ -798,7 +807,7 @@ int virFDStreamOpenFile(virStreamPtr st, } return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, false); + oflags, 0, false, false); } int virFDStreamCreateFile(virStreamPtr st, @@ -811,7 +820,7 @@ int virFDStreamCreateFile(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, mode, - false); + false, false); } #ifdef HAVE_CFMAKERAW @@ -827,7 +836,7 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false) < 0) + false, false) < 0) return -1; fdst = st->privateData; @@ -864,7 +873,7 @@ int virFDStreamOpenPTY(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false); + false, false); } #endif /* !HAVE_CFMAKERAW */ @@ -872,11 +881,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, - int oflags) + int oflags, + bool sparse) { return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, true); + oflags, 0, true, sparse); } int 
virFDStreamSetInternalCloseCb(virStreamPtr st, diff --git a/src/fdstream.h b/src/fdstream.h index 2c913ea..bfdebc2 100644 --- a/src/fdstream.h +++ b/src/fdstream.h @@ -60,7 +60,8 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, - int oflags); + int oflags, + bool sparse); int virFDStreamSetInternalCloseCb(virStreamPtr st, virFDStreamInternalCloseCb cb, diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c index 19d5cd5..fac5b64 100644 --- a/src/storage/storage_backend.c +++ b/src/storage/storage_backend.c @@ -2029,6 +2029,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); @@ -2056,7 +2057,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_WRONLY); + offset, len, O_WRONLY, sparse); cleanup: VIR_FREE(path); @@ -2076,6 +2077,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, char *target_path = vol->target.path; int ret = -1; int has_snap = 0; + bool sparse = flags & VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); @@ -2095,7 +2097,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_RDONLY); + offset, len, O_RDONLY, sparse); cleanup: VIR_FREE(path); -- 2.8.4
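
As a rough illustration of what "formatted" means here: each chunk on the pipe has to say whether it carries raw data or just the size of a hole. The sketch below is a hypothetical encoding (one type byte plus a 64-bit big-endian length), chosen only for brevity. It is not the wire format of this series, which encodes virNetMessage instead, and all `sketch_*` names are made up.

```c
#include <stddef.h>
#include <stdint.h>

enum sketch_frame_type {
    SKETCH_FRAME_DATA = 0,  /* @len raw bytes follow the header */
    SKETCH_FRAME_HOLE = 1,  /* no payload; @len is the size of the hole */
};

#define SKETCH_HEADER_LEN 9  /* 1 byte type + 8 bytes big-endian length */

static void
sketch_encode_header(unsigned char out[SKETCH_HEADER_LEN],
                     enum sketch_frame_type type,
                     uint64_t len)
{
    int i;

    out[0] = (unsigned char) type;
    for (i = 0; i < 8; i++)
        out[1 + i] = (unsigned char) (len >> (8 * (7 - i)));
}

/* Returns 0 on success, -1 if the type byte is not recognized. */
static int
sketch_decode_header(const unsigned char in[SKETCH_HEADER_LEN],
                     enum sketch_frame_type *type,
                     uint64_t *len)
{
    int i;

    if (in[0] != SKETCH_FRAME_DATA && in[0] != SKETCH_FRAME_HOLE)
        return -1;

    *type = (enum sketch_frame_type) in[0];
    *len = 0;
    for (i = 0; i < 8; i++)
        *len = (*len << 8) | in[1 + i];

    return 0;
}
```

A receiver that sees SKETCH_FRAME_HOLE would seek forward by the decoded length instead of writing any bytes, which is the distinction the `formatted` flag is there to enable.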

As described in the previous commit, we need to handle communication with iohelper differently in some cases. This patch creates function stubs for that. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 83 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 12 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index 911fac0..8ab7cd5 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -399,6 +399,21 @@ virFDStreamAbort(virStreamPtr st) return virFDStreamCloseInt(st, true); } +static ssize_t +virFDStreamWriteInternal(virFDStreamDataPtr fdst, + const char *bytes, + size_t nbytes) +{ + if (fdst->formatted) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; + } else { + return write(fdst->fd, bytes, nbytes); + } +} + + static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) { virFDStreamDataPtr fdst = st->privateData; @@ -431,7 +446,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) } retry: - ret = write(fdst->fd, bytes, nbytes); + ret = virFDStreamWriteInternal(fdst, bytes, nbytes); if (ret < 0) { VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -453,6 +468,21 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes) } +static ssize_t +virFDStreamReadInternal(virFDStreamDataPtr fdst, + char *bytes, + size_t nbytes) +{ + if (fdst->formatted) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; + } else { + return read(fdst->fd, bytes, nbytes); + } +} + + static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) { virFDStreamDataPtr fdst = st->privateData; @@ -483,7 +513,7 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } retry: - ret = read(fdst->fd, bytes, nbytes); + ret = virFDStreamReadInternal(fdst, bytes, nbytes); if (ret < 0) { 
VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -506,11 +536,26 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) static int +virFDStreamSkipInternal(virFDStreamDataPtr fdst, + unsigned long long length) +{ + off_t off; + if (fdst->formatted) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; + } else { + off = lseek(fdst->fd, length, SEEK_CUR); + return off == (off_t) -1 ? -1 : 0; + } +} + + +static int virFDStreamSkip(virStreamPtr st, unsigned long long length) { virFDStreamDataPtr fdst = st->privateData; - off_t off; int ret = -1; virObjectLock(fdst); @@ -520,15 +565,13 @@ virFDStreamSkip(virStreamPtr st, _("cannot write to stream")); goto cleanup; } + } + + if (virFDStreamSkipInternal(fdst, length) < 0) + goto cleanup; + + if (fdst->length) fdst->offset += length; - } - - off = lseek(fdst->fd, length, SEEK_CUR); - if (off == (off_t) -1) { - virReportSystemError(errno, "%s", - _("unable to seek")); - goto cleanup; - } ret = 0; cleanup: virObjectUnlock(fdst); @@ -536,6 +579,22 @@ virFDStreamSkip(virStreamPtr st, } + +static int +virFDStreamInDataInternal(virFDStreamDataPtr fdst, + int *inData, + unsigned long long *length) +{ + if (fdst->formatted) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; + } else { + return virFileInData(fdst->fd, inData, length); + } +} + + static int virFDStreamInData(virStreamPtr st, int *inData, @@ -545,7 +604,7 @@ virFDStreamInData(virStreamPtr st, int ret = -1; virObjectLock(fdst); - ret = virFileInData(fdst->fd, inData, length); + ret = virFDStreamInDataInternal(fdst, inData, length); virObjectUnlock(fdst); return ret; } -- 2.8.4

This patch does nothing more than introduce a skeleton for formatted messages between the daemon and iohelper. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/Makefile.am | 20 +++++++++++++++++++- src/iohelper/iohelper_message.c | 24 ++++++++++++++++++++++++ src/iohelper/iohelper_message.h | 26 ++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/iohelper/iohelper_message.c create mode 100644 src/iohelper/iohelper_message.h diff --git a/src/Makefile.am b/src/Makefile.am index 3409631..af5ef72 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -994,6 +994,10 @@ STORAGE_HELPER_DISK_SOURCES = \ IO_HELPER_SOURCES = \ iohelper/iohelper.c +IO_HELPER_LIB_SOURCES = \ + iohelper/iohelper_message.c \ + iohelper/iohelper_message.h + NETWORK_LEASES_HELPER_SOURCES = \ network/leaseshelper.c @@ -2842,6 +2846,18 @@ libvirt_net_rpc_client_la_LIBADD = \ libexec_PROGRAMS = if WITH_LIBVIRTD +noinst_LTLIBRARIES += libvirt-iohelper.la +libvirt_iohelper_la_SOURCES = $(IO_HELPER_LIB_SOURCES) +libvirt_iohelper_la_LDFLAGS = \ + $(AM_LDFLAGS) \ + $(NULL) +libvirt_iohelper_la_CFLAGS = \ + -I$(srcdir)/util \ + -I$(srcdir)/rpc \ + $(AM_CFLAGS) \ + $(XDR_CFLAGS) \ + $(NULL) + libexec_PROGRAMS += libvirt_iohelper libvirt_iohelper_SOURCES = $(IO_HELPER_SOURCES) libvirt_iohelper_LDFLAGS = \ @@ -2882,7 +2898,9 @@ else ! WITH_NETWORK EXTRA_DIST += $(NETWORK_LEASES_HELPER_SOURCES) endif ! WITH_NETWORK -endif WITH_LIBVIRTD +else ! WITH_LIBVIRTD +EXTRA_DIST += $(IO_HELPER_LIB_SOURCES) +endif ! WITH_LIBVIRTD if WITH_STORAGE_DISK if WITH_LIBVIRTD diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c new file mode 100644 index 0000000..54b9355 --- /dev/null +++ b/src/iohelper/iohelper_message.c @@ -0,0 +1,24 @@ +/* + * iohelper_message.c: Formatted messages between iohelper and us + * + * Copyright (C) 2016 Red Hat, Inc. 
+ * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + */ + +#include <config.h> + +#include "iohelper_message.h" diff --git a/src/iohelper/iohelper_message.h b/src/iohelper/iohelper_message.h new file mode 100644 index 0000000..791a645 --- /dev/null +++ b/src/iohelper/iohelper_message.h @@ -0,0 +1,26 @@ +/* + * iohelper_message.h: Formatted messages between iohelper and us + * + * Copyright (C) 2016 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + */ + + +#ifndef __VIR_IOHELPER_MESSAGE_H__ +# define __VIR_IOHELPER_MESSAGE_H__ + +#endif /* __VIR_IOHELPER_H__ */ -- 2.8.4

And switch fdstream to call them. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- po/POTFILES.in | 1 + src/Makefile.am | 5 ++- src/fdstream.c | 29 +++++++------ src/iohelper/iohelper_message.c | 94 +++++++++++++++++++++++++++++++++++++++++ src/iohelper/iohelper_message.h | 26 ++++++++++++ 5 files changed, 139 insertions(+), 16 deletions(-) diff --git a/po/POTFILES.in b/po/POTFILES.in index 9f4866c..7f40200 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -64,6 +64,7 @@ src/interface/interface_backend_netcf.c src/interface/interface_backend_udev.c src/internal.h src/iohelper/iohelper.c +src/iohelper/iohelper_message.c src/libvirt-admin.c src/libvirt-domain-snapshot.c src/libvirt-domain.c diff --git a/src/Makefile.am b/src/Makefile.am index af5ef72..1cce603 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1170,9 +1170,10 @@ libvirt_driver_la_SOURCES = $(DRIVER_SOURCES) libvirt_driver_la_CFLAGS = \ $(GNUTLS_CFLAGS) $(CURL_CFLAGS) \ - -I$(srcdir)/conf $(AM_CFLAGS) + -I$(srcdir)/conf -I$(srcdir)/iohelper $(AM_CFLAGS) libvirt_driver_la_LIBADD = \ - $(GNUTLS_LIBS) $(CURL_LIBS) $(DLOPEN_LIBS) + $(GNUTLS_LIBS) $(CURL_LIBS) $(DLOPEN_LIBS) \ + libvirt-iohelper.la # All .syms files should be placed in exactly one of these three lists, # depending on whether they are stored in git and/or used in the build. diff --git a/src/fdstream.c b/src/fdstream.c index 8ab7cd5..dc41164 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -44,6 +44,7 @@ #include "virstring.h" #include "virtime.h" #include "virprocess.h" +#include "iohelper_message.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -61,6 +62,7 @@ struct virFDStreamData { unsigned long long offset; unsigned long long length; bool formatted; /* True if formatted messages are read from/written to @fd. 
*/ + iohelperCtlPtr ioCtl; int watch; int events; /* events the stream callback is subscribed for */ @@ -89,7 +91,7 @@ virFDStreamDataDispose(void *obj) { virFDStreamDataPtr fdst = obj; - VIR_DEBUG("obj=%p", fdst); + virObjectUnref(fdst->ioCtl); } static int virFDStreamDataOnceInit(void) @@ -405,9 +407,7 @@ virFDStreamWriteInternal(virFDStreamDataPtr fdst, size_t nbytes) { if (fdst->formatted) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + return iohelperWrite(fdst->ioCtl, bytes, nbytes); } else { return write(fdst->fd, bytes, nbytes); } @@ -474,9 +474,7 @@ virFDStreamReadInternal(virFDStreamDataPtr fdst, size_t nbytes) { if (fdst->formatted) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + return iohelperRead(fdst->ioCtl, bytes, nbytes); } else { return read(fdst->fd, bytes, nbytes); } @@ -541,9 +539,7 @@ virFDStreamSkipInternal(virFDStreamDataPtr fdst, { off_t off; if (fdst->formatted) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + return iohelperSkip(fdst->ioCtl, length); } else { off = lseek(fdst->fd, length, SEEK_CUR); return off == (off_t) -1 ? 
-1 : 0; @@ -586,9 +582,7 @@ virFDStreamInDataInternal(virFDStreamDataPtr fdst, unsigned long long *length) { if (fdst->formatted) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + return iohelperInData(fdst->ioCtl, inData, length); } else { return virFileInData(fdst->fd, inData, length); } @@ -651,11 +645,18 @@ static int virFDStreamOpenInternal(virStreamPtr st, fdst->errfd = errfd; fdst->length = length; fdst->formatted = formatted; + if (formatted && + !(fdst->ioCtl = iohelperCtlNew(fd))) + goto error; + st->driver = &virFDStreamDrv; st->privateData = fdst; - return 0; + + error: + virObjectUnref(fdst); + return -1; } diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c index 54b9355..51b283d 100644 --- a/src/iohelper/iohelper_message.c +++ b/src/iohelper/iohelper_message.c @@ -22,3 +22,97 @@ #include <config.h> #include "iohelper_message.h" +#include "virobject.h" +#include "virlog.h" + +#define VIR_FROM_THIS VIR_FROM_STREAMS + +VIR_LOG_INIT("iohelperCtl"); + +struct iohelperCtl { + virObject parent; + + int fd; +}; + +static virClassPtr iohelperCtlClass; + +static void +iohelperCtlDispose(void *obj) +{ + iohelperCtlPtr ctl = obj; + + VIR_DEBUG("obj = %p", ctl); +} + +static int iohelperCtlOnceInit(void) +{ + if (!(iohelperCtlClass = virClassNew(virClassForObject(), + "iohelperCtl", + sizeof(iohelperCtl), + iohelperCtlDispose))) + return -1; + + return 0; +} + +VIR_ONCE_GLOBAL_INIT(iohelperCtl) + +iohelperCtlPtr +iohelperCtlNew(int fd) +{ + iohelperCtlPtr ret; + + if (iohelperCtlInitialize() < 0) + return NULL; + + if (!(ret = virObjectNew(iohelperCtlClass))) + return NULL; + + ret->fd = fd; + + return ret; +} + + +ssize_t +iohelperRead(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, + char *bytes ATTRIBUTE_UNUSED, + size_t nbytes ATTRIBUTE_UNUSED) +{ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; +} + + +ssize_t +iohelperWrite(iohelperCtlPtr ctl 
ATTRIBUTE_UNUSED, + const char *bytes ATTRIBUTE_UNUSED, + size_t nbytes ATTRIBUTE_UNUSED) +{ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; +} + + +int +iohelperSkip(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, + unsigned long long length ATTRIBUTE_UNUSED) +{ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; +} + + +int +iohelperInData(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, + int *inData ATTRIBUTE_UNUSED, + unsigned long long *length ATTRIBUTE_UNUSED) +{ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("sparse stream not supported")); + return -1; +} diff --git a/src/iohelper/iohelper_message.h b/src/iohelper/iohelper_message.h index 791a645..74afd49 100644 --- a/src/iohelper/iohelper_message.h +++ b/src/iohelper/iohelper_message.h @@ -23,4 +23,30 @@ #ifndef __VIR_IOHELPER_MESSAGE_H__ # define __VIR_IOHELPER_MESSAGE_H__ +# include "internal.h" + +typedef struct iohelperCtl iohelperCtl; +typedef iohelperCtl *iohelperCtlPtr; + +iohelperCtlPtr iohelperCtlNew(int fd); + +ssize_t +iohelperRead(iohelperCtlPtr ctl, + char *bytes, + size_t nbytes); + +ssize_t +iohelperWrite(iohelperCtlPtr ctl, + const char *bytes, + size_t nbytes); + +int +iohelperSkip(iohelperCtlPtr ctl, + unsigned long long length); + +int +iohelperInData(iohelperCtlPtr ctl, + int *inData, + unsigned long long *length); + #endif /* __VIR_IOHELPER_H__ */ -- 2.8.4

This is a bit tricky to grasp, but let's try anyway. We read data from a pipe. The data is no longer raw file data as it used to be; it is a formatted message (an encoded virNetMessage in this case). So until we've read the whole message, we must: a) continue reading its remainder b) if reading would block and the caller wants a nonblocking read, claim EAGAIN and not return any partially read data. Another interesting thing may happen: we've read a big message and decoded it, but the caller is reading just small chunks. In that case we must not read any new message but rather copy those small chunks into the caller's buffer until the whole message is processed. Only after that can we read a new message from the pipe, and the process continues. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 2 +- src/iohelper/iohelper_message.c | 137 +++++++++++++++++++++++++++++++++++++--- src/iohelper/iohelper_message.h | 3 +- 3 files changed, 131 insertions(+), 11 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index dc41164..77b5586 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -646,7 +646,7 @@ static int virFDStreamOpenInternal(virStreamPtr st, fdst->length = length; fdst->formatted = formatted; if (formatted && - !(fdst->ioCtl = iohelperCtlNew(fd))) + !(fdst->ioCtl = iohelperCtlNew(fd, false))) goto error; diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c index 51b283d..fe2304b 100644 --- a/src/iohelper/iohelper_message.c +++ b/src/iohelper/iohelper_message.c @@ -22,8 +22,11 @@ #include <config.h> #include "iohelper_message.h" -#include "virobject.h" +#include "viralloc.h" +#include "virfile.h" #include "virlog.h" +#include "virnetmessage.h" +#include "virobject.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -33,8 +36,13 @@ struct iohelperCtl { virObject parent; int fd; + bool blocking; + virNetMessagePtr msg; + bool msgReadyRead; }; +typedef ssize_t (*readfunc)(int fd, void *buf, size_t count); + static virClassPtr 
iohelperCtlClass; static void @@ -42,7 +50,7 @@ iohelperCtlDispose(void *obj) { iohelperCtlPtr ctl = obj; - VIR_DEBUG("obj = %p", ctl); + virNetMessageFree(ctl->msg); } static int iohelperCtlOnceInit(void) @@ -59,7 +67,8 @@ static int iohelperCtlOnceInit(void) VIR_ONCE_GLOBAL_INIT(iohelperCtl) iohelperCtlPtr -iohelperCtlNew(int fd) +iohelperCtlNew(int fd, + bool blocking) { iohelperCtlPtr ret; @@ -69,20 +78,130 @@ iohelperCtlNew(int fd) if (!(ret = virObjectNew(iohelperCtlClass))) return NULL; + if (!(ret->msg = virNetMessageNew(false))) + goto error; + ret->fd = fd; + ret->blocking = blocking; + ret->msgReadyRead = false; return ret; + + error: + virObjectUnref(ret); + return NULL; +} + + +static void +messageClear(iohelperCtlPtr ctl) +{ + virNetMessageClear(ctl->msg); + ctl->msgReadyRead = false; +} + + +static inline bool +messageReadyRead(iohelperCtlPtr ctl) +{ + return ctl->msgReadyRead; +} + + +static ssize_t +messageRecv(iohelperCtlPtr ctl) +{ + virNetMessagePtr msg = ctl->msg; + readfunc readF = ctl->blocking ? 
saferead : read; + + ctl->msgReadyRead = false; + + if (!msg->bufferLength) { + msg->bufferLength = 4; + if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0) + return -1; + } + + while (true) { + ssize_t nread; + size_t want; + + want = msg->bufferLength - msg->bufferOffset; + + reread: + errno = 0; + nread = readF(ctl->fd, + msg->buffer + msg->bufferOffset, + want); + + if (nread < 0) { + if (errno == EINTR) + goto reread; + if (errno == EAGAIN) + return 0; + return -1; + } else if (nread == 0) { + /* EOF while reading */ + return 0; + } else { + msg->bufferOffset += nread; + } + + if (msg->bufferOffset == msg->bufferLength) { + if (msg->bufferOffset == 4) { + if (virNetMessageDecodeLength(msg) < 0) + return -1; + } else { + if (virNetMessageDecodeHeader(msg) < 0) + return -1; + + /* Here we would decode the payload someday */ + + ctl->msgReadyRead = true; + return msg->bufferLength - msg->bufferOffset; + } + } + } } ssize_t -iohelperRead(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, - char *bytes ATTRIBUTE_UNUSED, - size_t nbytes ATTRIBUTE_UNUSED) +iohelperRead(iohelperCtlPtr ctl, + char *bytes, + size_t nbytes) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + ssize_t want = nbytes; + virNetMessagePtr msg = ctl->msg; + + if (!messageReadyRead(ctl)) { + ssize_t nread; + /* Okay, the incoming message is not fully read. Try to + * finish its receiving and recheck. 
*/ + if ((nread = messageRecv(ctl)) < 0) + return -1; + + if (!nread && errno != EAGAIN) + return 0; + + if (!messageReadyRead(ctl)) { + errno = EAGAIN; + return -1; + } + } + + if (want > msg->bufferLength - msg->bufferOffset) + want = msg->bufferLength - msg->bufferOffset; + + memcpy(bytes, + msg->buffer + msg->bufferOffset, + want); + + msg->bufferOffset += want; + + if (msg->bufferOffset == msg->bufferLength) + messageClear(ctl); + + return want; } diff --git a/src/iohelper/iohelper_message.h b/src/iohelper/iohelper_message.h index 74afd49..68beef0 100644 --- a/src/iohelper/iohelper_message.h +++ b/src/iohelper/iohelper_message.h @@ -28,7 +28,8 @@ typedef struct iohelperCtl iohelperCtl; typedef iohelperCtl *iohelperCtlPtr; -iohelperCtlPtr iohelperCtlNew(int fd); +iohelperCtlPtr iohelperCtlNew(int fd, + bool blocking); ssize_t iohelperRead(iohelperCtlPtr ctl, -- 2.8.4

This is nearly the same story as the formatted read, with one exception. Due to the recurring pattern of calling write functions in our code: repeat { nwritten = write(dest, buf + offset, buflen - offset); offset += nwritten; } we have to be cautious about the return value of iohelperWrite(). If we returned the number of bytes partially written, we would be called again with a shifted offset. But at the beginning of iohelperWrite() we flush the output queue, so the remaining data is written then. Moreover, there's no way for us to determine whether this is the case or whether we are being called with fresh data, completely unrelated to the first call. Therefore, we must keep claiming EAGAIN, even though the message is being partially written, and only on the last iteration claim success and return the size of the data we were requested to write in the first place. Even if this means that in the last iteration just one byte was written to make the message write complete. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/iohelper/iohelper_message.c | 105 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c index fe2304b..d900c2f 100644 --- a/src/iohelper/iohelper_message.c +++ b/src/iohelper/iohelper_message.c @@ -39,9 +39,11 @@ struct iohelperCtl { bool blocking; virNetMessagePtr msg; bool msgReadyRead; + bool msgReadyWrite; }; typedef ssize_t (*readfunc)(int fd, void *buf, size_t count); +typedef ssize_t (*writefunc)(int fd, const void *buf, size_t count); static virClassPtr iohelperCtlClass; @@ -84,6 +86,7 @@ iohelperCtlNew(int fd, ret->fd = fd; ret->blocking = blocking; ret->msgReadyRead = false; + ret->msgReadyWrite = true; return ret; @@ -98,6 +101,7 @@ messageClear(iohelperCtlPtr ctl) { virNetMessageClear(ctl->msg); ctl->msgReadyRead = false; + ctl->msgReadyWrite = true; } @@ -107,6 +111,11 @@ messageReadyRead(iohelperCtlPtr ctl) return ctl->msgReadyRead; 
} +static inline bool +messageReadyWrite(iohelperCtlPtr ctl) +{ + return ctl->msgReadyWrite; +} static ssize_t messageRecv(iohelperCtlPtr ctl) @@ -165,6 +174,47 @@ messageRecv(iohelperCtlPtr ctl) } +static ssize_t +messageSend(iohelperCtlPtr ctl) +{ + virNetMessagePtr msg = ctl->msg; + writefunc writeF = ctl->blocking ? safewrite : write; + + ctl->msgReadyWrite = false; + + while (true) { + ssize_t nwritten; + size_t want; + + want = msg->bufferLength - msg->bufferOffset; + + rewrite: + errno = 0; + nwritten = writeF(ctl->fd, + msg->buffer + msg->bufferOffset, + want); + + if (nwritten < 0) { + if (errno == EINTR) + goto rewrite; + if (errno == EAGAIN) + return 0; + return -1; + } else if (nwritten == 0) { + /* EOF while writing */ + return 0; + } else { + msg->bufferOffset += nwritten; + } + + if (msg->bufferOffset == msg->bufferLength) { + ctl->msgReadyWrite = true; + return msg->bufferLength; + } + } +} + + ssize_t iohelperRead(iohelperCtlPtr ctl, char *bytes, @@ -206,12 +256,57 @@ iohelperRead(iohelperCtlPtr ctl, ssize_t -iohelperWrite(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, - const char *bytes ATTRIBUTE_UNUSED, - size_t nbytes ATTRIBUTE_UNUSED) +iohelperWrite(iohelperCtlPtr ctl, + const char *bytes, + size_t nbytes) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); + size_t headerLen; + ssize_t nwritten, totalNwritten = 0; + + virNetMessagePtr msg = ctl->msg; + + if (!messageReadyWrite(ctl)) { + /* Okay, the outgoing message is not fully sent. Try to + * finish the sending and recheck. */ + if ((nwritten = messageSend(ctl)) < 0) + return -1; + + if (!nwritten && errno != EAGAIN) + return 0; + + if (!messageReadyWrite(ctl)) { + errno = EAGAIN; + return -2; + } + + totalNwritten += nwritten; + } + + memset(&msg->header, 0, sizeof(msg->header)); + msg->header.type = VIR_NET_STREAM; + msg->header.status = nbytes ? 
VIR_NET_CONTINUE : VIR_NET_OK; + + /* Encoding a message is fatal and we should discard any + * partially encoded message. */ + if (virNetMessageEncodeHeader(msg) < 0) + goto error; + + headerLen = msg->bufferOffset; + + if (virNetMessageEncodePayloadRaw(msg, bytes, nbytes) < 0) + goto error; + + /* At this point, the message is successfully encoded. Don't + * discard it if something below fails. */ + if ((nwritten = messageSend(ctl)) < 0) + return -1; + + totalNwritten += nwritten - headerLen; + + return totalNwritten; + + error: + messageClear(ctl); return -1; } -- 2.8.4
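
The return value contract described in the commit message can be sketched on its own. This follows the description rather than the exact code of the patch, and it rests on assumptions: the message is framed with a 4-byte big-endian length, a retry after EAGAIN is assumed to carry the same data, and everything named `sketch_*` (including the callback-based sink that stands in for a nonblocking write(2)) is hypothetical.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#define SKETCH_MAX_PAYLOAD (16 * 1024 * 1024)  /* arbitrary sketch limit */

/* Hypothetical sink that behaves like write(2) on a nonblocking fd. */
typedef ssize_t (*sketch_sink)(void *opaque, const void *buf, size_t len);

struct sketch_writer {
    sketch_sink sink;
    void *opaque;
    unsigned char *msg;    /* encoded message being flushed, or NULL */
    size_t msg_len;
    size_t msg_off;        /* bytes of @msg the sink has accepted */
    size_t payload_len;    /* what the caller originally asked for */
};

/* Returns @nbytes of the original request once the whole message is
 * out, or -1 (errno EAGAIN) while it is only partially written. */
static ssize_t
sketch_write(struct sketch_writer *w, const char *bytes, size_t nbytes)
{
    if (!w->msg) {
        if (nbytes > SKETCH_MAX_PAYLOAD) {
            errno = EINVAL;
            return -1;
        }
        w->msg_len = 4 + nbytes;
        if (!(w->msg = malloc(w->msg_len)))
            return -1;
        w->msg[0] = (unsigned char) (nbytes >> 24);
        w->msg[1] = (unsigned char) (nbytes >> 16);
        w->msg[2] = (unsigned char) (nbytes >> 8);
        w->msg[3] = (unsigned char) nbytes;
        memcpy(w->msg + 4, bytes, nbytes);
        w->msg_off = 0;
        w->payload_len = nbytes;
    }
    /* else: a message is still queued, so this call is treated as a
     * retry after EAGAIN and @bytes is ignored. */

    while (w->msg_off < w->msg_len) {
        ssize_t n = w->sink(w->opaque, w->msg + w->msg_off,
                            w->msg_len - w->msg_off);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;     /* EAGAIN: keep the message, report no bytes */
        }
        if (n == 0) {
            errno = EIO;   /* no progress; the sketch treats it as fatal */
            return -1;
        }
        w->msg_off += (size_t) n;
    }

    free(w->msg);
    w->msg = NULL;
    return (ssize_t) w->payload_len;
}

/* Demo sink: takes at most 3 bytes per call, EAGAIN on every other call. */
struct sketch_choppy {
    unsigned char buf[64];
    size_t len;
    unsigned calls;
};

static ssize_t
sketch_choppy_sink(void *opaque, const void *buf, size_t len)
{
    struct sketch_choppy *c = opaque;

    if (c->calls++ % 2) {
        errno = EAGAIN;
        return -1;
    }
    if (len > 3)
        len = 3;
    if (len > sizeof(c->buf) - c->len)
        len = sizeof(c->buf) - c->len;
    memcpy(c->buf + c->len, buf, len);
    c->len += len;
    return (ssize_t) len;
}
```

Because a partial count is never reported, the usual `offset += nwritten` loop in the callers cannot shift its offset into the middle of a message that is already queued. The price is the assumption that a retry carries the same data, which is exactly the ambiguity the commit message points out.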

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tests/Makefile.am | 10 ++- tests/iohelpermessagetest.c | 177 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 tests/iohelpermessagetest.c diff --git a/tests/Makefile.am b/tests/Makefile.am index 444e0fd..a87de5f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -379,7 +379,8 @@ test_scripts += $(libvirtd_test_scripts) test_programs += \ eventtest \ - libvirtdconftest + libvirtdconftest \ + iohelpermessagetest else ! WITH_LIBVIRTD EXTRA_DIST += $(libvirtd_test_scripts) endif ! WITH_LIBVIRTD @@ -1296,6 +1297,13 @@ if WITH_LIBVIRTD eventtest_SOURCES = \ eventtest.c testutils.h testutils.c eventtest_LDADD = -lrt $(LDADDS) + +iohelpermessagetest_SOURCES = \ + iohelpermessagetest.c testutils.h testutils.c +iohelpermessagetest_CFLAGS = \ + $(AM_CFLAGS) -I$(top_srcdir)/src/iohelper +iohelpermessagetest_LDADD = \ + $(LDADDS) ../src/libvirt-iohelper.la endif WITH_LIBVIRTD libshunload_la_SOURCES = shunloadhelper.c diff --git a/tests/iohelpermessagetest.c b/tests/iohelpermessagetest.c new file mode 100644 index 0000000..293c107 --- /dev/null +++ b/tests/iohelpermessagetest.c @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2016 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. 
+ * + * Author: Michal Privoznik <mprivozn@redhat.com> + */ + +#include <config.h> + +#include <fcntl.h> +#include <unistd.h> + +#include "testutils.h" +#include "internal.h" +#include "iohelper_message.h" +#include "virlog.h" + +VIR_LOG_INIT("tests.iohelpermessagetest"); + +#define VIR_FROM_THIS VIR_FROM_NONE + +typedef struct { + const char * const *msg; + unsigned int *len; +} testData; + +typedef testData *testDataPtr; + +static int +testInit(iohelperCtlPtr ctl[2], int fd[2], bool blockR, bool blockW) +{ + ctl[0] = ctl[1] = NULL; + + if (pipe(fd) < 0) { + fprintf(stderr, "Cannot create pipe: %d", errno); + return -1; + } + + if (virSetBlocking(fd[0], blockR) < 0 || + virSetBlocking(fd[1], blockW) < 0) + goto error; + + if (!(ctl[0] = iohelperCtlNew(fd[0], blockR)) || + !(ctl[1] = iohelperCtlNew(fd[1], blockW))) + goto error; + + return 0; + error: + virObjectUnref(ctl[0]); + virObjectUnref(ctl[1]); + ctl[0] = ctl[1] = NULL; + return -1; +} + +static int +testBlocking(const void *opaque) +{ + int ret = -1; + const testData *data = opaque; + iohelperCtlPtr ioCtl[2]; + int pipeFD[2] = {-1, -1}; + size_t idx = 0; + bool quit = !data->msg && !data->len; + char *genMsg = NULL; + char buf[1024]; + + if (testInit(ioCtl, pipeFD, true, true) < 0) + goto cleanup; + + while (!quit) { + const char *msg = NULL; + size_t len = 0; + ssize_t nread = 0, nwritten = 0; + size_t i; + + if (data->len) { + len = data->len[idx]; + quit = !data->len[idx + 1]; + VIR_FREE(genMsg); + if (VIR_ALLOC_N(genMsg, len) < 0) + goto cleanup; + for (i = 0; i < len; i++) + genMsg[i] = i; + msg = genMsg; + VIR_DEBUG("Testing string of len %zu", len); + } else { + msg = data->msg[idx]; + quit = !data->msg[idx + 1]; + len = strlen(msg); + VIR_DEBUG("Testing string '%s'", msg); + } + + if ((nwritten = iohelperWrite(ioCtl[1], msg, len)) < 0) { + virFilePrintf(stderr, "Unable to write message (errno=%d)\n", errno); + goto cleanup; + } + + if (nwritten != len) { + virFilePrintf(stderr, "Mismatched data len 
written=%zu wanted=%zu\n", nwritten, len); + goto cleanup; + } + + if ((nread = iohelperRead(ioCtl[0], buf, sizeof(buf))) < 0) { + virFilePrintf(stderr, "Unable to read message (errno=%d)\n", errno); + goto cleanup; + } + + if (nread != nwritten) { + virFilePrintf(stderr, "Mismatched data len written=%zu read=%zu\n", nwritten, nread); + goto cleanup; + } + + buf[nread] = '\0'; + + if (memcmp(buf, msg, nread)) { + virFilePrintf(stderr, "Mismatched data written='%s' read='%s'\n", msg, buf); + goto cleanup; + } + + idx++; + } + + ret = 0; + cleanup: + VIR_FREE(genMsg); + virObjectUnref(ioCtl[0]); + virObjectUnref(ioCtl[1]); + VIR_FORCE_CLOSE(pipeFD[0]); + VIR_FORCE_CLOSE(pipeFD[1]); + return ret; +} + +static int +mymain(void) +{ + int ret = 0; + +#define DO_TEST_BLOCKING_SIMPLE(...) \ + do { \ + const char *msg[] = { __VA_ARGS__, NULL}; \ + testData data = {.msg = msg, .len = NULL }; \ + if (virTestRun("Blocking simple", testBlocking, &data) < 0) \ + ret = -1; \ + } while (0) + +#define DO_TEST_BLOCKING_LEN(...) \ + do { \ + unsigned int len[] = { __VA_ARGS__, 0}; \ + testData data = {.msg = NULL, .len = len }; \ + if (virTestRun("Blocking len", testBlocking, &data) < 0) \ + ret = -1; \ + } while (0) + + DO_TEST_BLOCKING_SIMPLE("Hello world"); + DO_TEST_BLOCKING_SIMPLE("Hello world", "Hello", "world"); + + DO_TEST_BLOCKING_LEN(10); + DO_TEST_BLOCKING_LEN(1024); + DO_TEST_BLOCKING_LEN(32, 64, 128, 512, 1024); + + return ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE; +} + +VIRT_TEST_MAIN(mymain) -- 2.8.4

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tests/iohelpermessagetest.c | 295 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 292 insertions(+), 3 deletions(-) diff --git a/tests/iohelpermessagetest.c b/tests/iohelpermessagetest.c index 293c107..8c83261 100644 --- a/tests/iohelpermessagetest.c +++ b/tests/iohelpermessagetest.c @@ -27,6 +27,7 @@ #include "internal.h" #include "iohelper_message.h" #include "virlog.h" +#include "virtime.h" VIR_LOG_INIT("tests.iohelpermessagetest"); @@ -35,6 +36,8 @@ VIR_LOG_INIT("tests.iohelpermessagetest"); typedef struct { const char * const *msg; unsigned int *len; + bool blockR; + bool blockW; } testData; typedef testData *testDataPtr; @@ -77,7 +80,7 @@ testBlocking(const void *opaque) char *genMsg = NULL; char buf[1024]; - if (testInit(ioCtl, pipeFD, true, true) < 0) + if (testInit(ioCtl, pipeFD, data->blockR, data->blockW) < 0) goto cleanup; while (!quit) { @@ -143,15 +146,253 @@ testBlocking(const void *opaque) return ret; } +typedef struct { + virMutexPtr lock; + virCondPtr cond; + bool finished; + int ret; + iohelperCtlPtr ioCtl; + char *msg; + size_t msgLen; +} threadData; + +typedef threadData *threadDataPtr; + +static void +readerThread(void *opaque) +{ + threadDataPtr data = opaque; + char *bigBuf = NULL; + size_t bigBufSize = 0; + + virObjectRef(data->ioCtl); + /* Sleep some random time to simulate out of sync read & + * write */ + usleep((rand() % 100) * 1000); + + while (true) { + char buf[10]; /* Simulate reads of small chunks of data */ + ssize_t nread; + + reread: + nread = iohelperRead(data->ioCtl, buf, sizeof(buf)); + if (nread < 0) { + if (errno == EAGAIN) { + usleep(20 * 1000); + goto reread; + } + + virFilePrintf(stderr, "Unable to read message (errno=%d)\n", errno); + goto cleanup; + } + + if (!nread) + break; + + if (VIR_REALLOC_N(bigBuf, bigBufSize + nread) < 0) + goto cleanup; + + memcpy(bigBuf + bigBufSize, buf, nread); + bigBufSize += nread; + } + + if (bigBufSize != 
data->msgLen) { + virFilePrintf(stderr, "Message length mismatch: expected %zu got %zu", + data->msgLen, bigBufSize); + goto cleanup; + } + + if (memcmp(bigBuf, data->msg, data->msgLen)) { + virFilePrintf(stderr, "Mismatched data"); + goto cleanup; + } + + data->ret = 0; + + cleanup: + VIR_FREE(bigBuf); + virObjectUnref(data->ioCtl); + virMutexLock(data->lock); + data->finished = true; + virCondSignal(data->cond); + virMutexUnlock(data->lock); +} + +static void +writerThread(void *opaque ATTRIBUTE_UNUSED) +{ + threadDataPtr data = opaque; + size_t writeOff = 0; + + virObjectRef(data->ioCtl); + /* Sleep some random time to simulate out of sync read & + * write */ + usleep((rand() % 100) * 1000); + + while (true) { + ssize_t nwritten; + size_t want = data->msgLen - writeOff; + + if (!want) + break; + + rewrite: + nwritten = iohelperWrite(data->ioCtl, + data->msg + writeOff, + want); + + if (nwritten < 0) { + if (errno == EAGAIN) { + usleep(20 * 1000); + goto rewrite; + } + + virFilePrintf(stderr, "Unable to write message (errno=%d)\n", errno); + goto cleanup; + } + + if (!nwritten) + break; + + writeOff += nwritten; + } + + if (writeOff != data->msgLen) { + virFilePrintf(stderr, "Message length mismatch: expected %zu written %zu", + data->msgLen, writeOff); + goto cleanup; + } + + data->ret = 0; + + cleanup: + virObjectUnref(data->ioCtl); + virMutexLock(data->lock); + data->finished = true; + virCondSignal(data->cond); + virMutexUnlock(data->lock); +} + +/* How long wait (in ms) for both reader & writer + * threads to finish? 
*/ +#define WAIT_TIME 10000 + +static int +testNonblocking(const void *opaque) +{ + int ret = -1; + const testData *data = opaque; + iohelperCtlPtr ioCtl[2] = {NULL, NULL}; + int pipeFD[2] = {-1, -1}; + virThread reader, writer; + threadData readerD, writerD; + virMutex lock; + virCond cond; + unsigned long long now; + unsigned long long then; + char *msg = NULL; + size_t msgLen = 0, idx; + + for (idx = 0; data->msg && data->msg[idx]; idx++) { + const char *tmp = data->msg[idx]; + size_t tmpLen = strlen(tmp); + + if (VIR_REALLOC_N(msg, msgLen + tmpLen + 1) < 0) + goto cleanup; + + memcpy(msg + msgLen, tmp, tmpLen + 1); + msgLen += tmpLen; + } + + for (idx = 0; data->len && data->len[idx]; idx++) { + size_t tmpLen = data->len[idx]; + + if (VIR_REALLOC_N(msg, msgLen + tmpLen) < 0) + goto cleanup; + msgLen += tmpLen; + + /* Here @msg contains some garbage that was on the heap + * when the memory was allocated. That's okay, we want to + * be sure iohelper can deal with binary garbage. */ + } + + if (virMutexInit(&lock) < 0 || + virCondInit(&cond) < 0) + goto cleanup; + + if (testInit(ioCtl, pipeFD, data->blockR, data->blockW) < 0) + goto cleanup; + + readerD = writerD = (threadData) {.lock = &lock, .cond = &cond, + .ret = -1, .finished = false, .msg = msg, msgLen = msgLen}; + readerD.ioCtl = ioCtl[0]; + writerD.ioCtl = ioCtl[1]; + + /* Now, ideally we would set the kernel's pipe buffer to be + * small. Really small. Couple of bytes perhaps so that we + * can be sure writes wrap around it just nicely. But the + * smallest possible size is PAGESIZE. Trying to set anything + * smaller than that is silently rounded up to PAGESIZE. + * Okay, in that case we should write multiple of that. 
*/ + fcntl(pipeFD[0], F_SETPIPE_SZ, 0); + + virMutexLock(&lock); + + if (virThreadCreate(&reader, false, readerThread, &readerD) < 0 || + virThreadCreate(&writer, false, writerThread, &writerD) < 0) + goto cleanup; + + if (virTimeMillisNow(&now) < 0) + goto cleanup; + + then = now + WAIT_TIME; + + while (!readerD.finished || + !writerD.finished) { + if (virCondWaitUntil(&cond, &lock, then) < 0) { + if (errno == ETIMEDOUT) { + if (!readerD.finished) + virThreadCancel(&reader); + if (!writerD.finished) + virThreadCancel(&writer); + } + + goto cleanup; + } + if (readerD.finished) + VIR_FORCE_CLOSE(pipeFD[0]); + if (writerD.finished) + VIR_FORCE_CLOSE(pipeFD[1]); + } + + if (readerD.ret < 0 || + writerD.ret < 0) + goto cleanup; + + ret = 0; + cleanup: + virMutexUnlock(&lock); + virMutexDestroy(&lock); + virCondDestroy(&cond); + virObjectUnref(ioCtl[0]); + virObjectUnref(ioCtl[1]); + VIR_FORCE_CLOSE(pipeFD[0]); + VIR_FORCE_CLOSE(pipeFD[1]); + return ret; +} + static int mymain(void) { int ret = 0; + srand(time(NULL)); + #define DO_TEST_BLOCKING_SIMPLE(...) \ do { \ const char *msg[] = { __VA_ARGS__, NULL}; \ - testData data = {.msg = msg, .len = NULL }; \ + testData data = {.blockR = true, .blockW = true, \ + .msg = msg, .len = NULL }; \ if (virTestRun("Blocking simple", testBlocking, &data) < 0) \ ret = -1; \ } while (0) @@ -159,11 +400,47 @@ mymain(void) #define DO_TEST_BLOCKING_LEN(...) \ do { \ unsigned int len[] = { __VA_ARGS__, 0}; \ - testData data = {.msg = NULL, .len = len }; \ + testData data = {.blockR = true, .blockW = true, \ + .msg = NULL, .len = len }; \ if (virTestRun("Blocking len", testBlocking, &data) < 0) \ ret = -1; \ } while (0) +#define DO_TEST_BLOCKR_SIMPLE(...) \ + do { \ + const char *msg[] = { __VA_ARGS__, NULL}; \ + testData data = {.blockR = true, .blockW = false, \ + .msg = msg, .len = NULL }; \ + if (virTestRun("Blocking read simple", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + +#define DO_TEST_BLOCKR_LEN(...) 
\ + do { \ + unsigned int len[] = { __VA_ARGS__, 0}; \ + testData data = {.blockR = true, .blockW = false, \ + .msg = NULL, .len = len }; \ + if (virTestRun("Blocking read len", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + +#define DO_TEST_BLOCKW_SIMPLE(...) \ + do { \ + const char *msg[] = { __VA_ARGS__, NULL}; \ + testData data = {.blockR = false, .blockW = true, \ + .msg = msg, .len = NULL }; \ + if (virTestRun("Blocking write simple", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) +#define DO_TEST_BLOCKW_LEN(...) \ + do { \ + unsigned int len[] = { __VA_ARGS__, 0}; \ + testData data = {.blockR = false, .blockW = true, \ + .msg = NULL, .len = len }; \ + if (virTestRun("Blocking write len", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + DO_TEST_BLOCKING_SIMPLE("Hello world"); DO_TEST_BLOCKING_SIMPLE("Hello world", "Hello", "world"); @@ -171,6 +448,18 @@ mymain(void) DO_TEST_BLOCKING_LEN(1024); DO_TEST_BLOCKING_LEN(32, 64, 128, 512, 1024); + DO_TEST_BLOCKR_SIMPLE("Hello world"); + DO_TEST_BLOCKR_SIMPLE("Hello world", "Hello", "world"); + + DO_TEST_BLOCKR_LEN(1024); + DO_TEST_BLOCKR_LEN(409600); + + DO_TEST_BLOCKW_SIMPLE("Hello world"); + DO_TEST_BLOCKW_SIMPLE("Hello world", "Hello", "world"); + + DO_TEST_BLOCKW_LEN(1024); + DO_TEST_BLOCKW_LEN(409600); + return ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE; } -- 2.8.4

Now that we have formatted messages flying through pipe back and forth, we can start introducing support for other types of messages. For instance, a type to represent a hole in file/stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 21 ++++--- po/POTFILES.in | 1 - src/iohelper/iohelper_message.c | 131 ++++++++++++++++++++++++++++++++++------ src/libvirt-stream.c | 1 + tests/Makefile.am | 2 +- 5 files changed, 129 insertions(+), 27 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 22d7cf7..83f7310 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -833,14 +833,19 @@ daemonStreamHandleRead(virNetServerClientPtr client, VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length); if (rv < 0) { - if (virNetServerProgramSendStreamError(remoteProgram, - client, - msg, - &rerr, - stream->procedure, - stream->serial) < 0) - goto cleanup; - msg = NULL; + if (rv == -2) { + /* Unable to determine yet. Claim success. */ + } else { + /* Proper error. 
*/ + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + } /* We're done with this call */ goto done; diff --git a/po/POTFILES.in b/po/POTFILES.in index 7f40200..9f4866c 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -64,7 +64,6 @@ src/interface/interface_backend_netcf.c src/interface/interface_backend_udev.c src/internal.h src/iohelper/iohelper.c -src/iohelper/iohelper_message.c src/libvirt-admin.c src/libvirt-domain-snapshot.c src/libvirt-domain.c diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c index d900c2f..02c0283 100644 --- a/src/iohelper/iohelper_message.c +++ b/src/iohelper/iohelper_message.c @@ -40,6 +40,7 @@ struct iohelperCtl { virNetMessagePtr msg; bool msgReadyRead; bool msgReadyWrite; + unsigned long long skipLength; }; typedef ssize_t (*readfunc)(int fd, void *buf, size_t count); @@ -122,19 +123,20 @@ messageRecv(iohelperCtlPtr ctl) { virNetMessagePtr msg = ctl->msg; readfunc readF = ctl->blocking ? 
saferead : read; + virNetStreamSkip data; ctl->msgReadyRead = false; - if (!msg->bufferLength) { - msg->bufferLength = 4; - if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0) - return -1; - } - while (true) { ssize_t nread; size_t want; + if (!msg->bufferLength) { + msg->bufferLength = 4; + if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0) + return -1; + } + want = msg->bufferLength - msg->bufferOffset; reread: @@ -164,7 +166,17 @@ messageRecv(iohelperCtlPtr ctl) if (virNetMessageDecodeHeader(msg) < 0) return -1; - /* Here we would decode the payload someday */ + if (msg->header.type == VIR_NET_STREAM_SKIP) { + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) { + return -1; + } + + ctl->skipLength += data.length; + messageClear(ctl); + continue; + } ctl->msgReadyRead = true; return msg->bufferLength - msg->bufferOffset; @@ -239,6 +251,12 @@ iohelperRead(iohelperCtlPtr ctl, } } + /* Should never happen, but things change. */ + if (msg->header.type != VIR_NET_STREAM) { + errno = EAGAIN; + return -1; + } + if (want > msg->bufferLength - msg->bufferOffset) want = msg->bufferLength - msg->bufferOffset; @@ -312,21 +330,100 @@ iohelperWrite(iohelperCtlPtr ctl, int -iohelperSkip(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, - unsigned long long length ATTRIBUTE_UNUSED) +iohelperSkip(iohelperCtlPtr ctl, + unsigned long long length) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); + virNetMessagePtr msg = ctl->msg; + virNetStreamSkip data; + + if (messageReadyRead(ctl)) { + /* This stream is used for reading. */ + return 0; + } + + if (!messageReadyWrite(ctl)) { + ssize_t nwritten; + /* Okay, the outgoing message is not fully sent. Try to + * finish the sending and recheck. 
*/ + if ((nwritten = messageSend(ctl)) < 0) + return -1; + + if (!nwritten && errno != EAGAIN) + return 0; + + if (!messageReadyWrite(ctl)) { + errno = EAGAIN; + return -2; + } + } + + memset(&msg->header, 0, sizeof(msg->header)); + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.status = VIR_NET_CONTINUE; + + memset(&data, 0, sizeof(data)); + data.length = length; + + /* Encoding a message is fatal and we should discard any + * partially encoded message. */ + if (virNetMessageEncodeHeader(msg) < 0) + goto error; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + goto error; + + /* At this point, the message is successfully encoded. Don't + * discard it if something below fails. */ + if (messageSend(ctl) < 0) + return -1; + + return 0; + + error: + messageClear(ctl); return -1; } int -iohelperInData(iohelperCtlPtr ctl ATTRIBUTE_UNUSED, - int *inData ATTRIBUTE_UNUSED, - unsigned long long *length ATTRIBUTE_UNUSED) +iohelperInData(iohelperCtlPtr ctl, + int *inData, + unsigned long long *length) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("sparse stream not supported")); - return -1; + virNetMessagePtr msg; + + /* Make sure we have a message waiting in the queue. */ + + if (!messageReadyRead(ctl)) { + ssize_t nread; + /* Okay, the incoming message is not fully read. Try to + * finish its receiving and recheck. 
*/ + if ((nread = messageRecv(ctl)) < 0) + return -1; + + if (!nread && errno != EAGAIN) { + /* EOF */ + *inData = *length = 0; + return 0; + } + + if (!messageReadyRead(ctl)) { + errno = EAGAIN; + return -2; + } + } + + if (ctl->skipLength) { + *inData = 0; + *length = ctl->skipLength; + ctl->skipLength = 0; + } else { + msg = ctl->msg; + *inData = 1; + *length = msg->bufferLength - msg->bufferOffset; + } + + return 0; } diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 2632d55..13cbbe5 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -491,6 +491,7 @@ virStreamHoleSize(virStreamPtr stream, * and return 0. * * Returns 0 on success, + * -2 if unable to determine yet, * -1 otherwise */ int diff --git a/tests/Makefile.am b/tests/Makefile.am index a87de5f..aa35a6f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1303,7 +1303,7 @@ iohelpermessagetest_SOURCES = \ iohelpermessagetest_CFLAGS = \ $(AM_CFLAGS) -I$(top_srcdir)/src/iohelper iohelpermessagetest_LDADD = \ - $(LDADDS) ../src/libvirt-iohelper.la + $(LDADDS) ../src/libvirt-iohelper.la ../src/libvirt-net-rpc.la endif WITH_LIBVIRTD libshunload_la_SOURCES = shunloadhelper.c -- 2.8.4
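To illustrate the receive-side bookkeeping in the patch above (skip messages are folded into skipLength, and iohelperInData reports a pending hole before any data), here is a simplified, self-contained model. All names in it (message, receiver, receiverFill, receiverInData, receiverConsume) are hypothetical; the real code operates on virNetMessage buffers read from a pipe, which this sketch replaces with an in-memory array.

```c
#include <stddef.h>

/* Hypothetical stand-ins for VIR_NET_STREAM / VIR_NET_STREAM_SKIP. */
typedef enum { MSG_DATA, MSG_SKIP } msgType;

typedef struct {
    msgType type;
    unsigned long long length;     /* payload size or hole size */
} message;

typedef struct {
    const message *queue;          /* incoming messages */
    size_t nmsgs;
    size_t next;                   /* index of next unread message */
    unsigned long long skipLength; /* accumulated, unreported hole */
    unsigned long long dataLength; /* size of the pending data message */
    int haveData;                  /* is a data message pending? */
} receiver;

/* Consume messages until a data message is pending or the queue is
 * exhausted. Consecutive skip messages are merged into skipLength. */
static void
receiverFill(receiver *r)
{
    while (!r->haveData && r->next < r->nmsgs) {
        const message *m = &r->queue[r->next++];

        if (m->type == MSG_SKIP) {
            r->skipLength += m->length;
            continue;
        }

        r->dataLength = m->length;
        r->haveData = 1;
    }
}

/* Report the next section: a pending hole first, then data.
 * At end of stream both *inData and *length are set to 0. */
static int
receiverInData(receiver *r, int *inData, unsigned long long *length)
{
    receiverFill(r);

    if (r->skipLength) {
        *inData = 0;
        *length = r->skipLength;
        r->skipLength = 0;
    } else if (r->haveData) {
        *inData = 1;
        *length = r->dataLength;
    } else {
        *inData = 0;
        *length = 0;
    }

    return 0;
}

/* Mark the pending data message as fully read. */
static void
receiverConsume(receiver *r)
{
    r->haveData = 0;
    r->dataLength = 0;
}
```

Note that this model checks the accumulated hole before it signals end of stream, so a hole that is the last thing in the queue is still reported. That is a property of the sketch only; it makes no claim about how the patch behaves for a hole at EOF.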

The runIO function is called after all initialization (e.g. argument
parsing, file opening, ...) is done. It is the heart of the iohelper.
Unfortunately, the approach implemented in the next patches is not easy
to combine with O_DIRECT. Therefore, rename runIO to runIOBasic and
create a new runIO, which for now just wraps runIOBasic. In the not so
distant future it will decide which IO loop is going to be called.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/iohelper/iohelper.c | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/iohelper/iohelper.c b/src/iohelper/iohelper.c
index 00f31e7..02fc2ef 100644
--- a/src/iohelper/iohelper.c
+++ b/src/iohelper/iohelper.c
@@ -73,7 +73,7 @@ prepare(const char *path, int oflags, int mode,
 }
 
 static int
-runIO(const char *path, int fd, int oflags, unsigned long long length)
+runIOBasic(const char *path, int fd, int oflags, unsigned long long length)
 {
     void *base = NULL; /* Location to be freed */
     char *buf = NULL; /* Aligned location within base */
@@ -199,6 +199,23 @@ runIO(const char *path, int fd, int oflags, unsigned long long length)
     return ret;
 }
 
+
+static int
+runIO(const char *path, int fd, int oflags,
+      unsigned long long length, bool sparse)
+{
+    bool direct = O_DIRECT && ((oflags & O_DIRECT) != 0);
+
+    /* Right now, our implementation do not support O_DIRECT and sparse. */
+    if (direct && sparse) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("O_DIRECT and sparse streams is not supported at once"));
+        return -1;
+    }
+
+    return runIOBasic(path, fd, oflags, length);
+}
+
 static const char *program_name;
 
 ATTRIBUTE_NORETURN static void
@@ -225,6 +242,7 @@ main(int argc, char **argv)
     unsigned int delete = 0;
     int fd = -1;
     int lengthIndex = 0;
+    bool sparse = false;
 
     program_name = argv[0];
 
@@ -293,7 +311,7 @@ main(int argc, char **argv)
         exit(EXIT_FAILURE);
     }
 
-    if (fd < 0 || runIO(path, fd, oflags, length) < 0)
+    if (fd < 0 || runIO(path, fd, oflags, length, sparse) < 0)
         goto error;
 
     if (delete)
-- 
2.8.4

At the beginning of runIOBasic there is some setup of file descriptors
and of the names they are attached to. Move it to a separate static
function, as this piece is going to be reused.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/iohelper/iohelper.c | 93 +++++++++++++++++++++++++++++++------------------
 1 file changed, 60 insertions(+), 33 deletions(-)

diff --git a/src/iohelper/iohelper.c b/src/iohelper/iohelper.c
index 02fc2ef..4224d97 100644
--- a/src/iohelper/iohelper.c
+++ b/src/iohelper/iohelper.c
@@ -72,6 +72,41 @@ prepare(const char *path, int oflags, int mode,
     return fd;
 }
 
+
+static int
+setupFDs(const char *path,
+         int fd,
+         int oflags,
+         int *fdin,
+         const char **fdinname,
+         int *fdout,
+         const char **fdoutname)
+{
+    switch (oflags & O_ACCMODE) {
+    case O_RDONLY:
+        *fdin = fd;
+        *fdinname = path;
+        *fdout = STDOUT_FILENO;
+        *fdoutname = "stdout";
+        break;
+    case O_WRONLY:
+        *fdin = STDIN_FILENO;
+        *fdinname = "stdin";
+        *fdout = fd;
+        *fdoutname = path;
+        break;
+
+    default:
+        virReportSystemError(EINVAL,
+                             _("Unable to process file with flags %d"),
+                             (oflags & O_ACCMODE));
+        return -1;
+    }
+
+    return 0;
+}
+
+
 static int
 runIOBasic(const char *path, int fd, int oflags, unsigned long long length)
 {
@@ -100,40 +135,32 @@ runIOBasic(const char *path, int fd, int oflags, unsigned long long length)
     buf = (char *) (((intptr_t) base + alignMask) & ~alignMask);
 #endif
 
-    switch (oflags & O_ACCMODE) {
-    case O_RDONLY:
-        fdin = fd;
-        fdinname = path;
-        fdout = STDOUT_FILENO;
-        fdoutname = "stdout";
-        /* To make the implementation simpler, we give up on any
-         * attempt to use O_DIRECT in a non-trivial manner. */
-        if (direct && ((end = lseek(fd, 0, SEEK_CUR)) != 0 || length)) {
-            virReportSystemError(end < 0 ? errno : EINVAL, "%s",
-                                 _("O_DIRECT read needs entire seekable file"));
-            goto cleanup;
-        }
-        break;
-    case O_WRONLY:
-        fdin = STDIN_FILENO;
-        fdinname = "stdin";
-        fdout = fd;
-        fdoutname = path;
-        /* To make the implementation simpler, we give up on any
-         * attempt to use O_DIRECT in a non-trivial manner. */
-        if (direct && (end = lseek(fd, 0, SEEK_END)) != 0) {
-            virReportSystemError(end < 0 ? errno : EINVAL, "%s",
-                                 _("O_DIRECT write needs empty seekable file"));
-            goto cleanup;
-        }
-        break;
-
-    case O_RDWR:
-    default:
-        virReportSystemError(EINVAL,
-                             _("Unable to process file with flags %d"),
-                             (oflags & O_ACCMODE));
+    if (setupFDs(path, fd, oflags,
+                 &fdin, &fdinname,
+                 &fdout, &fdoutname) < 0)
         goto cleanup;
+
+    if (direct) {
+        switch (oflags & O_ACCMODE) {
+        case O_RDONLY:
+            /* To make the implementation simpler, we give up on any
+             * attempt to use O_DIRECT in a non-trivial manner. */
+            if ((end = lseek(fd, 0, SEEK_CUR)) != 0 || length) {
+                virReportSystemError(end < 0 ? errno : EINVAL, "%s",
+                                     _("O_DIRECT read needs entire seekable file"));
+                goto cleanup;
+            }
+            break;
+        case O_WRONLY:
+            /* To make the implementation simpler, we give up on any
+             * attempt to use O_DIRECT in a non-trivial manner. */
+            if ((end = lseek(fd, 0, SEEK_END)) != 0) {
+                virReportSystemError(end < 0 ? errno : EINVAL, "%s",
+                                     _("O_DIRECT write needs empty seekable file"));
+                goto cleanup;
+            }
+            break;
+        }
     }
 
     while (1) {
-- 
2.8.4

If formatted messages are required, iohelper is spawned with the
'sparse' argument. Teach it to actually handle that argument.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/iohelper/iohelper.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/iohelper/iohelper.c b/src/iohelper/iohelper.c
index 4224d97..65fbc63 100644
--- a/src/iohelper/iohelper.c
+++ b/src/iohelper/iohelper.c
@@ -252,7 +252,7 @@ usage(int status)
         fprintf(stderr, _("%s: try --help for more details"), program_name);
     } else {
         printf(_("Usage: %s FILENAME OFLAGS MODE OFFSET LENGTH DELETE\n"
-                 " or: %s FILENAME LENGTH FD\n"),
+                 " or: %s FILENAME LENGTH FD [sparse]\n"),
                program_name, program_name);
     }
     exit(status);
@@ -307,13 +307,15 @@ main(int argc, char **argv)
             exit(EXIT_FAILURE);
         }
         fd = prepare(path, oflags, mode, offset);
-    } else if (argc == 4) { /* FILENAME LENGTH FD */
+    } else if (argc == 4 || argc == 5) { /* FILENAME LENGTH FD [sparse] */
         lengthIndex = 2;
         if (virStrToLong_i(argv[3], NULL, 10, &fd) < 0) {
             fprintf(stderr, _("%s: malformed fd %s"),
                     program_name, argv[3]);
             exit(EXIT_FAILURE);
         }
+        if (argc == 5)
+            sparse = STREQ(argv[4], "sparse");
 #ifdef F_GETFL
         oflags = fcntl(fd, F_GETFL);
 #else
-- 
2.8.4
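For the FILENAME LENGTH FD form, the flags of the inherited descriptor come from fcntl(F_GETFL), and their access mode is what later selects the copy direction. A minimal sketch of that decision, assuming a POSIX system; ioDirection and directionForFD are hypothetical names used only for this illustration:

```c
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical names, for illustration only. */
typedef enum {
    DIR_ERROR = -1,
    DIR_FILE_TO_STDOUT,   /* fd opened read-only: copy fd -> stdout */
    DIR_STDIN_TO_FILE     /* fd opened write-only: copy stdin -> fd */
} ioDirection;

/* Derive the copy direction from the access mode of an already open
 * descriptor, as reported by fcntl(F_GETFL). */
static ioDirection
directionForFD(int fd)
{
    int oflags = fcntl(fd, F_GETFL);

    if (oflags < 0)
        return DIR_ERROR;

    switch (oflags & O_ACCMODE) {
    case O_RDONLY:
        return DIR_FILE_TO_STDOUT;
    case O_WRONLY:
        return DIR_STDIN_TO_FILE;
    default:
        /* O_RDWR (or anything else) does not pick a direction. */
        return DIR_ERROR;
    }
}
```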

Finally, now that everything is prepared, we can wire up formatted
messages for iohelper too.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/Makefile.am         |   2 +
 src/iohelper/iohelper.c | 113 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 115 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index 1cce603..f5bc5b6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2867,6 +2867,8 @@ libvirt_iohelper_LDFLAGS = \
 		$(NULL)
 libvirt_iohelper_LDADD = \
 		libvirt_util.la \
+		libvirt-iohelper.la \
+		libvirt-net-rpc.la \
 		../gnulib/lib/libgnu.la
 if WITH_DTRACE_PROBES
 libvirt_iohelper_LDADD += libvirt_probes.lo
diff --git a/src/iohelper/iohelper.c b/src/iohelper/iohelper.c
index 65fbc63..4b431af 100644
--- a/src/iohelper/iohelper.c
+++ b/src/iohelper/iohelper.c
@@ -32,6 +32,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#include "iohelper_message.h"
 #include "virutil.h"
 #include "virthread.h"
 #include "virfile.h"
@@ -40,6 +41,7 @@
 #include "virrandom.h"
 #include "virstring.h"
 #include "virgettext.h"
+#include "virobject.h"
 
 #define VIR_FROM_THIS VIR_FROM_STORAGE
 
@@ -228,6 +230,114 @@ runIOBasic(const char *path, int fd, int oflags, unsigned long long length)
 
 
 static int
+runIOFormatted(const char *path,
+               int fd,
+               int oflags,
+               unsigned long long length)
+{
+    int ret = -1;
+    int fdin, fdout;
+    const char *fdinname, *fdoutname;
+    unsigned long long total = 0;
+    iohelperCtlPtr ioCtl = NULL;
+    char *buf = NULL;
+    size_t bufLen = 1024 * 1024;
+    bool formattedIN, formattedOUT;
+
+    if (VIR_ALLOC_N(buf, bufLen) < 0)
+        goto cleanup;
+
+    if (setupFDs(path, fd, oflags,
+                 &fdin, &fdinname,
+                 &fdout, &fdoutname) < 0)
+        goto cleanup;
+
+    /* Maybe this looks a bit silly. But it's simple. Either we
+     * are reading from @fd and writing to stdout, or we're
+     * reading from stdin and writing to @fd. But the formatted
+     * messages occurs just on std* not @fd. */
+    formattedIN = fdout == fd;
+    formattedOUT = fdin == fd;
+
+    if (!(ioCtl = iohelperCtlNew(formattedIN ? fdin : fdout, true)))
+        goto cleanup;
+
+    while (true) {
+        ssize_t nread, nwritten, want = bufLen;
+        int inData = 1;
+        unsigned long long sectionLength;
+
+        if (formattedOUT) {
+            if (virFileInData(fdin, &inData, &sectionLength) < 0)
+                goto cleanup;
+
+            if (!inData) {
+                if (iohelperSkip(ioCtl, sectionLength) < 0)
+                    goto cleanup;
+                if (!sectionLength)
+                    break;
+                if (lseek(fdin, sectionLength, SEEK_CUR) == (off_t) -1) {
+                    virReportSystemError(errno,
+                                         _("Unable to seek in %s"), fdoutname);
+                    goto cleanup;
+                }
+                continue;
+            } else {
+                want = MIN(sectionLength, bufLen);
+            }
+        } else {
+            if (iohelperInData(ioCtl, &inData, &sectionLength) < 0)
+                goto cleanup;
+
+            if (!inData) {
+                if (!sectionLength)
+                    break;
+
+                if (lseek(fdout, sectionLength, SEEK_CUR) == (off_t) -1) {
+                    virReportSystemError(errno,
+                                         _("Unable to seek in %s"), fdoutname);
+                    goto cleanup;
+                }
+                continue;
+            } else {
+                want = MIN(sectionLength, bufLen);
+            }
+        }
+
+        if (length &&
+            (length - total) < want)
+            want = length - total;
+
+        if (want == 0)
+            break; /* End of requested data from client */
+
+        if ((formattedIN && (nread = iohelperRead(ioCtl, buf, want)) < 0) ||
+            (!formattedIN && (nread = saferead(fdin, buf, want)) < 0)) {
+            virReportSystemError(errno, _("Unable to read %s"), fdinname);
+            goto cleanup;
+        }
+
+        if (!nread)
+            break;
+
+        total += nread;
+
+        if ((formattedOUT && (nwritten = iohelperWrite(ioCtl, buf, nread)) < 0) ||
+            (!formattedOUT && (nwritten = safewrite(fdout, buf, nread)) < 0)) {
+            virReportSystemError(errno, _("Unable to write %s"), fdoutname);
+            goto cleanup;
+        }
+    }
+
+    ret = 0;
+ cleanup:
+    virObjectUnref(ioCtl);
+    VIR_FREE(buf);
+    return ret;
+}
+
+
+static int
 runIO(const char *path, int fd, int oflags,
       unsigned long long length, bool sparse)
 {
@@ -240,6 +350,9 @@ runIO(const char *path, int fd, int oflags,
         return -1;
     }
 
+    if (sparse)
+        return runIOFormatted(path, fd, oflags, length);
+
     return runIOBasic(path, fd, oflags, length);
 }
 
-- 
2.8.4
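The sending side of runIOFormatted relies on virFileInData to say whether the current file offset sits in data or in a hole, and how long that section is. That function comes from an earlier patch in the series and is not shown here. As a rough illustration of how such a query can be answered, here is a sketch built on lseek() with SEEK_DATA/SEEK_HOLE. sectionAt is a hypothetical name, the code assumes Linux semantics for these seek modes, and it is not meant to mirror the actual virFileInData implementation.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE      /* exposes SEEK_DATA/SEEK_HOLE on glibc */
#endif
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Fallback for headers that hide these constants; the values are the
 * Linux ones (an assumption of this sketch). */
#ifndef SEEK_DATA
# define SEEK_DATA 3
#endif
#ifndef SEEK_HOLE
# define SEEK_HOLE 4
#endif

/* Hypothetical helper: report whether the current offset of @fd lies
 * in data (*inData = 1) or in a hole (*inData = 0) and how many bytes
 * remain in that section. At EOF it reports a hole of length 0.
 * The file offset is left unchanged. Returns 0 on success, -1 on error. */
static int
sectionAt(int fd, int *inData, unsigned long long *length)
{
    off_t cur, data, end;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    data = lseek(fd, cur, SEEK_DATA);
    if (data == (off_t) -1) {
        if (errno != ENXIO)
            return -1;
        /* No data at or after @cur: a hole that runs up to EOF. */
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            return -1;
        *inData = 0;
        *length = (unsigned long long) (end - cur);
    } else if (data > cur) {
        /* We are in a hole; data starts at @data. */
        *inData = 0;
        *length = (unsigned long long) (data - cur);
    } else {
        /* We are in data; find where the next hole starts. */
        if ((end = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            return -1;
        *inData = 1;
        *length = (unsigned long long) (end - cur);
    }

    /* Restore the original position. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        return -1;

    return 0;
}
```

The "hole of length 0" result at EOF matches the convention the loop above uses to terminate (!inData && !sectionLength). How holes are actually reported depends on the filesystem; some report a fully allocated file as a single data section.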
participants (2): Daniel P. Berrange, Michal Privoznik