[libvirt] [PATCH 00/27] Introduce sparse streams

So, after a couple of sleepless nights and headaches, I'm proud to announce that I finally got this working.

What?
Our regular streams, which can be used to transfer disk images for domains, are unaware of any sparseness. Therefore they have two limitations:

a) transferring a big but sparse image can take ages, as all the holes (read by the kernel as '\0' bytes) have to go through our event loop;
b) the resulting volume is not sparse even if the source was.

How?
I followed the proven approach of the Linux kernel. One way to look at our streams is as read() and write() with different names: virStreamRecv() and virStreamSend(). They even have the same prototype (if 'int fd' is substituted with 'virStreamPtr'). Holes in files, however, are created and detected via a third API: lseek(). Therefore I'm introducing a new virStreamSkip() API that mimics the missing primitive. Because our streams do not necessarily work over files (they are meant for generic data transfer), I had to let users register a callback that is called whenever the other side calls virStreamSkip().

Now that we have all three primitives, we can focus on making life easier for our users. Nobody actually uses bare virStreamSend() and virStreamRecv(); everybody uses our wrappers virStreamSendAll() and virStreamRecvAll(). With the approach described above, only virStreamSendAll() needs to be made 'sparse file' aware. virStreamRecvAll() will only get the data to write (just like it does now), and the skip callback is called automatically whenever needed. In order for virStreamSendAll() to skip holes, I'm introducing yet another callback: virStreamInDataFunc(). This callback helps us build a map of the file: before each virStreamSend() it checks whether we are in a data section or in a hole, and calls virStreamSend() or virStreamSkip() respectively.

Do not worry: it will all become clear once you see the code.

Now the question is: how will users enable this feature? We have to take into account that we might be talking to an older daemon that does not know how to skip a hole, or, vice versa, to an older client. The solution I came up with is to introduce flags to the APIs where sparse streams make sense. That is the case for volume upload and download, but almost certainly not for virDomainOpenConsole().

Code?
From the user's point of view, they just need to pass the correct argument to the 'vol-upload' or 'vol-download' virsh commands. One layer down, on the programming level, they merely need to do:

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(st, ...);
    virStreamRecvAll(st, sinkFunc, &fd);

where:

    int skipFunc(virStreamPtr st,
                 unsigned long long offset,
                 void *opaque)
    {
        int *fd = opaque;
        return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
    }

Uploading is slightly more verbose; see patch 24.

Limitations?
While testing this on my machine with XFS, I noticed that the resulting map of a transferred file is not exactly the same as the source's. The checksums are the same, though. After digging deeper I found this commit in the kernel: http://git.kernel.org/cgit/linux/kernel/git/torvalds/linux.git/commit/?id=05... The thing is, as we transfer the file, we are practically just seeking past EOF and thus creating holes. But if the hole is small enough, XFS uses a speculative file allocation algorithm and eventually fully allocates the blocks, even though we intended to create a hole. This does not occur when punching a hole into a file, though. I guess the XFS developers have their reasons for doing that. This behaviour has not been observed on EXT4.

Notes?
Oh, patches 01-03 have been ACKed already, but are not pushed yet because of the freeze. Since this feature builds on top of them, I'm sending them too.
The whole patch set is also available on my GitHub:

https://github.com/zippy2/libvirt/tree/sparse_streams4

Michal Privoznik (27):
  Revert "rpc: Fix slow volume download (virsh vol-download)"
  virnetclientstream: Process stream messages later
  virStream{Recv,Send}All: Increase client buffer
  Introduce virStreamSkip
  Introduce virStreamRegisterSkip and virStreamSkipCallback
  Introduce virStreamInData and virStreamRegisterInData
  virNetClientStreamNew: Track origin stream
  Track if stream is seekable
  RPC: Introduce virNetStreamSkip
  Introduce VIR_NET_STREAM_SKIP message type
  Teach wireshark plugin about VIR_NET_STREAM_SKIP
  daemon: Implement VIR_NET_STREAM_SKIP handling
  daemon: Introduce virNetServerProgramSendStreamSkip
  virnetclientstream: Introduce virNetClientStreamSendSkip
  virnetclientstream: Introduce virNetClientStreamHandleSkip
  remote_driver: Implement virStreamSkip
  daemonStreamHandleRead: Wire up seekable stream
  virNetClientStream: Wire up VIR_NET_STREAM_SKIP
  virStreamSendAll: Wire up sparse streams
  fdstream: Implement seek
  gendispatch: Introduce @sparseflag for our calls
  Introduce virStorageVol{Download,Upload}Flags
  virsh: Implement sparse stream to vol-download
  virsh: Implement sparse stream to vol-upload
  fdstream: Suppress use of IO helper for sparse streams
  daemon: Don't call virStreamInData so often
  storage: Enable sparse streams for virStorageVol{Download,Upload}

 daemon/remote.c                      |   2 +-
 daemon/stream.c                      | 134 +++++++++++++--
 daemon/stream.h                      |   3 +-
 include/libvirt/libvirt-storage.h    |   9 +
 include/libvirt/libvirt-stream.h     |  47 ++++++
 src/datatypes.h                      |   8 +
 src/driver-stream.h                  |   5 +
 src/fdstream.c                       | 156 +++++++++++++++---
 src/fdstream.h                       |   3 +-
 src/libvirt-storage.c                |   4 +-
 src/libvirt-stream.c                 | 238 ++++++++++++++++++++++++++-
 src/libvirt_internal.h               |   7 +
 src/libvirt_private.syms             |   2 +
 src/libvirt_public.syms              |   7 +
 src/libvirt_remote.syms              |   2 +
 src/remote/remote_driver.c           |  41 ++++-
 src/remote/remote_protocol.x         |   2 +
 src/rpc/gendispatch.pl               |  21 ++-
 src/rpc/virnetclient.c               |   1 +
 src/rpc/virnetclientstream.c         | 308 ++++++++++++++++++++++++-----------
 src/rpc/virnetclientstream.h         |  10 +-
 src/rpc/virnetprotocol.x             |  16 +-
 src/rpc/virnetserverprogram.c        |  33 ++++
 src/rpc/virnetserverprogram.h        |   7 +
 src/storage/storage_backend.c        |  12 +-
 src/storage/storage_driver.c         |   4 +-
 src/virnetprotocol-structs           |   4 +
 tools/virsh-volume.c                 |  40 ++++-
 tools/virsh.c                        |  79 +++++++++
 tools/virsh.h                        |  12 ++
 tools/virsh.pod                      |   6 +-
 tools/wireshark/src/packet-libvirt.c |  40 +++++
 tools/wireshark/src/packet-libvirt.h |   2 +
 33 files changed, 1104 insertions(+), 161 deletions(-)

-- 
2.8.1
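The cover letter only describes virStreamInDataFunc() in words. As a rough sketch of what such a callback might compute for a regular file, the helper below uses the Linux lseek() SEEK_DATA/SEEK_HOLE extension to report whether the current position lies in data or in a hole, and how long that section is. The name sketch_in_data() and its signature are hypothetical, not the libvirt API introduced by this series; on filesystems without hole support the kernel simply reports the whole file as data.

```c
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Linux values, used only if the feature macros hid them. */
#ifndef SEEK_DATA
# define SEEK_DATA 3
#endif
#ifndef SEEK_HOLE
# define SEEK_HOLE 4
#endif

/* Hypothetical helper (not libvirt API): tell whether the current
 * offset of @fd lies in data or in a hole, and how many bytes that
 * section spans from there. The file offset is left unchanged.
 * Returns 0 on success, -1 on error (errno set). */
static int
sketch_in_data(int fd, int *in_data, unsigned long long *length)
{
    off_t cur, data, hole, end;

    if ((cur = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;

    if ((data = lseek(fd, cur, SEEK_DATA)) == (off_t) -1) {
        if (errno != ENXIO)
            return -1;
        /* ENXIO: no data at or after @cur, i.e. we are in a
         * trailing hole or exactly at EOF. */
        if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
            return -1;
        *in_data = 0;
        *length = end > cur ? (unsigned long long) (end - cur) : 0;
    } else if (data > cur) {
        /* In a hole that ends where the next data section starts. */
        *in_data = 0;
        *length = data - cur;
    } else {
        /* In data; EOF counts as an implicit hole for SEEK_HOLE. */
        if ((hole = lseek(fd, cur, SEEK_HOLE)) == (off_t) -1)
            return -1;
        *in_data = 1;
        *length = hole - cur;
    }

    /* SEEK_DATA/SEEK_HOLE/SEEK_END moved the offset; restore it. */
    if (lseek(fd, cur, SEEK_SET) == (off_t) -1)
        return -1;
    return 0;
}
```

A sender loop could call such a helper before each chunk and then either read and send up to the reported length, or skip it.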

This reverts commit d9c9e138f22c48626f719f880920e04c639e0177.

Unfortunately, things are going to be handled differently, so this commit must go.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetclientstream.c | 152 +++++++++++++++---------------------------
 1 file changed, 53 insertions(+), 99 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 64e9cd2..b428f4b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    struct iovec *incomingVec; /* I/O Vector to hold data */
-    size_t writeVec;           /* Vectors produced */
-    size_t readVec;            /* Vectors consumed */
+    char *incoming;
+    size_t incomingOffset;
+    size_t incomingLength;
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
+    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
 
-    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
+    if (((st->incomingOffset || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,14 +110,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        ((st->readVec < st->writeVec) || st->incomingEOF))
+        (st->incomingOffset || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
-              st->readVec, st->writeVec);
+    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -162,7 +161,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incomingVec);
+    VIR_FREE(st->incoming);
     virObjectUnref(st->prog);
 }
 
@@ -266,50 +265,38 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    struct iovec iov;
-    char *base;
-    size_t piece, pieces, length, offset = 0, size = 1024*1024;
+    size_t need;
 
     virObjectLock(st);
+    need = msg->bufferLength - msg->bufferOffset;
+    if (need) {
+        size_t avail = st->incomingLength - st->incomingOffset;
+        if (need > avail) {
+            size_t extra = need - avail;
+            if (VIR_REALLOC_N(st->incoming,
+                              st->incomingLength + extra) < 0) {
+                VIR_DEBUG("Out of memory handling stream data");
+                goto cleanup;
+            }
+            st->incomingLength += extra;
+        }
 
-    length = msg->bufferLength - msg->bufferOffset;
-
-    if (length == 0) {
+        memcpy(st->incoming + st->incomingOffset,
+               msg->buffer + msg->bufferOffset,
+               msg->bufferLength - msg->bufferOffset);
+        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+    } else {
         st->incomingEOF = true;
-        goto end;
     }
 
-    pieces = VIR_DIV_UP(length, size);
-    for (piece = 0; piece < pieces; piece++) {
-        if (size > length - offset)
-            size = length - offset;
-
-        if (VIR_ALLOC_N(base, size)) {
-            VIR_DEBUG("Allocation failed");
-            goto cleanup;
-        }
-
-        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
-        iov.iov_base = base;
-        iov.iov_len = size;
-        offset += size;
-
-        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
-            VIR_DEBUG("Append failed");
-            VIR_FREE(base);
-            goto cleanup;
-        }
-        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
-                  st->readVec, st->writeVec, size);
-    }
-
-  end:
+    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
+              st->incomingOffset, st->incomingLength,
+              st->incomingEOF);
     virNetClientStreamEventTimerUpdate(st);
+
     ret = 0;
 
  cleanup:
-    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
-              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -374,21 +361,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int ret = -1;
-    size_t partial, offset;
-
-    virObjectLock(st);
-
+    int rv = -1;
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-
-    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
+    virObjectLock(st);
+    if (!st->incomingOffset && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int rv;
+        int ret;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            ret = -2;
+            rv = -2;
             goto cleanup;
         }
 
@@ -404,66 +387,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        rv = virNetClientSendWithReplyStream(client, msg, st);
+        ret = virNetClientSendWithReplyStream(client, msg, st);
         virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (rv < 0)
+        if (ret < 0)
             goto cleanup;
     }
 
-    offset = 0;
-    partial = nbytes;
-
-    while (st->incomingVec && (st->readVec < st->writeVec)) {
-        struct iovec *iov = st->incomingVec + st->readVec;
-
-        if (!iov || !iov->iov_base) {
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           "%s", _("NULL pointer encountered"));
-            goto cleanup;
-        }
-
-        if (partial < iov->iov_len) {
-            memcpy(data+offset, iov->iov_base, partial);
-            memmove(iov->iov_base, (char*)iov->iov_base+partial,
-                    iov->iov_len-partial);
-            iov->iov_len -= partial;
-            offset += partial;
-            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
-            break;
+    VIR_DEBUG("After IO %zu", st->incomingOffset);
+    if (st->incomingOffset) {
+        int want = st->incomingOffset;
+        if (want > nbytes)
+            want = nbytes;
+        memcpy(data, st->incoming, want);
+        if (want < st->incomingOffset) {
+            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
+            st->incomingOffset -= want;
+        } else {
+            VIR_FREE(st->incoming);
+            st->incomingOffset = st->incomingLength = 0;
         }
-
-        memcpy(data+offset, iov->iov_base, iov->iov_len);
-        VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
-        partial -= iov->iov_len;
-        offset += iov->iov_len;
-        VIR_FREE(iov->iov_base);
-        iov->iov_len = 0;
-        st->readVec++;
-
-        VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
-                  offset, st->readVec, st->writeVec);
-    }
-
-    /* Shrink the I/O Vector buffer to free up memory. Do the
-       shrinking only when there is selected amount or more buffers to
-       free so it doesn't constantly memmove() and realloc() buffers.
-     */
-    if (st->readVec >= 16) {
-        memmove(st->incomingVec, st->incomingVec + st->readVec,
-                sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
-        VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
-        VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
-        st->readVec = 0;
+        rv = want;
+    } else {
+        rv = 0;
     }
 
-    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return ret;
+    return rv;
 }
 
 
-- 
2.8.1

There are two functions on the client that handle incoming stream data. The first one, virNetClientStreamQueuePacket(), is a low-level function that just processes the incoming stream data from the socket and stores it in an internal structure. This happens in the client event loop, so the shorter the callbacks are, the better. The second one, virNetClientStreamRecvPacket(), then copies data from the internal structure into a client-provided buffer.

This commit introduces a new queue for incoming stream packets. Instead of copying data into an intermediate internal buffer and then copying it again into the user buffer, incoming stream messages are put into the queue and their data is copied just once: in the upper-layer function virNetClientStreamRecvPacket(). In the end, there's just one copy of the data and therefore a shorter event loop callback. This should boost performance, which has proven to be the case in my testing.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
 1 file changed, 54 insertions(+), 52 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..34989a9 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,7 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    virNetMessagePtr rx;
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if (((st->rx || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        (st->rx || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    while (st->rx) {
+        virNetMessagePtr msg = st->rx;
+        virNetMessageQueueServe(&st->rx);
+        virNetMessageFree(msg);
+    }
     virObjectUnref(st->prog);
 }
 
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
 int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
-    int ret = -1;
-    size_t need;
+    virNetMessagePtr tmp_msg;
+
+    VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
+
+    /* Unfortunately, we must allocate new message as the one we
+     * get in @msg is going to be cleared later in the process. */
+
+    if (!(tmp_msg = virNetMessageNew(false)))
+        return -1;
+
+    /* Copy header */
+    memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
+
+    /* Steal message buffer */
+    tmp_msg->buffer = msg->buffer;
+    tmp_msg->bufferLength = msg->bufferLength;
+    tmp_msg->bufferOffset = msg->bufferOffset;
+    msg->buffer = NULL;
+    msg->bufferLength = msg->bufferOffset = 0;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
 
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
-        st->incomingEOF = true;
-    }
+    virNetMessageQueuePush(&st->rx, tmp_msg);
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
     virNetClientStreamEventTimerUpdate(st);
 
-    ret = 0;
-
- cleanup:
     virObjectUnlock(st);
-    return ret;
+    return 0;
 }
 
 
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  bool nonblock)
 {
     int rv = -1;
+    size_t want;
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
     virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+    if (!st->rx && !st->incomingEOF) {
         virNetMessagePtr msg;
         int ret;
 
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
-        } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+    VIR_DEBUG("After IO rx=%p", st->rx);
+    want = nbytes;
+    while (want && st->rx) {
+        virNetMessagePtr msg = st->rx;
+        size_t len = want;
+
+        if (len > msg->bufferLength - msg->bufferOffset)
+            len = msg->bufferLength - msg->bufferOffset;
+
+        if (!len)
+            break;
+
+        memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
+        want -= len;
+        msg->bufferOffset += len;
+
+        if (msg->bufferOffset == msg->bufferLength) {
+            virNetMessageQueueServe(&st->rx);
+            virNetMessageFree(msg);
         }
-        rv = want;
-    } else {
-        rv = 0;
     }
+    rv = nbytes - want;
 
     virNetClientStreamEventTimerUpdate(st);
 
-- 
2.8.1
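The single-copy receive path can be illustrated outside libvirt. Below is a minimal standalone sketch, assuming a simple singly linked FIFO: the producer appends a buffer without copying it, and the consumer fills a caller-provided buffer from the head of the queue, popping and freeing messages once they are fully consumed. The types and names (sketch_msg, sketch_queue_push(), sketch_recv()) are hypothetical stand-ins for virNetMessage and the virNetMessageQueue*() helpers, not libvirt code.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a queued stream message. */
typedef struct sketch_msg {
    char *buffer;            /* payload, owned by the message */
    size_t bufferLength;     /* total payload bytes */
    size_t bufferOffset;     /* bytes already handed to the reader */
    struct sketch_msg *next;
} sketch_msg;

/* Producer side: append @msg at the tail. The payload is handed
 * over as-is, so nothing is copied here. */
static void
sketch_queue_push(sketch_msg **queue, sketch_msg *msg)
{
    msg->next = NULL;
    while (*queue)
        queue = &(*queue)->next;
    *queue = msg;
}

/* Consumer side: copy up to @nbytes from the queued messages into
 * @data, popping each message once it is fully consumed. A
 * zero-length message is popped without copying anything (the
 * patch instead stops at such a message). Returns the number of
 * bytes copied, 0 if the queue is empty. */
static size_t
sketch_recv(sketch_msg **queue, char *data, size_t nbytes)
{
    size_t want = nbytes;

    while (want && *queue) {
        sketch_msg *msg = *queue;
        size_t len = msg->bufferLength - msg->bufferOffset;

        if (len > want)
            len = want;

        /* This is the only copy of the payload. */
        memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
        want -= len;
        msg->bufferOffset += len;

        if (msg->bufferOffset == msg->bufferLength) {
            *queue = msg->next;  /* pop the head */
            free(msg->buffer);
            free(msg);
        }
    }
    return nbytes - want;
}
```

The copy loop mirrors the while loop in virNetClientStreamRecvPacket() above; locking and the blocking wait for new data are left out.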

These are wrappers over virStreamRecv and virStreamSend so that users have to care about nothing but writing data into, or reading data from, a sink (typically a file). Note that these wrappers are used exclusively on the client side, as the daemon takes a slightly different approach. The wrappers allocate a buffer and use it as intermediate storage until the data is passed to the stream to send, or to the client application.

So far, we have been using a 64KB buffer. This is enough, but suboptimal, because the server can send messages up to VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes big (262120B, roughly 256KB). If we make the buffer this big, a single message containing the data is sent instead of four, which is the current situation. This means lower overhead: each message contains a header which needs to be processed, each message takes roughly the same amount of time to process regardless of its size, fewer bytes need to be sent over the wire, and so on.

Note that since the server will never send us a stream message bigger than VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX, there's no point in growing the client buffer past this threshold.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/libvirt-stream.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index c16f586..8384b37 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -23,6 +23,7 @@
 #include "datatypes.h"
 #include "viralloc.h"
 #include "virlog.h"
+#include "rpc/virnetprotocol.h"
 
 VIR_LOG_INIT("libvirt.stream");
 
@@ -330,7 +331,7 @@ virStreamSendAll(virStreamPtr stream,
                  void *opaque)
 {
     char *bytes = NULL;
-    int want = 1024*64;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
     int ret = -1;
     VIR_DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
 
@@ -423,7 +424,7 @@ virStreamRecvAll(virStreamPtr stream,
                  void *opaque)
 {
     char *bytes = NULL;
-    int want = 1024*64;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
     int ret = -1;
     VIR_DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
 
-- 
2.8.1
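A quick back-of-the-envelope check of the "one message instead of four" claim: the number of stream messages needed for a payload is the payload size divided by the per-message buffer size, rounded up. The helper and macro names are made up for illustration; 262120 is the VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX value quoted in the commit message.

```c
#include <stddef.h>

/* Value quoted in the commit message for
 * VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX (illustrative copy). */
#define SKETCH_LEGACY_PAYLOAD_MAX 262120

/* Number of messages needed to carry @payload bytes when each
 * message carries at most @chunk bytes (ceiling division). */
static size_t
sketch_messages_needed(size_t payload, size_t chunk)
{
    return (payload + chunk - 1) / chunk;
}
```

For a full 262120-byte payload this gives 4 messages with the old 64KB buffer and 1 with the new one; for a 1 GiB transfer it is 16384 versus 4097 messages, each of which carries its own header.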

This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.

It takes just one argument, @offset, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need a @whence argument like lseek(2) has.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  3 +++
 src/driver-stream.h              |  5 +++++
 src/libvirt-stream.c             | 40 ++++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |  5 +++++
 4 files changed, 53 insertions(+)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 831640d..f12d695 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -45,6 +45,9 @@ int virStreamRecv(virStreamPtr st,
                   char *data,
                   size_t nbytes);
 
+int virStreamSkip(virStreamPtr st,
+                  unsigned long long offset);
+
 
 /**
  * virStreamSourceFunc:
diff --git a/src/driver-stream.h b/src/driver-stream.h
index 85b4e3b..786d7b6 100644
--- a/src/driver-stream.h
+++ b/src/driver-stream.h
@@ -36,6 +36,10 @@ typedef int
                     size_t nbytes);
 
 typedef int
+(*virDrvStreamSkip)(virStreamPtr st,
+                    unsigned long long offset);
+
+typedef int
 (*virDrvStreamEventAddCallback)(virStreamPtr stream,
                                 int events,
                                 virStreamEventCallback cb,
@@ -61,6 +65,7 @@ typedef virStreamDriver *virStreamDriverPtr;
 struct _virStreamDriver {
     virDrvStreamSend streamSend;
     virDrvStreamRecv streamRecv;
+    virDrvStreamSkip streamSkip;
     virDrvStreamEventAddCallback streamEventAddCallback;
     virDrvStreamEventUpdateCallback streamEventUpdateCallback;
     virDrvStreamEventRemoveCallback streamEventRemoveCallback;
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 8384b37..1c9a12b 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -286,6 +286,46 @@ virStreamRecv(virStreamPtr stream,
 
 
 /**
+ * virStreamSkip:
+ * @stream: pointer to the stream object
+ * @offset: number of bytes to skip
+ *
+ * Skip @offset bytes in the stream. This is useful when there's
+ * no actual data in the stream, just a hole. If that's the case,
+ * this API can be used to skip the hole properly instead of
+ * transmitting zeroes to the other side.
+ *
+ * Returns 0 on success,
+ *        -1 error
+ */
+int
+virStreamSkip(virStreamPtr stream,
+              unsigned long long offset)
+{
+    VIR_DEBUG("stream=%p, offset=%llu", stream, offset);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+
+    if (stream->driver &&
+        stream->driver->streamSkip) {
+        int ret;
+        ret = (stream->driver->streamSkip)(stream, offset);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(stream->conn);
+    return -1;
+}
+
+
+/**
  * virStreamSendAll:
  * @stream: pointer to the stream object
  * @handler: source callback for reading data from application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 1e920d6..b4c693a 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -732,4 +732,9 @@ LIBVIRT_1.3.3 {
         virDomainSetPerfEvents;
 } LIBVIRT_1.2.19;
 
+LIBVIRT_1.3.5 {
+    global:
+        virStreamSkip;
+} LIBVIRT_1.3.3;
+
 # .... define new API here using predicted next version number ....
-- 
2.8.1
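To illustrate how a sender might combine virStreamSend() and virStreamSkip(), here is a self-contained simulation: the source is described by a hypothetical list of data/hole sections, data is "sent" by copying it into the receiver, and holes are "skipped" by only advancing the receiver's position. The receiver thus ends up with zero-filled holes without the zeroes ever being transferred. Everything here (sketch_section, sketch_sink, sketch_send_sparse()) is illustrative, not the series' virStreamSendAll() implementation.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical description of one region of the source. */
typedef struct {
    bool in_data;     /* true: data section, false: hole */
    size_t length;    /* region size in bytes */
} sketch_section;

/* Hypothetical receiver: a zero-initialised buffer plus a position. */
typedef struct {
    char *buf;
    size_t pos;
    size_t sent_bytes;  /* bytes that actually crossed the "wire" */
} sketch_sink;

/* Plays the role of virStreamSend(): the data itself is transferred. */
static void
sketch_send(sketch_sink *sink, const char *data, size_t len)
{
    memcpy(sink->buf + sink->pos, data, len);
    sink->pos += len;
    sink->sent_bytes += len;
}

/* Plays the role of virStreamSkip(): only the hole length is
 * conveyed and the receiver advances, like lseek(SEEK_CUR). */
static void
sketch_skip(sketch_sink *sink, size_t len)
{
    sink->pos += len;
}

/* Walk the section map of @src and send or skip each section. */
static void
sketch_send_sparse(const char *src, const sketch_section *map,
                   size_t nsections, sketch_sink *sink)
{
    size_t offset = 0;

    for (size_t i = 0; i < nsections; i++) {
        if (map[i].in_data)
            sketch_send(sink, src + offset, map[i].length);
        else
            sketch_skip(sink, map[i].length);
        offset += map[i].length;
    }
}
```

A real sender would additionally split long data sections into buffer-sized virStreamSend() calls; the sketch sends each section in one go to keep the send/skip decision visible.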

On 04/28/2016 04:04 AM, Michal Privoznik wrote:
> This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.
>
> It takes just one argument @offset, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has.
>
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> ---
>  include/libvirt/libvirt-stream.h |  3 +++
>  src/driver-stream.h              |  5 +++++
>  src/libvirt-stream.c             | 40 ++++++++++++++++++++++++++++++++++++++++
>  src/libvirt_public.syms          |  5 +++++
>  4 files changed, 53 insertions(+)
>
> +++ b/src/libvirt-stream.c
> @@ -286,6 +286,46 @@ virStreamRecv(virStreamPtr stream,
>  /**
> + * virStreamSkip:
> + * @stream: pointer to the stream object
> + * @offset: number of bytes to skip
> + *
> + * Skip @offset bytes in the stream. This is useful when there's
> + * no actual data in the stream, just a hole. If that's the case,
> + * this API can be used to skip the hole properly instead of
> + * transmitting zeroes to the other side.
> + *
> + * Returns 0 on success,
> + *        -1 error
> + */
> +int
> +virStreamSkip(virStreamPtr stream,
> +              unsigned long long offset)

'offset' is a bit misleading - you're not skipping _to_ the given offset, so much as _over_ length bytes. I'd name it 'length'.

Otherwise looks okay.

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org

On 05/05/2016 08:37 AM, Eric Blake wrote:
>> /**
>> + * virStreamSkip:
>> + * @stream: pointer to the stream object
>> + * @offset: number of bytes to skip
>> + *
>> + * Skip @offset bytes in the stream. This is useful when there's
>> + * no actual data in the stream, just a hole. If that's the case,
>> + * this API can be used to skip the hole properly instead of
>> + * transmitting zeroes to the other side.
>> + *
>> + * Returns 0 on success,
>> + *        -1 error
>> + */
>> +int
>> +virStreamSkip(virStreamPtr stream,
>> +              unsigned long long offset)
>
> 'offset' is a bit misleading - you're not skipping _to_ the given offset, so much as _over_ length bytes. I'd name it 'length'.
>
> Otherwise looks okay.

Also, the interface doesn't show it, but I hope the implementation recognizes a malicious remote side that tries to skip beyond the end of the stream. Do we need to specifically call out in the docs what happens if 'offset' (or renamed 'length') attempts to go beyond the stream size? Do you skip to EOF, or leave the current position unchanged, so that the next real attempt will just work? Would this be a way to probe whether a stream supports skips, by attempting to skip to ULLONG_MAX (which is ALWAYS beyond the current stream size, since it is larger than the maximum signed off_t value), then detecting a difference in errors between 'sparse stream but seek too large' vs. 'not sparse stream')?

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org

On 05.05.2016 16:37, Eric Blake wrote:
> On 04/28/2016 04:04 AM, Michal Privoznik wrote:
>> This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream.
>>
>> It takes just one argument @offset, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has.
>>
>> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
>> ---
>>  include/libvirt/libvirt-stream.h |  3 +++
>>  src/driver-stream.h              |  5 +++++
>>  src/libvirt-stream.c             | 40 ++++++++++++++++++++++++++++++++++++++++
>>  src/libvirt_public.syms          |  5 +++++
>>  4 files changed, 53 insertions(+)
>>
>> +++ b/src/libvirt-stream.c
>> @@ -286,6 +286,46 @@ virStreamRecv(virStreamPtr stream,
>>  /**
>> + * virStreamSkip:
>> + * @stream: pointer to the stream object
>> + * @offset: number of bytes to skip
>> + *
>> + * Skip @offset bytes in the stream. This is useful when there's
>> + * no actual data in the stream, just a hole. If that's the case,
>> + * this API can be used to skip the hole properly instead of
>> + * transmitting zeroes to the other side.
>> + *
>> + * Returns 0 on success,
>> + *        -1 error
>> + */
>> +int
>> +virStreamSkip(virStreamPtr stream,
>> +              unsigned long long offset)
>
> 'offset' is a bit misleading - you're not skipping _to_ the given offset, so much as _over_ length bytes. I'd name it 'length'.
>
> Otherwise looks okay.

Ah, good point. While implementing this I went back and forth between Seek and Skip. That's why I call the stream seekable even though what is implemented here is really just a skip.

Michal

The former is a public API and registers a callback that will be called whenever the other side of a stream calls virStreamSkip. The latter is a wrapper that actually calls the callback. It is not made public as it is intended to be used purely internally. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 21 ++++++++++++ src/datatypes.h | 6 ++++ src/libvirt-stream.c | 70 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 4 +++ src/libvirt_private.syms | 1 + src/libvirt_public.syms | 1 + 6 files changed, 103 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index f12d695..1a5286a 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,27 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +/** + * virStreamSkipFunc: + * @stream: stream + * @offset: size of hole in bytes + * @opaque: optional application provided data + * + * This callback is called whenever the other side of @stream is + * willing to skip a hole in the stream. The @offset argument + * then contains the size of hole in bytes. + * + * Returns 0 on success, + * -1 otherwise. + */ +typedef int (*virStreamSkipFunc)(virStreamPtr stream, + unsigned long long offset, + void *opaque); + +int virStreamRegisterSkip(virStreamPtr stream, + virStreamSkipFunc skipCb, + void *opaque); + int virStreamSkip(virStreamPtr st, unsigned long long offset); diff --git a/src/datatypes.h b/src/datatypes.h index 92e6863..169fc46 100644 --- a/src/datatypes.h +++ b/src/datatypes.h @@ -568,6 +568,12 @@ struct _virStream { virStreamDriverPtr driver; void *privateData; + + /* Unfortunately, this can't go into virStreamDriver because + * when register function for skipCb is called, @driver + * is not populated yet. 
*/ + virStreamSkipFunc skipCb; + void *skipCbOpaque; }; /** diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 1c9a12b..58665f1 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -286,6 +286,76 @@ virStreamRecv(virStreamPtr stream, /** + * virStreamRegisterSkip: + * @stream: stream + * @skipCb: callback function + * @opaque: optional application provided data + * + * This function registers callback that will be called whenever + * the other side of the @stream is willing to skip a hole in the + * stream. + * + * Returns 0 on success, + * -1 otherwise. + */ +int +virStreamRegisterSkip(virStreamPtr stream, + virStreamSkipFunc skipCb, + void *opaque) +{ + VIR_DEBUG("stream=%p, skipCb=%p opaque=%p", stream, skipCb, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(skipCb, -1); + + if (stream->skipCb) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("A skip callback is already registered")); + return -1; + } + + stream->skipCb = skipCb; + stream->skipCbOpaque = opaque; + return 0; +} + + +/** + * virStreamSkipCallback: + * @stream: stream + * @offset: stream offset + * + * Call previously registered skip callback. + * + * Returns 0 on success, + * -1 otherwise. 
+ */ +int +virStreamSkipCallback(virStreamPtr stream, + unsigned long long offset) +{ + VIR_DEBUG("stream=%p, offset=%llu", stream, offset); + + virCheckStreamReturn(stream, -1); + + if (stream->skipCb) { + int ret; + ret = (stream->skipCb)(stream, offset, stream->skipCbOpaque); + if (ret < 0) + goto error; + return 0; + } + + virReportUnsupportedError(); + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSkip: * @stream: pointer to the stream object * @offset: number of bytes to skip diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 96439d8..7a75491 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -294,4 +294,8 @@ virTypedParameterValidateSet(virConnectPtr conn, virTypedParameterPtr params, int nparams); +int +virStreamSkipCallback(virStreamPtr stream, + unsigned long long offset); + #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 5030ec3..e83d5d6 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1015,6 +1015,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamSkipCallback; # locking/domain_lock.h diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index b4c693a..0b80d27 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -735,6 +735,7 @@ LIBVIRT_1.3.3 { LIBVIRT_1.3.5 { global: virStreamSkip; + virStreamRegisterSkip; } LIBVIRT_1.3.3; # .... define new API here using predicted next version number .... -- 2.8.1
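The registration and dispatch contract in this patch can be modelled outside libvirt. The sketch below is a hypothetical stand-alone mock: `mock_stream`, `mock_register_skip`, `mock_skip_callback` and `count_hole` are illustrative names, not libvirt API. It mirrors the checks above: a NULL callback is rejected, a second registration fails, and dispatching with no callback registered reports an error. The hole size is named `length` here, following the naming suggestion made in review.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical mirror of virStreamSkipFunc; @length is the size of the hole. */
typedef int (*mock_skip_func)(void *stream, unsigned long long length,
                              void *opaque);

/* Hypothetical stand-in for the two fields the patch adds to struct _virStream. */
struct mock_stream {
    mock_skip_func skipCb;
    void *skipCbOpaque;
};

/* Mirrors virStreamRegisterSkip(): reject NULL and double registration. */
int mock_register_skip(struct mock_stream *st, mock_skip_func cb, void *opaque)
{
    if (!st || !cb)
        return -1;
    if (st->skipCb)
        return -1;      /* "A skip callback is already registered" */
    st->skipCb = cb;
    st->skipCbOpaque = opaque;
    return 0;
}

/* Mirrors virStreamSkipCallback(): forward to the callback if one is set. */
int mock_skip_callback(struct mock_stream *st, unsigned long long length)
{
    if (!st || !st->skipCb)
        return -1;      /* the patch reports an "unsupported" error here */
    return st->skipCb(st, length, st->skipCbOpaque) < 0 ? -1 : 0;
}

/* Example receiver-side callback: just accumulate the skipped bytes. */
int count_hole(void *stream, unsigned long long length, void *opaque)
{
    (void)stream;
    assert(opaque != NULL);
    *(unsigned long long *)opaque += length;
    return 0;
}
```

A real receiver would more likely call `lseek(fd, length, SEEK_CUR)` in the callback so that the next write lands past the hole, leaving it unallocated on filesystems that support sparse files; a trailing hole additionally needs `ftruncate()`, since no later write extends the file.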

On 04/28/2016 04:04 AM, Michal Privoznik wrote:
The former is a public API and registers a callback that will be called whenever the other side of a stream calls virStreamSkip. The latter is a wrapper that actually calls the callback. It is not made public as it is intended to be used purely internally.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> ---
+/**
+ * virStreamSkipFunc:
+ * @stream: stream
+ * @offset: size of hole in bytes
Again, naming this 'length' makes more sense (you're not skipping TO a particular offset, but OVER a given length).
+++ b/src/libvirt-stream.c
@@ -286,6 +286,76 @@ virStreamRecv(virStreamPtr stream,
 /**
+ * virStreamRegisterSkip:
+ * @stream: stream
+ * @skipCb: callback function
+ * @opaque: optional application provided data
+ *
+ * This function registers callback that will be called whenever
s/callback/a callback/
+ * the other side of the @stream is willing to skip a hole in the
+ * stream.
+ *
+ * Returns 0 on success,
+ * -1 otherwise.
+ */
+int
+virStreamRegisterSkip(virStreamPtr stream,
+                      virStreamSkipFunc skipCb,
+                      void *opaque)
+{
+    VIR_DEBUG("stream=%p, skipCb=%p opaque=%p", stream, skipCb, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgReturn(skipCb, -1);
+
+    if (stream->skipCb) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("A skip callback is already registered"));
+        return -1;
I guess that means we allow passing skipCb=NULL to deregister a callback; does it need to be specifically documented? Are there scenarios where you WANT to deregister before closing something else, to make sure that a stale callback is not called during a race scenario?
+int
+virStreamSkipCallback(virStreamPtr stream,
+                      unsigned long long offset)
+{
+    VIR_DEBUG("stream=%p, offset=%llu", stream, offset);
+
+    virCheckStreamReturn(stream, -1);
+
+    if (stream->skipCb) {
+        int ret;
+        ret = (stream->skipCb)(stream, offset, stream->skipCbOpaque);
I might have omitted the () around stream->skipCb, but I don't know if we have a consistent style, and yours makes it obvious that we know we are dereferencing a function pointer.
+++ b/src/libvirt_public.syms
@@ -735,6 +735,7 @@ LIBVIRT_1.3.3 {
 LIBVIRT_1.3.5 {
     global:
         virStreamSkip;
+        virStreamRegisterSkip;
Worth keeping sorted?

--
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org

On 05.05.2016 16:49, Eric Blake wrote:
I guess that means we allow passing skipCb=NULL to deregister a callback; does it need to be specifically documented? Are there scenarios where you WANT to deregister before closing something else, to make sure that a stale callback is not called during a race scenario?
I hadn't thought of that. Actually, the line just above this check prohibits skipCb from being NULL. But it seems like this will be thrown away anyway.
Worth keeping sorted?
Sure.

Michal

These functions will be called to determine whether underlying file that stream is transferring is currently in a data or hole. While virStreamRegisterInData is exposed, virStreamInData does not need to be made a public API as it will be called just internally. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 23 +++++++++++ src/datatypes.h | 8 ++-- src/libvirt-stream.c | 88 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 3 ++ src/libvirt_private.syms | 1 + src/libvirt_public.syms | 1 + 6 files changed, 121 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 1a5286a..3a0c986 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -69,6 +69,29 @@ int virStreamRegisterSkip(virStreamPtr stream, int virStreamSkip(virStreamPtr st, unsigned long long offset); +/** + * virStreamInDataFunc: + * @stream: stream + * @data: are we in data or hole + * @offset: offset to next section + * @opaque: optional application provided data + * + * This callback is called whenever virStreamInData needs to + * check whether @stream is in data section or in hole. Check + * description for virStreamInData for more detailed description. 
+ * + * Returns 0 on success (with @data and @offset updated) + * -1 otherwise (with @data and @offset untouched) + */ +typedef int (*virStreamInDataFunc)(virStreamPtr stream, + int *data, + unsigned long long *offset, + void *opaque); + +int virStreamRegisterInData(virStreamPtr stream, + virStreamInDataFunc inDataCb, + void *opaque); + /** * virStreamSourceFunc: diff --git a/src/datatypes.h b/src/datatypes.h index 169fc46..41f1536 100644 --- a/src/datatypes.h +++ b/src/datatypes.h @@ -569,11 +569,13 @@ struct _virStream { virStreamDriverPtr driver; void *privateData; - /* Unfortunately, this can't go into virStreamDriver because - * when register function for skipCb is called, @driver - * is not populated yet. */ + /* Unfortunately, these can't go into virStreamDriver because + * when register function for skipCb or inDataFunc is called, + * @driver is not populated yet. */ virStreamSkipFunc skipCb; void *skipCbOpaque; + virStreamInDataFunc inDataCb; + void *inDataCbOpaque; }; /** diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 58665f1..371efed 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -396,6 +396,94 @@ virStreamSkip(virStreamPtr stream, /** + * virStreamRegisterInData: + * @stream: stream + * @inDataCb: callback function + * @opaque: optional application provided data + * + * This function registers callback that will be called whenever + * virStreamInData needs to check whether @stream is currently in + * data or in a hole. This is to be used purely with sparse + * streams. + * + * Returns 0 on success, + * -1 otherwise. 
+ */ +int +virStreamRegisterInData(virStreamPtr stream, + virStreamInDataFunc inDataCb, + void *opaque) +{ + VIR_DEBUG("stream=%p, inDataCb=%p opaque=%p", stream, inDataCb, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(inDataCb, -1); + + if (stream->inDataCb) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("A inData callback is already registered")); + return -1; + } + + stream->inDataCb = inDataCb; + stream->inDataCbOpaque = opaque; + return 0; +} + + +/** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @offset: offset to next section + * + * This function will check underlying stream (typically a file) + * whether the current position the stream is in lies in a data + * section or in a hole. Upon return @data is set to a nonzero + * value if former is the case, or to zero if @stream is in a + * hole. Moreover, @offset it updated to tell caller how much + * bytes can be read from @stream until current section changes + * (from data to a hole or vice versa). + * + * As a special case, there's an implicit hole at EOF. In this + * situation this function should set @data = false, @offset = 0 + * and return 0. 
+ * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *offset) +{ + VIR_DEBUG("stream=%p, data=%p, offset=%p", stream, data, offset); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(data, -1); + virCheckNonNullArgReturn(offset, -1); + + if (stream->inDataCb) { + int ret; + ret = (stream->inDataCb)(stream, data, offset, stream->inDataCbOpaque); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 7a75491..d20acbc 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -298,4 +298,7 @@ int virStreamSkipCallback(virStreamPtr stream, unsigned long long offset); +int virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *offset); #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index e83d5d6..e506d1c 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1015,6 +1015,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamInData; virStreamSkipCallback; diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 0b80d27..396fea5 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -736,6 +736,7 @@ LIBVIRT_1.3.5 { global: virStreamSkip; virStreamRegisterSkip; + virStreamRegisterInData; } LIBVIRT_1.3.3; # .... define new API here using predicted next version number .... -- 2.8.1
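On Linux, an inData callback for a file-backed stream could be built on `lseek()` with `SEEK_DATA` and `SEEK_HOLE` (available since Linux 3.1, exposed by glibc under `_GNU_SOURCE`; filesystems without native support report the whole file as data). The sketch below is an assumption about how such a helper might look, not code from this series, and `file_in_data` is a hypothetical name. It follows the contract documented for virStreamInData: say whether the current position is in data, how many bytes remain in the current section, and report data = 0 with length 0 at EOF. The file offset is restored before returning.

```c
#define _GNU_SOURCE          /* SEEK_DATA / SEEK_HOLE in glibc */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical helper: on success sets *inData to 1 (data) or 0 (hole)
 * and *length to the bytes left in the current section; at EOF it sets
 * *inData = 0 and *length = 0. Outputs are untouched on failure, and
 * the file offset is unchanged either way. */
int file_in_data(int fd, int *inData, unsigned long long *length)
{
    off_t cur, next, end;
    int ret = -1;

    assert(inData && length);

    if ((cur = lseek(fd, 0, SEEK_CUR)) < 0)
        return -1;

    next = lseek(fd, cur, SEEK_DATA);
    if (next < 0) {
        /* ENXIO: no data at or after @cur (trailing hole or EOF) */
        if (errno != ENXIO)
            goto restore;
        if ((end = lseek(fd, 0, SEEK_END)) < 0)
            goto restore;
        *inData = 0;
        *length = end > cur ? (unsigned long long)(end - cur) : 0;
    } else if (next > cur) {
        /* @cur sits in a hole that ends where the next data starts */
        *inData = 0;
        *length = next - cur;
    } else {
        /* @cur sits in data; EOF counts as an implicit hole */
        if ((next = lseek(fd, cur, SEEK_HOLE)) < 0)
            goto restore;
        *inData = 1;
        *length = next - cur;
    }
    ret = 0;

 restore:
    /* The probing lseek() calls moved the offset; put it back. */
    if (lseek(fd, cur, SEEK_SET) < 0)
        ret = -1;
    return ret;
}
```

To plug something like this into virStreamRegisterInData(), an adapter with the virStreamInDataFunc signature could carry the file descriptor through @opaque.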

On 04/28/2016 04:04 AM, Michal Privoznik wrote:
These functions will be called to determine whether underlying
s/whether/whether the/
file that stream is transferring is currently in a data or hole.
s/stream/the stream/
While virStreamRegisterInData is exposed, virStreamInData does not need to be made a public API as it will be called just internally.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> ---
+int
+virStreamRegisterInData(virStreamPtr stream,
+                        virStreamInDataFunc inDataCb,
+                        void *opaque)
+{
+    VIR_DEBUG("stream=%p, inDataCb=%p opaque=%p", stream, inDataCb, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgReturn(inDataCb, -1);
+
+    if (stream->inDataCb) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("A inData callback is already registered"));
s/A inData/An inData/ (or maybe s/A //)
+/**
+ * virStreamInData:
+ * @stream: stream
+ * @data: are we in data or hole
+ * @offset: offset to next section
+ *
+ * This function will check underlying stream (typically a file)
s/check/check the/
+ * whether the current position the stream is in lies in a data
+ * section or in a hole.
Reads a bit awkwardly; maybe: This function checks the underlying stream (typically a file) to learn whether the current stream position lies within a data section or a hole.
Upon return @data is set to a nonzero
+ * value if former is the case, or to zero if @stream is in a
+ * hole. Moreover, @offset it updated to tell caller how much
s/it/is/ s/much/many/
+ * bytes can be read from @stream until current section changes
+ * (from data to a hole or vice versa).
+ *
+ * As a special case, there's an implicit hole at EOF. In this
+ * situation this function should set @data = false, @offset = 0
+ * and return 0.
+ *
+ * Returns 0 on success,
+ * -1 otherwise
+ */
+int
+virStreamInData(virStreamPtr stream,
+                int *data,
+                unsigned long long *offset)
I still wonder if 'length' is a better name than 'offset', but this one doesn't feel quite as awkward as the ones earlier in the series.
+++ b/src/libvirt_public.syms
@@ -736,6 +736,7 @@ LIBVIRT_1.3.5 {
     global:
         virStreamSkip;
         virStreamRegisterSkip;
+        virStreamRegisterInData;
Worth sorting?

Interface looks usable so far.

--
Eric Blake
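The cover letter says virStreamSendAll() is meant to consult the inData callback before each chunk and then either send data or skip the hole. A hypothetical, self-contained model of that loop is sketched below. The in-memory `mock_source` (a list of data/hole segments) and the counters in `mock_sink` stand in for a real file and a real stream; none of these names come from libvirt.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sparse source: a sequence of non-empty data/hole segments. */
struct segment { int is_data; size_t len; };

struct mock_source {
    const struct segment *segs;
    size_t nsegs;
    size_t seg;      /* index of the current segment */
    size_t seg_off;  /* position inside the current segment */
};

/* Hypothetical sink: counts what would have been put on the wire. */
struct mock_sink {
    size_t sent, send_calls;
    unsigned long long skipped;
    size_t skip_calls;
};

/* Same contract as virStreamInData(): data = 0 and length = 0 at EOF. */
static void source_in_data(const struct mock_source *src,
                           int *data, unsigned long long *length)
{
    if (src->seg == src->nsegs) {
        *data = 0;
        *length = 0;
        return;
    }
    *data = src->segs[src->seg].is_data;
    *length = src->segs[src->seg].len - src->seg_off;
}

/* Advance by @n bytes; callers never cross a segment boundary. */
static void source_advance(struct mock_source *src, size_t n)
{
    src->seg_off += n;
    assert(src->seg_off <= src->segs[src->seg].len);
    if (src->seg_off == src->segs[src->seg].len) {
        src->seg++;
        src->seg_off = 0;
    }
}

/* Sketch of a sparse-aware "send all": ask inData, then send or skip. */
int sparse_send_all(struct mock_source *src, struct mock_sink *sink,
                    size_t chunk)
{
    if (chunk == 0)
        return -1;

    for (;;) {
        int data;
        unsigned long long length;

        source_in_data(src, &data, &length);
        if (!data && length == 0)
            return 0;                          /* EOF */

        if (!data) {
            /* Hole: one skip message instead of @length zero bytes */
            sink->skipped += length;
            sink->skip_calls++;
            source_advance(src, (size_t)length);
            continue;
        }

        /* Data: a real loop would read() up to @chunk bytes here and
         * pass them to virStreamSend(); never read past the section end */
        size_t n = length < chunk ? (size_t)length : chunk;
        sink->sent += n;
        sink->send_calls++;
        source_advance(src, n);
    }
}
```

Because every hole becomes a single skip regardless of its size, the transfer cost in this model scales with the amount of data rather than with the apparent file size, which is the limitation (a) the cover letter sets out to fix.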

This has no real added value right now, but is going to be very helpful
later.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/remote/remote_driver.c   | 6 ++++--
 src/rpc/gendispatch.pl       | 2 +-
 src/rpc/virnetclientstream.c | 6 +++++-
 src/rpc/virnetclientstream.h | 3 ++-
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 6bed2c5..b7fd869 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -5594,7 +5594,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn,
     memset(&args, 0, sizeof(args));
     memset(&ret, 0, sizeof(ret));
 
-    if (!(netst = virNetClientStreamNew(priv->remoteProgram,
+    if (!(netst = virNetClientStreamNew(st,
+                                        priv->remoteProgram,
                                         REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3,
                                         priv->counter)))
         goto done;
@@ -6518,7 +6519,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn,
         goto cleanup;
     }
 
-    if (!(netst = virNetClientStreamNew(priv->remoteProgram,
+    if (!(netst = virNetClientStreamNew(st,
+                                        priv->remoteProgram,
                                         REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS,
                                         priv->counter)))
         goto cleanup;
diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl
index 624049b..10fde25 100755
--- a/src/rpc/gendispatch.pl
+++ b/src/rpc/gendispatch.pl
@@ -1654,7 +1654,7 @@ elsif ($mode eq "client") {
 
         if ($call->{streamflag} ne "none") {
             print "\n";
-            print "    if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter)))\n";
+            print "    if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n";
             print "        goto done;\n";
             print "\n";
             print "    if (virNetClientAddStream(priv->client, netst) < 0) {\n";
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 34989a9..8920395 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream");
 struct _virNetClientStream {
     virObjectLockable parent;
 
+    virStreamPtr stream; /* Reverse pointer to parent stream */
+
     virNetClientProgramPtr prog;
     int proc;
     unsigned serial;
@@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 }
 
 
-virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog,
+virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream,
+                                            virNetClientProgramPtr prog,
                                             int proc,
                                             unsigned serial)
 {
@@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog,
     if (!(st = virObjectLockableNew(virNetClientStreamClass)))
         return NULL;
 
+    st->stream = stream;
     st->prog = prog;
     st->proc = proc;
     st->serial = serial;
diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h
index a0d2be9..e278dab 100644
--- a/src/rpc/virnetclientstream.h
+++ b/src/rpc/virnetclientstream.h
@@ -32,7 +32,8 @@ typedef virNetClientStream *virNetClientStreamPtr;
 typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream,
                                                 int events, void *opaque);
 
-virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog,
+virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream,
+                                            virNetClientProgramPtr prog,
                                             int proc,
                                             unsigned serial);
 
--
2.8.1

On 04/28/2016 04:04 AM, Michal Privoznik wrote:
This has no real added value right now, but is going to be very helpful later.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/remote/remote_driver.c   | 6 ++++--
 src/rpc/gendispatch.pl       | 2 +-
 src/rpc/virnetclientstream.c | 6 +++++-
 src/rpc/virnetclientstream.h | 3 ++-
 4 files changed, 12 insertions(+), 5 deletions(-)
ACK

--
Eric Blake

Even though there's no way how to make stream seekable right now, it is going to be soon. We need to track this info so that we don't send virStreamSkip to a client that did not want it or vice versa. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/daemon/remote.c b/daemon/remote.c index fde029d..e2e08aa 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -4948,7 +4948,7 @@ remoteDispatchDomainMigratePrepareTunnel3Params(virNetServerPtr server ATTRIBUTE if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) || !(stream = daemonCreateClientStream(client, st, remoteProgram, - &msg->header))) + &msg->header, false))) goto cleanup; if (virDomainMigratePrepareTunnel3Params(priv->conn, st, params, nparams, diff --git a/daemon/stream.c b/daemon/stream.c index c892dcb..02729e4 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -52,6 +52,8 @@ struct daemonClientStream { virNetMessagePtr rx; bool tx; + bool seekable; + daemonClientStreamPtr next; }; @@ -319,7 +321,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr header) + virNetMessageHeaderPtr header, + bool seekable) { daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -337,6 +340,7 @@ daemonCreateClientStream(virNetServerClientPtr client, stream->serial = header->serial; stream->filterID = -1; stream->st = st; + stream->seekable = seekable; return stream; } diff --git a/daemon/stream.h b/daemon/stream.h index cf76e71..bf5dc24 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -30,7 +30,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, 
virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr hdr); + virNetMessageHeaderPtr hdr, + bool seekable); int daemonFreeClientStream(virNetServerClientPtr client, daemonClientStream *stream); diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index b7fd869..a3c331e 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5597,7 +5597,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, - priv->counter))) + priv->counter, + false))) goto done; if (virNetClientAddStream(priv->client, netst) < 0) { @@ -6522,7 +6523,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, - priv->counter))) + priv->counter, + false))) goto cleanup; if (virNetClientAddStream(priv->client, netst) < 0) { diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 10fde25..f2fa467 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -976,7 +976,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header)))\n"; + print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1654,7 +1654,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c 
b/src/rpc/virnetclientstream.c index 8920395..06d91d1 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -54,6 +54,8 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; + bool seekable; /* User requested seekable stream */ + virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -138,7 +140,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial) + unsigned serial, + bool seekable) { virNetClientStreamPtr st; @@ -152,6 +155,7 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, st->prog = prog; st->proc = proc; st->serial = serial; + st->seekable = seekable; virObjectRef(prog); diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index e278dab..0a5aafd 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -35,7 +35,8 @@ typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial); + unsigned serial, + bool seekable); bool virNetClientStreamRaiseError(virNetClientStreamPtr st); -- 2.8.1

On 04/28/2016 04:04 AM, Michal Privoznik wrote:
Even though there's no way how to make stream seekable right now,
s/how //
it is going to be soon. We need to track this info so that we don't send virStreamSkip to a client that did not want it or vice versa.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> ---
+++ b/daemon/stream.c
@@ -52,6 +52,8 @@ struct daemonClientStream {
     virNetMessagePtr rx;
     bool tx;
+    bool seekable;
This is not random-access, but just fast-forward, correct? Maybe 'skippable' (or would that be 'skipable'?) is a better term than 'seekable', since we are only skipping forwards rather than seeking to arbitrary offsets, and since the API was named virStreamSkip rather than virStreamSeek.

--
Eric Blake

This is going to be RPC representation for virStreamSkip.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/rpc/virnetprotocol.x   | 4 ++++
 src/virnetprotocol-structs | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x
index 327a334..f73e1aa 100644
--- a/src/rpc/virnetprotocol.x
+++ b/src/rpc/virnetprotocol.x
@@ -233,3 +233,7 @@ struct virNetMessageError {
     int int2;
     virNetMessageNetwork net; /* unused */
 };
+
+struct virNetStreamSkip {
+    unsigned hyper offset;
+};
diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs
index af4526c..96b984e 100644
--- a/src/virnetprotocol-structs
+++ b/src/virnetprotocol-structs
@@ -42,3 +42,6 @@ struct virNetMessageError {
         int int2;
         virNetMessageNetwork net;
 };
+struct virNetStreamSkip {
+        uint64_t offset;
+};
--
2.8.1

This is a special type of stream packet, that is bidirectional and will contain information on how much bytes are both sides skipping in the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 3 ++- src/rpc/virnetclient.c | 1 + src/rpc/virnetprotocol.x | 12 +++++++++++- src/virnetprotocol-structs | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 02729e4..b7ccc85 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -285,7 +285,8 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED, virMutexLock(&stream->priv->lock); - if (msg->header.type != VIR_NET_STREAM) + if (msg->header.type != VIR_NET_STREAM && + msg->header.type != VIR_NET_STREAM_SKIP) goto cleanup; if (!virNetServerProgramMatches(stream->prog, msg)) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 781e74c..cb87b81 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1147,6 +1147,7 @@ virNetClientCallDispatch(virNetClientPtr client) return virNetClientCallDispatchMessage(client); case VIR_NET_STREAM: /* Stream protocol */ + case VIR_NET_STREAM_SKIP: /* Stream seek protocol */ return virNetClientCallDispatchStream(client); default: diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index f73e1aa..b4b3f17 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -140,6 +140,14 @@ const VIR_NET_MESSAGE_NUM_FDS_MAX = 32; * * status == VIR_NET_ERROR * remote_error Error information * + * - type == VIR_NET_STREAM_SKIP + * * status == VIR_NET_CONTINUE + * byte[] skip data + * * status == VIR_NET_ERROR + * remote_error error information + * * status == VIR_NET_OK + * <empty> + * */ enum virNetMessageType { /* client -> server. args from a method call */ @@ -153,7 +161,9 @@ enum virNetMessageType { /* client -> server. args from a method call, with passed FDs */ VIR_NET_CALL_WITH_FDS = 4, /* server -> client. 
reply/error from a method call, with passed FDs */ - VIR_NET_REPLY_WITH_FDS = 5 + VIR_NET_REPLY_WITH_FDS = 5, + /* either direction, stream skip data packet */ + VIR_NET_STREAM_SKIP = 6 }; enum virNetMessageStatus { diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index 96b984e..3758a65 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -6,6 +6,7 @@ enum virNetMessageType { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum virNetMessageStatus { VIR_NET_OK = 0, -- 2.8.1
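Putting header and payload together, a STREAM_SKIP packet is the ordinary message header carrying type 6, followed by the 8-byte skip length. The sketch below assembles one by hand to show the byte layout. It assumes the header field order prog, vers, proc, type, serial, status (each a 4-byte XDR word), a leading 4-byte length word that counts itself, and `VIR_NET_CONTINUE = 2`; these come from virnetprotocol.x rather than from this patch, so treat them as assumptions. `build_stream_skip` is a hypothetical name, and the program, version and procedure numbers in the test are placeholders.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { MOCK_NET_STREAM_SKIP = 6 };  /* value added by this patch */
enum { MOCK_NET_CONTINUE = 2 };     /* assumed value of VIR_NET_CONTINUE */

/* Write one big-endian 32-bit XDR word; returns bytes written. */
static size_t put_u32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char)(v >> 24);
    p[1] = (unsigned char)(v >> 16);
    p[2] = (unsigned char)(v >> 8);
    p[3] = (unsigned char)v;
    return 4;
}

/* Hypothetical packet builder; @out must hold at least 36 bytes.
 * Returns the total packet size. */
size_t build_stream_skip(unsigned char *out, uint32_t prog, uint32_t vers,
                         int32_t proc, uint32_t serial, uint64_t length)
{
    size_t pos = 4;                       /* reserve the length word */

    assert(out != NULL);
    pos += put_u32(out + pos, prog);
    pos += put_u32(out + pos, vers);
    pos += put_u32(out + pos, (uint32_t)proc);
    pos += put_u32(out + pos, MOCK_NET_STREAM_SKIP);
    pos += put_u32(out + pos, serial);
    pos += put_u32(out + pos, MOCK_NET_CONTINUE);
    /* payload: XDR unsigned hyper, high word first */
    pos += put_u32(out + pos, (uint32_t)(length >> 32));
    pos += put_u32(out + pos, (uint32_t)length);

    put_u32(out, (uint32_t)pos);          /* total length, including itself */
    return pos;
}
```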

Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 40 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 42 insertions(+) diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index aa1c323..fb44135 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -52,8 +52,11 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_skip_offset = -1; +static int hf_libvirt_stream_skip = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_skip = -1; #define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -328,6 +331,28 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); } +static gboolean +dissect_xdr_stream_skip(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) +{ + goffset start; + proto_item *ti; + + ti = proto_tree_add_item(tree, hf_libvirt_stream_skip, tvb, start, -1, ENC_NA); + tree = proto_item_add_subtree(ti, ett_libvirt_stream_skip); + + hf = hf_libvirt_stream_skip_offset; + if (!dissect_xdr_u_hyper(tvb, tree, xdrs, hf)) return FALSE; + proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + + +static void +dissect_libvirt_stream_skip(tvbuff_t *tvb, proto_tree *tree, gint payload_length, guint32 status) +{ + proto_tree_add_item(tree, hf_libvirt_stream_skip_offset, tvb, VIR_HEADER_LEN, -1, ENC_NA); +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) @@ -348,6 +373,8 
@@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_SKIP) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_skip); } else { goto unknown; } @@ -517,6 +544,18 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_skip, + { "stream_skip", "libvirt.stream_skip", + FT_BYTES, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_skip_offset, + { "offset", "libvirt.stream_skip.offset", + FT_UINT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -527,6 +566,7 @@ proto_register_libvirt(void) static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_skip, &ett_libvirt }; diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdf..006aa6d 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_SKIP, "STREAM_SKIP" }, { -1, NULL } }; -- 2.8.1

Basically, whenever the new type of stream packet arrives to the daemon call this function that decodes it and calls virStreamSkipCallback(). Otherwise a regular data stream packet has arrived and therefore continue its processing. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 68 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index b7ccc85..1a7d334 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -635,6 +636,39 @@ daemonStreamHandleAbort(virNetServerClientPtr client, } +static int +daemonStreamHandleSkip(virNetServerClientPtr client, + daemonClientStream *stream, + virNetMessagePtr msg) +{ + int ret; + virNetStreamSkip data; + + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", + client, stream, msg->header.proc, msg->header.serial); + + /* Let's check if client plays nicely and advertised usage of + * sparse stream upfront. */ + if (!stream->seekable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream seek")); + return -1; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + /* We could have called virStreamSkip() here as well because + * we have access to underlying virStream structure so we can + * register handler there. But for clean code lets call + * virStreamSkipCallback(). 
*/ + ret = virStreamSkipCallback(stream->st, data.offset); + + return ret; +} + /* * Called when the stream is signalled has being able to accept @@ -653,19 +687,31 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetMessagePtr msg = stream->rx; int ret; - switch (msg->header.status) { - case VIR_NET_OK: - ret = daemonStreamHandleFinish(client, stream, msg); - break; + if (msg->header.type == VIR_NET_STREAM_SKIP) { + /* Handle special case when client sent us skip. + * Otherwise just carry on with processing stream + * data. */ + ret = daemonStreamHandleSkip(client, stream, msg); + } else if (msg->header.type == VIR_NET_STREAM) { + switch (msg->header.status) { + case VIR_NET_OK: + ret = daemonStreamHandleFinish(client, stream, msg); + break; - case VIR_NET_CONTINUE: - ret = daemonStreamHandleWriteData(client, stream, msg); - break; + case VIR_NET_CONTINUE: + ret = daemonStreamHandleWriteData(client, stream, msg); + break; - case VIR_NET_ERROR: - default: - ret = daemonStreamHandleAbort(client, stream, msg); - break; + case VIR_NET_ERROR: + default: + ret = daemonStreamHandleAbort(client, stream, msg); + break; + } + } else { + virReportError(VIR_ERR_RPC, + _("Unexpected message type: %d"), + msg->header.type); + ret = -1; } if (ret > 0) -- 2.8.1

This is just a helper function that takes an offset value, encodes it into XDR and sends it to the client. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 33 +++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 7 +++++++ 3 files changed, 41 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 66f9383..18d3f2d 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -168,6 +168,7 @@ virNetServerProgramNew; virNetServerProgramSendReplyError; virNetServerProgramSendStreamData; virNetServerProgramSendStreamError; +virNetServerProgramSendStreamSkip; virNetServerProgramUnknownError; diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c index d1597f4..fbdb179 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -548,6 +548,39 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, } +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long offset) +{ + virNetStreamSkip data; + + VIR_DEBUG("client=%p msg=%p offset=%llu", client, msg, offset); + + memset(&data, 0, sizeof(data)); + data.offset = offset; + + msg->header.prog = prog->program; + msg->header.vers = prog->version; + msg->header.proc = procedure; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = serial; + msg->header.status = VIR_NET_CONTINUE; + + if (virNetMessageEncodeHeader(msg) < 0) + return -1; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + return virNetServerClientSendMessage(client, msg); +} + + void virNetServerProgramDispose(void *obj ATTRIBUTE_UNUSED) { } diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 531fca0..545aa3d 100644 --- a/src/rpc/virnetserverprogram.h +++
b/src/rpc/virnetserverprogram.h @@ -104,4 +104,11 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, const char *data, size_t len); +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long offset); + #endif /* __VIR_NET_SERVER_PROGRAM_H__ */ -- 2.8.1

While the previous commit implemented a helper for sending a STREAM_SKIP packet from the daemon, this is its client-side counterpart. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 57 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 18d3f2d..e823588 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,7 @@ virNetClientStreamQueuePacket; virNetClientStreamRaiseError; virNetClientStreamRecvPacket; virNetClientStreamSendPacket; +virNetClientStreamSendSkip; virNetClientStreamSetError; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 06d91d1..bfd76d7 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -431,6 +431,58 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } +int +virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long offset) +{ + virNetMessagePtr msg = NULL; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("st=%p offset=%llu", st, offset); + + if (!st->seekable) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Skipping is not supported with this stream")); + return -1; + } + + memset(&data, 0, sizeof(data)); + data.offset = offset; + + if (!(msg = virNetMessageNew(false))) + return -1; + + virObjectLock(st); + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = VIR_NET_CONTINUE; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + virObjectUnlock(st); + + if (virNetMessageEncodeHeader(msg) < 0) + goto cleanup; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + goto cleanup; + + if (virNetClientSendNoReply(client, msg) <
0) + goto cleanup; + + ret = 0; + cleanup: + virNetMessageFree(msg); + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 0a5aafd..58fbc89 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -61,6 +61,10 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock); +int virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long offset); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.8.1

This function handles an incoming STREAM_SKIP packet. Even though it is not wired up yet, it will be soon. It first does a couple of checks to make sure the server plays nicely and sent us a STREAM_SKIP packet only after we enabled sparse streams. Then it decodes the message payload to see how big the hole is, and finally it calls virStreamSkipCallback(). Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 63 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index bfd76d7..e4dee9b 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -28,6 +28,7 @@ #include "virerror.h" #include "virlog.h" #include "virthread.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_RPC @@ -358,6 +359,68 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, return -1; } + +static int ATTRIBUTE_UNUSED +virNetClientStreamHandleSkip(virNetClientPtr client, + virNetClientStreamPtr st) +{ + virNetMessagePtr msg; + virNetStreamSkip data; + int rv, ret = -1; + + VIR_DEBUG("client=%p st=%p", client, st); + + msg = st->rx; + memset(&data, 0, sizeof(data)); + + /* We should not be called unless there's VIR_NET_STREAM_SKIP + * message at the head of the list. But doesn't hurt to check */ + if (!msg || + msg->header.type != VIR_NET_STREAM_SKIP) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Invalid message prog=%d type=%d serial=%u proc=%d"), + msg->header.prog, + msg->header.type, + msg->header.serial, + msg->header.proc); + goto cleanup; + } + + /* Server should not send us VIR_NET_STREAM_SKIP unless we + * have requested so. But does not hurt to check ...
*/ + if (!st->seekable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream seek")); + goto cleanup; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Malformed stream seek packet")); + goto cleanup; + } + + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + + virObjectUnlock(st); + rv = virStreamSkipCallback(st->stream, data.offset); + virObjectLock(st); + + if (rv < 0) + goto cleanup; + + ret = 0; + cleanup: + if (ret < 0) { + /* Abort stream? */ + } + return ret; +} + + int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, -- 2.8.1

Now that we have RPC wrappers over VIR_NET_STREAM_SKIP, we can start wiring them up. This commit wires up the case where a client wants to send a hole to the daemon. To keep stream offsets in sync, after a successful call on the daemon, skip the same hole in the local part of the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index a3c331e..9bb0840 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5108,6 +5108,34 @@ remoteStreamRecv(virStreamPtr st, return rv; } + +static int +remoteStreamSkip(virStreamPtr st, + unsigned long long offset) +{ + VIR_DEBUG("st=%p offset=%llu", st, offset); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamSendSkip(privst, + priv->client, + offset); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5281,6 +5309,7 @@ remoteStreamAbort(virStreamPtr st) static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, .streamSend = remoteStreamSend, + .streamSkip = remoteStreamSkip, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.8.1

Whenever the client is able to receive some data from a stream, daemonStreamHandleRead() is called. Now the behaviour of this function needs to change a bit. Previously it just read data from the underlying file (or chardev or whatever) and sent it through the stream to the client. This model no longer works because it does not distinguish whether the underlying file is in a data or a hole section. Therefore, add code at the beginning of this function that checks for this situation and acts accordingly. After this change, whenever we want to send some data we first check whether we are in a hole, and if so, skip it and inform the client about its size. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/daemon/stream.c b/daemon/stream.c index 1a7d334..5bf6f23 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -766,6 +766,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + unsigned long long offset; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -790,6 +792,54 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; + if (stream->seekable) { + /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on.
*/ + + rv = virStreamInData(stream->st, &inData, &offset); + VIR_DEBUG("rv=%d inData=%d offset=%llu", rv, inData, offset); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && offset) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamSkip(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + offset) < 0) + goto cleanup; + + msg = NULL; + + /* We have successfully sent stream skip to the other side. + * To keep streams in sync seek locally too. */ + virStreamSkipCallback(stream->st, offset); + /* We're done with this call */ + goto done; + } + } + + if (offset < bufferLen) + bufferLen = offset; + } + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -821,6 +871,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; } + done: ret = 0; cleanup: VIR_FREE(buffer); -- 2.8.1

Whenever the server sends the client a stream packet (either a regular one with actual data or a stream skip one), it is queued on @st->rx, so the list is a mixture of both types of stream packets. Now that we have all the helpers needed, we can wire up their processing. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index e4dee9b..65a18c9 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -297,6 +297,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virObjectLock(st); + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg); virNetClientStreamEventTimerUpdate(st); @@ -360,7 +362,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, } -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleSkip(virNetClientPtr client, virNetClientStreamPtr st) { @@ -433,6 +435,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -464,8 +468,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } VIR_DEBUG("After IO rx=%p", st->rx); + + while (st->rx && + st->rx->header.type == VIR_NET_STREAM_SKIP) { + /* Handle skip sent to us by server. Moreover, if client + * lacks event loop, this is only chance for us to + * process the skip. Therefore we should: + * a) process it, + * b) carry on with regular read from stream (if possible + * of course).
+ */ + + if (virNetClientStreamHandleSkip(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF) { + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want; -- 2.8.1

Plenty of clients use virStreamSendAll() instead of using virStreamSend() directly. We should allow those clients to utilize the new feature too. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt-stream.c | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 371efed..989c0b2 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -528,9 +528,10 @@ virStreamSendAll(virStreamPtr stream, virStreamSourceFunc handler, void *opaque) { - char *bytes = NULL; - size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + char *buf = NULL; + size_t bufSize = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; + unsigned long long dataLen = 0; VIR_DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque); virResetLastError(); @@ -544,12 +545,35 @@ virStreamSendAll(virStreamPtr stream, goto cleanup; } - if (VIR_ALLOC_N(bytes, want) < 0) + if (VIR_ALLOC_N(buf, bufSize) < 0) goto cleanup; for (;;) { + size_t want = bufSize; int got, offset = 0; - got = (handler)(stream, bytes, want, opaque); + + if (stream->inDataCb && !dataLen) { + int inData = 0; + int rv = virStreamInData(stream, &inData, &dataLen); + + if (rv < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (!inData && dataLen) { + if (virStreamSkip(stream, dataLen) < 0 || + virStreamSkipCallback(stream, dataLen) < 0) { + virStreamAbort(stream); + goto cleanup; + } + } + } + + if (dataLen && want > dataLen) + want = dataLen; + + got = (handler)(stream, buf, want, opaque); if (got < 0) { virStreamAbort(stream); goto cleanup; @@ -558,16 +582,17 @@ virStreamSendAll(virStreamPtr stream, break; while (offset < got) { int done; - done = virStreamSend(stream, bytes + offset, got - offset); + done = virStreamSend(stream, buf + offset, got - offset); if (done < 0) goto cleanup; offset += done; + dataLen -= done; } } ret = 0; cleanup: - VIR_FREE(bytes); + VIR_FREE(buf); if (ret != 0) 
virDispatchError(stream->conn); -- 2.8.1

Implement the virStreamSkip and virStreamInData callbacks. These callbacks do no magic: they just skip a hole, or detect whether we are in a data section of a file or in a hole and how many bytes we can read until the section changes. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/src/fdstream.c b/src/fdstream.c index a6a0fbe..38342a7 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -460,6 +460,106 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSkip(virStreamPtr st, + unsigned long long offset, + void *opaque ATTRIBUTE_UNUSED) +{ + struct virFDStreamData *fdst = st->privateData; + off_t off; + + virMutexLock(&fdst->lock); + off = lseek(fdst->fd, offset, SEEK_CUR); + if (off == (off_t) -1) { + virMutexUnlock(&fdst->lock); + return -1; + } + if (fdst->length) + fdst->offset += offset; + virMutexUnlock(&fdst->lock); + return 0; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + unsigned long long *offset, + void *opaque ATTRIBUTE_UNUSED) +{ + struct virFDStreamData *fdst = st->privateData; + int fd; + off_t cur, data, hole; + int ret = -1; + + virMutexLock(&fdst->lock); + + fd = fdst->fd; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to get current position in stream")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF.
+ * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + virReportSystemError(errno, "%s", + _("Unable to seek to data")); + goto cleanup; + } + *inData = 0; + *offset = 0; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *offset = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* Wait a second. The reason why we are here is + * because we are in data. But at the same time we + * are in a trailing hole? Wut!? Do the best what we + * can do here. */ + virReportSystemError(errno, "%s", + _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *offset = (hole - data); + } + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. */ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); + virMutexUnlock(&fdst->lock); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, @@ -503,6 +603,8 @@ static int virFDStreamOpenInternal(virStreamPtr st, st->driver = &virFDStreamDrv; st->privateData = fdst; + st->skipCb = virFDStreamSkip; + st->inDataCb = virFDStreamInData; return 0; } -- 2.8.1

Now, not all APIs are going to support sparse streams. For some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). For others, we will need a special flag to indicate that the client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand, we can just annotate in our .x files that a certain flag to a certain RPC call enables this feature. For instance: /** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX, Therefore, whenever a client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); the daemon will note that and send stream skips when possible. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index f2fa467..2abe186 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -281,6 +281,13 @@ while (<PROTOCOL>) { $calls{$name}->{streamflag} = "none"; } + if (exists $opts{sparseflag}) { + die "\@sparseflag requires stream" unless $calls{$name}->{streamflag} ne "none"; + $calls{$name}->{sparseflag} = $opts{sparseflag}; + } else { + $calls{$name}->{sparseflag} = "none"; + } + $calls{$name}->{acl} = $opts{acl}; $calls{$name}->{aclfilter} = $opts{aclfilter}; @@ -934,6 +941,11 @@ elsif ($mode eq "server") { if ($call->{streamflag} ne "none") { print " virStreamPtr st = NULL;\n"; print " daemonClientStreamPtr stream = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = args->flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -976,7 +988,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; + print " if
(!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, sparse)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1643,6 +1655,11 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print " virNetClientStreamPtr netst = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = flags & $call->{sparseflag};\n" + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1654,7 +1671,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; -- 2.8.1

These API flags tell whether the caller wants to use sparse streams for storage transfers. At the same time, it's safe to enable them in the storage driver frontend and rely on our backends to check the flags. This way we can enable specific flags only on specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for the filesystem backend but not for the iSCSI backend. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index db6f2b4..e98e8a5 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -337,11 +337,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; + int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 1ce6745..cd8ce16 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * Download the content of the volume as a stream.
If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index bab8ef2..35f56d4 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -4753,6 +4753,7 @@ enum remote_procedure { /** * @generate: both * @writestream: 1 + * @sparseflag: VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM * @acl: storage_vol:data_write */ REMOTE_PROC_STORAGE_VOL_UPLOAD = 208, @@ -4760,6 +4761,7 @@ enum remote_procedure { /** * @generate: both * @readstream: 1 + * @sparseflag: VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_STORAGE_VOL_DOWNLOAD = 209, diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index fcc0991..c6146cc 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -2160,7 +2160,7 @@ storageVolDownload(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; @@ -2322,7 +2322,7 @@ storageVolUpload(virStorageVolPtr obj, virStorageVolStreamInfoPtr cbdata = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; -- 2.8.1

The command grew a new --sparse switch that does nothing more than enable the sparse streams feature for this command. Along with the switch, a new helper function is introduced: virshStreamSkip(). This is the callback that is called whenever the daemon sends us a hole. In the callback we reflect the hole in the underlying file by seeking forward as many bytes as we are told. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 15 ++++++++++++++- tools/virsh.c | 8 ++++++++ tools/virsh.h | 3 +++ tools/virsh.pod | 3 ++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index 9cc8e52..018e0f3 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -762,6 +762,10 @@ static const vshCmdOptDef opts_vol_download[] = { .type = VSH_OT_INT, .help = N_("amount of data to download") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; @@ -777,6 +781,7 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) unsigned long long offset = 0, length = 0; bool created = false; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -790,6 +795,9 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) if (vshCommandOptStringReq(ctl, cmd, "file", &file) < 0) goto cleanup; + if (vshCommandOptBool(cmd, "sparse")) + flags |= VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; + if ((fd = open(file, O_WRONLY|O_CREAT|O_EXCL, 0666)) < 0) { if (errno != EEXIST || (fd = open(file, O_WRONLY|O_TRUNC, 0666)) < 0) { @@ -805,7 +813,12 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } - if (virStorageVolDownload(vol, st, offset, length, 0) < 0) { + if (virStreamRegisterSkip(st, virshStreamSkip, &fd) < 0) { + vshError(ctl, _("cannot register stream skip handling function")); + goto cleanup; + } + + if (virStorageVolDownload(vol, st, offset, 
length, flags) < 0) { vshError(ctl, _("cannot
download from volume %s"), name); goto cleanup; } diff --git a/tools/virsh.c b/tools/virsh.c index af07251..af4f9d1 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -257,6 +257,14 @@ int virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED, return safewrite(*fd, bytes, nbytes); } +int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, + unsigned long long offset, void *opaque) +{ + int *fd = opaque; + + return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; +} + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index fd552bb..5c382ef 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -150,4 +150,7 @@ int virshDomainState(vshControl *ctl, virDomainPtr dom, int *reason); int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, void *opaque); +int virshStreamSkip(virStreamPtr st, + unsigned long long offset, void *opaque); + #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index 90ab47d..64f0f5f 100644 --- a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3575,12 +3575,13 @@ regarding possible target volume and pool changes as a result of the pool refresh when the upload is attempted. =item B<vol-download> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Download the contents of a storage volume to I<local-file>. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume to download. +If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start reading the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be downloaded. A negative value is interpreted as -- 2.8.1

Similarly to the previous commit, implement the sparse streams feature for vol-upload. This is, however, a slightly different approach, because we must implement a function that tells us whether we are in a data section or in a hole. But there's no magic hidden here. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 25 +++++++++++++++++- tools/virsh.c | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++ tools/virsh.h | 9 +++++++ tools/virsh.pod | 3 ++- 4 files changed, 106 insertions(+), 2 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index 018e0f3..9a6a695 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -659,6 +659,10 @@ static const vshCmdOptDef opts_vol_upload[] = { .type = VSH_OT_INT, .help = N_("amount of data to upload") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; @@ -682,6 +686,8 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) const char *name = NULL; unsigned long long offset = 0, length = 0; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; + struct _virshStreamInData cbData; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -705,7 +711,24 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } - if (virStorageVolUpload(vol, st, offset, length, 0) < 0) { + if (vshCommandOptBool(cmd, "sparse")) { + flags |= VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; + + cbData.ctl = ctl; + cbData.fd = fd; + + if (virStreamRegisterInData(st, virshStreamInData, &cbData) < 0) { + vshError(ctl, _("cannot register stream inData callback")); + goto cleanup; + } + + if (virStreamRegisterSkip(st, virshStreamSkip, &fd) < 0) { + vshError(ctl, _("cannot register stream skip handling function")); + goto cleanup; + } + } + + if (virStorageVolUpload(vol, st, offset, length, flags) < 0) { vshError(ctl, _("cannot upload to volume %s"), name); goto cleanup; } 
diff --git a/tools/virsh.c
b/tools/virsh.c index af4f9d1..b68b8a3 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -265,6 +265,77 @@ int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; } +int virshStreamInData(virStreamPtr st ATTRIBUTE_UNUSED, + int *inData, + unsigned long long *offset, + void *opaque) +{ + struct _virshStreamInData *cbData = opaque; + vshControl *ctl = cbData->ctl; + int fd = cbData->fd; + off_t cur, data, hole; + int ret = -1; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + vshError(ctl, "%s", _("Unable to get current position in stream")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF. + * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + vshError(ctl, "%s", _("Unable to seek to data")); + goto cleanup; + } + *inData = 0; + *offset = 0; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *offset = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* Wait a second. The reason why we are here is + * because we are in data. But at the same time we + * are in a trailing hole? Wut!? Do the best what we + * can do here. */ + vshError(ctl, "%s", _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *offset = (hole - data); + } + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. 
*/ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); + return ret; +} + + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index 5c382ef..cd70699 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -153,4 +153,13 @@ int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, int virshStreamSkip(virStreamPtr st, unsigned long long offset, void *opaque); +struct _virshStreamInData { + vshControl *ctl; + int fd; +}; + +int virshStreamInData(virStreamPtr st, + int *data, + unsigned long long *offset, + void *opaque); #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index 64f0f5f..56eda28 100644 --- a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3557,13 +3557,14 @@ the storage volume should be deleted as well. Not all storage drivers support this option, presently only rbd. =item B<vol-upload> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Upload the contents of I<local-file> to a storage volume. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume where the file will be uploaded. +If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start writing the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be uploaded. A negative value is interpreted -- 2.8.1
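The four cases handled by virshStreamInData() can be exercised outside libvirt. The following sketch assumes Linux/glibc support for SEEK_DATA and SEEK_HOLE; the helper name sparse_map() is hypothetical. It walks a whole file with the same lseek() calls and totals up data and hole bytes. On a filesystem without hole support, the whole file simply reports as data.

```c
#define _GNU_SOURCE            /* SEEK_DATA / SEEK_HOLE on glibc */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Walk [0, EOF) with SEEK_DATA/SEEK_HOLE and total up data and hole bytes.
 * Returns 0 on success, -1 on error; the file offset is restored. */
static int sparse_map(int fd, off_t *data_bytes, off_t *hole_bytes)
{
    off_t saved, end, pos = 0;
    int ret = -1;

    if ((saved = lseek(fd, 0, SEEK_CUR)) == (off_t) -1)
        return -1;
    if ((end = lseek(fd, 0, SEEK_END)) == (off_t) -1)
        goto cleanup;

    *data_bytes = *hole_bytes = 0;
    while (pos < end) {
        off_t data = lseek(fd, pos, SEEK_DATA);
        off_t hole;

        if (data == (off_t) -1) {
            if (errno != ENXIO)        /* case 4: we learned nothing */
                goto cleanup;
            data = end;                /* case 3: trailing hole */
        }
        *hole_bytes += data - pos;     /* cases 2 and 3 (adds 0 in case 1) */
        if (data >= end)
            break;

        hole = lseek(fd, data, SEEK_HOLE);  /* case 1: where does data end? */
        if (hole == (off_t) -1 || hole <= data)
            goto cleanup;
        *data_bytes += hole - data;
        pos = hole;
    }
    ret = 0;

 cleanup:
    lseek(fd, saved, SEEK_SET);
    return ret;
}
```

Like virshStreamInData(), the sketch puts the offset back where it found it, so it can be interleaved with ordinary read() calls.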

This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/fdstream.c                | 58 +++++++++++++++++++++++++++----------------
 src/fdstream.h                |  3 ++-
 src/storage/storage_backend.c |  6 +++--
 3 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/src/fdstream.c b/src/fdstream.c
index 38342a7..41e9e06 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -78,6 +78,11 @@ struct virFDStreamData {
     virMutex lock;
 };
 
+enum {
+    VIR_FDSTREAM_OPEN_FORCE_IOHELPER = 1 << 0,
+    VIR_FDSTREAM_OPEN_SPARSE         = 1 << 1,
+};
+
 static int
 virFDStreamRemoveCallback(virStreamPtr stream)
 {
@@ -574,7 +579,8 @@ static int virFDStreamOpenInternal(virStreamPtr st,
                                    int fd,
                                    virCommandPtr cmd,
                                    int errfd,
-                                   unsigned long long length)
+                                   unsigned long long length,
+                                   unsigned int flags)
 {
     struct virFDStreamData *fdst;
 
@@ -603,8 +609,10 @@ static int virFDStreamOpenInternal(virStreamPtr st,
     st->driver = &virFDStreamDrv;
     st->privateData = fdst;
 
-    st->skipCb = virFDStreamSkip;
-    st->inDataCb = virFDStreamInData;
+    if (flags & VIR_FDSTREAM_OPEN_SPARSE) {
+        st->skipCb = virFDStreamSkip;
+        st->inDataCb = virFDStreamInData;
+    }
 
     return 0;
 }
@@ -613,7 +621,7 @@ static int virFDStreamOpenInternal(virStreamPtr st,
 int virFDStreamOpen(virStreamPtr st,
                     int fd)
 {
-    return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
+    return virFDStreamOpenInternal(st, fd, NULL, -1, 0, 0);
 }
 
 
@@ -659,7 +667,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
         goto error;
     }
 
-    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
+    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0, 0) < 0)
         goto error;
     return 0;
 
@@ -685,7 +693,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
                             unsigned long long length,
                             int oflags,
                             int mode,
-                            bool forceIOHelper)
+                            unsigned int flags)
 {
     int fd = -1;
     int childfd = -1;
@@ -694,8 +702,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
     int errfd = -1;
     char *iohelper_path = NULL;
 
-    VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
-              st, path, oflags, offset, length, mode);
+    VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o flags=%x",
+              st, path, oflags, offset, length, mode, flags);
 
     oflags |= O_NOCTTY | O_BINARY;
 
@@ -729,10 +737,15 @@ virFDStreamOpenFileInternal(virStreamPtr st,
      * non-blocking I/O on block devs/regular files. To
      * support those we need to fork a helper process to do
      * the I/O so we just have a fifo. Or use AIO :-(
+     * Moreover, when opening a file for a sparse stream, make
+     * sure we end up with something seekable which a FIFO is
+     * not.
      */
-    if ((st->flags & VIR_STREAM_NONBLOCK) &&
-        ((!S_ISCHR(sb.st_mode) &&
-          !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
+    if (!(flags & VIR_FDSTREAM_OPEN_SPARSE) &&
+        ((st->flags & VIR_STREAM_NONBLOCK) &&
+         ((!S_ISCHR(sb.st_mode) &&
+           !S_ISFIFO(sb.st_mode)) ||
+          flags & VIR_FDSTREAM_OPEN_FORCE_IOHELPER))) {
         int fds[2] = { -1, -1 };
 
         if ((oflags & O_ACCMODE) == O_RDWR) {
@@ -781,7 +794,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
         VIR_FORCE_CLOSE(childfd);
     }
 
-    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
+    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length, flags) < 0)
         goto error;
 
     return 0;
@@ -811,7 +824,7 @@ int virFDStreamOpenFile(virStreamPtr st,
     }
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, false);
+                                       oflags, 0, 0);
 }
 
 int virFDStreamCreateFile(virStreamPtr st,
@@ -823,8 +836,7 @@ int virFDStreamCreateFile(virStreamPtr st,
 {
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags | O_CREAT, mode,
-                                       false);
+                                       oflags | O_CREAT, mode, 0);
 }
 
 #ifdef HAVE_CFMAKERAW
@@ -839,8 +851,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
 
     if (virFDStreamOpenFileInternal(st, path,
                                     offset, length,
-                                    oflags | O_CREAT, 0,
-                                    false) < 0)
+                                    oflags | O_CREAT, 0, 0) < 0)
         return -1;
 
     fdst = st->privateData;
@@ -876,8 +887,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
 {
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags | O_CREAT, 0,
-                                       false);
+                                       oflags | O_CREAT, 0, 0);
 }
 #endif /* !HAVE_CFMAKERAW */
 
@@ -885,11 +895,17 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
-                               int oflags)
+                               int oflags,
+                               bool sparse)
 {
+    unsigned int flags = VIR_FDSTREAM_OPEN_FORCE_IOHELPER;
+
+    if (sparse)
+        flags |= VIR_FDSTREAM_OPEN_SPARSE;
+
     return virFDStreamOpenFileInternal(st, path,
                                        offset, length,
-                                       oflags, 0, true);
+                                       oflags, 0, flags);
 }
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
diff --git a/src/fdstream.h b/src/fdstream.h
index 2c913ea..bfdebc2 100644
--- a/src/fdstream.h
+++ b/src/fdstream.h
@@ -60,7 +60,8 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
                                const char *path,
                                unsigned long long offset,
                                unsigned long long length,
-                               int oflags);
+                               int oflags,
+                               bool sparse);
 
 int virFDStreamSetInternalCloseCb(virStreamPtr st,
                                   virFDStreamInternalCloseCb cb,
diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c
index 3a23cd7..f92d074 100644
--- a/src/storage/storage_backend.c
+++ b/src/storage/storage_backend.c
@@ -2030,7 +2030,8 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     /* Not using O_CREAT because the file is required to already exist at
      * this point */
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_WRONLY);
+                                     offset, len, O_WRONLY,
+                                     flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM);
 
  cleanup:
     VIR_FREE(path);
@@ -2068,7 +2069,8 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     }
 
     ret = virFDStreamOpenBlockDevice(stream, target_path,
-                                     offset, len, O_RDONLY);
+                                     offset, len, O_RDONLY,
+                                     flags & VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM);
 
  cleanup:
     VIR_FREE(path);
-- 
2.8.1
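To make the effect of the new flags explicit, here is the patched helper-selection condition pulled out into a pure function. The SKETCH_* names and use_iohelper() are hypothetical stand-ins for the enum and the inline condition in virFDStreamOpenFileInternal(). The logic is intended to match the patch as posted: a sparse stream never gets the helper, whatever else is set.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/stat.h>

/* Hypothetical copies of the flags the patch adds to fdstream.c. */
enum {
    SKETCH_OPEN_FORCE_IOHELPER = 1 << 0,
    SKETCH_OPEN_SPARSE         = 1 << 1,
};

/* Same decision as the patched virFDStreamOpenFileInternal(): use the I/O
 * helper for non-blocking streams over anything that is not a char device
 * or FIFO, or when forced; but never for a sparse stream, because the
 * helper would hand us a pipe and a pipe cannot be lseek()ed. */
static bool use_iohelper(unsigned int flags, bool nonblock, mode_t mode)
{
    if (flags & SKETCH_OPEN_SPARSE)
        return false;
    return nonblock &&
           ((!S_ISCHR(mode) && !S_ISFIFO(mode)) ||
            (flags & SKETCH_OPEN_FORCE_IOHELPER));
}
```

Because virFDStreamOpenBlockDevice() always passes the FORCE flag, the only way this returns false for a non-blocking volume upload or download is the sparse flag, which is exactly the case the review below objects to.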

On Thu, Apr 28, 2016 at 12:05:12PM +0200, Michal Privoznik wrote:
This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.
This doesn't really fly - the problem is that with regular files, poll() on the FD will always return ready, even if the read or write will block in I/O. So by not using the iohelper this is going to cause our main loop to block on I/O for streams. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
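The point about regular files is easy to check: POSIX specifies that regular files always poll as readable and writable, so poll() gives the event loop no back-pressure and a slow read() or write() simply blocks whichever thread issues it. A minimal sketch contrasting a regular file with an empty pipe (poll_now() is a hypothetical helper):

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

/* Returns the revents poll() reports for @fd with a zero timeout
 * (i.e. without waiting at all), or -1 on error. */
static int poll_now(int fd, short events)
{
    struct pollfd pfd = { .fd = fd, .events = events, .revents = 0 };

    if (poll(&pfd, 1, 0) < 0)
        return -1;
    return pfd.revents;
}
```

For the regular file both POLLIN and POLLOUT come back set immediately, whereas the empty pipe reports nothing, which is why the pipe from libvirt_iohelper is what actually lets the main loop wait.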

On 05/05/2016 09:32 AM, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:05:12PM +0200, Michal Privoznik wrote:
This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.
This doesn't really fly - the problem is that with regular files, poll() on the FD will always return ready, even if the read or write will block in I/O. So by not using the iohelper this is going to cause our main loop to block on I/O for streams.
The only real solution is to teach libvirt_iohelper to do structured reads when requested. That is, you'll have to add a command-line flag to libvirt_iohelper, which if present, says all of the output from libvirt_iohelper will be structured as tuples of either <type=data,length,bytes> or of <type=hole,length>. When used in this mode, the client HAS to parse the tuples, rather than assuming that the pipe can be read literally. So that means we also have to teach the consumer of libvirt_iohelper how to read tuples off the pipe, at which point it then knows whether to send a regular VIR_NET_STREAM or the compact VIR_NET_STREAM_SKIP. -- Eric Blake eblake redhat com +1-919-301-3266 Libvirt virtualization library http://libvirt.org
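A minimal sketch of such a structured stream, assuming a made-up layout: a 1-byte type, an 8-byte big-endian length, and the payload for data records only. Neither the layout nor the names (REC_DATA, REC_HOLE, rec_encode(), rec_decode()) come from libvirt_iohelper; they only illustrate how the consumer could tell data from holes when reading the pipe.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record layout (not an actual libvirt_iohelper format):
 * 1 byte type, 8 byte big-endian length, then @length payload bytes for
 * data records and nothing for hole records. */
enum { REC_DATA = 1, REC_HOLE = 2 };
#define REC_HDR_LEN 9
#define REC_BAD ((size_t) -1)

static void put_u64be(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
}

static uint64_t get_u64be(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Append one record to @out. Returns its size, or 0 if @cap is too small. */
static size_t rec_encode(unsigned char *out, size_t cap, int type,
                         const void *payload, uint64_t len)
{
    size_t need = REC_HDR_LEN + (type == REC_DATA ? (size_t) len : 0);

    if (need > cap)
        return 0;
    out[0] = (unsigned char) type;
    put_u64be(out + 1, len);
    if (type == REC_DATA)
        memcpy(out + REC_HDR_LEN, payload, (size_t) len);
    return need;
}

/* Parse one record from @in. Returns its size, 0 if only part of a record
 * is buffered yet (read more from the pipe), or REC_BAD for an unknown
 * type. @payload points into @in for data and is NULL for holes. */
static size_t rec_decode(const unsigned char *in, size_t avail, int *type,
                         uint64_t *len, const unsigned char **payload)
{
    if (avail < REC_HDR_LEN)
        return 0;
    *type = in[0];
    *len = get_u64be(in + 1);
    if (*type == REC_HOLE) {
        *payload = NULL;
        return REC_HDR_LEN;
    }
    if (*type != REC_DATA)
        return REC_BAD;
    if (avail - REC_HDR_LEN < *len)
        return 0;
    *payload = in + REC_HDR_LEN;
    return REC_HDR_LEN + (size_t) *len;
}
```

A data record would map onto a regular VIR_NET_STREAM message and a hole record onto VIR_NET_STREAM_SKIP. The "partial record" return value is the same framing problem HTTP chunked encoding solves.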

On Thu, May 05, 2016 at 09:51:22AM -0600, Eric Blake wrote:
On 05/05/2016 09:32 AM, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:05:12PM +0200, Michal Privoznik wrote:
This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.
This doesn't really fly - the problem is that with regular files, poll() on the FD will always return ready, even if the read or write will block in I/O. So by not using the iohelper this is going to cause our main loop to block on I/O for streams.
The only real solution is to teach libvirt_iohelper to do structured reads when requested. That is, you'll have to add a command-line flag to libvirt_iohelper, which if present, says all of the output from libvirt_iohelper will be structured as tuples of either <type=data,length,bytes> or of <type=hole,length>. When used in this mode, the client HAS to parse the tuples, rather than assuming that the pipe can be read literally. So that means we also have to teach the consumer of libvirt_iohelper how to read tuples off the pipe, at which point it then knows whether to send a regular VIR_NET_STREAM or the compact VIR_NET_STREAM_SKIP.
Yeah, that doesn't sound too bad - it's rather similar to the HTTP chunked encoding idea. It isn't much extra overhead to have a type + len field in the byte stream, as long as we put a sensible minimum size on the holes we transmit, e.g. don't send a hole that's less than 512 bytes in length. Regards, Daniel

On 05.05.2016 18:12, Daniel P. Berrange wrote:
On Thu, May 05, 2016 at 09:51:22AM -0600, Eric Blake wrote:
On 05/05/2016 09:32 AM, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:05:12PM +0200, Michal Privoznik wrote:
This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.
This doesn't really fly - the problem is that with regular files, poll() on the FD will always return ready, even if the read or write will block in I/O. So by not using the iohelper this is going to cause our main loop to block on I/O for streams.
The only real solution is to teach libvirt_iohelper to do structured reads when requested. That is, you'll have to add a command-line flag to libvirt_iohelper, which if present, says all of the output from libvirt_iohelper will be structured as tuples of either <type=data,length,bytes> or of <type=hole,length>. When used in this mode, the client HAS to parse the tuples, rather than assuming that the pipe can be read literally. So that means we also have to teach the consumer of libvirt_iohelper how to read tuples off the pipe, at which point it then knows whether to send a regular VIR_NET_STREAM or the compact VIR_NET_STREAM_SKIP.
Yeah, that doesn't sound too bad - it's rather similar to the HTTP chunked encoding idea. It isn't much extra overhead to have a type + len field in the byte stream, as long as we put a sensible minimum size on the holes we transmit, e.g. don't send a hole that's less than 512 bytes in length.
That wouldn't even be possible at the filesystem level. The smallest possible hole is one filesystem block. Michal
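Michal's point can be made concrete: for a run of zero bytes, only the part covering whole filesystem blocks can actually be left unallocated. The sketch below (aligned_hole() is a hypothetical helper; the block size would come from something like st_blksize) computes that part. Anything shorter than one aligned block ends up allocated anyway.

```c
#include <assert.h>
#include <stdint.h>

/* For a run of zero bytes [start, start + len), return the length of the
 * block-aligned part a filesystem could leave unallocated (0 if none) and
 * store where it starts in *hole_start. */
static uint64_t aligned_hole(uint64_t start, uint64_t len, uint64_t blksize,
                             uint64_t *hole_start)
{
    uint64_t first = (start + blksize - 1) / blksize * blksize; /* round up */
    uint64_t last = (start + len) / blksize * blksize;          /* round down */

    if (last <= first)
        return 0;
    *hole_start = first;
    return last - first;
}
```

For example, with 4096-byte blocks a 10000-byte zero run starting at offset 100 yields only one 4096-byte hole at offset 4096; the rest stays allocated.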

On 05.05.2016 17:51, Eric Blake wrote:
On 05/05/2016 09:32 AM, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:05:12PM +0200, Michal Privoznik wrote:
This is kind of a hacky approach to the following problem, but so far I am unable to come up with anything better. On some occasions (esp. when dealing with regular files) libvirt_iohelper is spawned to prefetch data for us. We then read the data from a pipe. This does not fit our sparse stream implementation, as one simply cannot lseek() on a pipe. Until this is resolved, let's suppress use of the IO helper and read data from the FD directly.
This doesn't really fly - the problem is that with regular files, poll() on the FD will always return ready, even if the read or write will block in I/O. So by not using the iohelper this is going to cause our main loop to block on I/O for streams.
The only real solution is to teach libvirt_iohelper to do structured reads when requested. That is, you'll have to add a command-line flag to libvirt_iohelper, which if present, says all of the output from libvirt_iohelper will be structured as tuples of either <type=data,length,bytes> or of <type=hole,length>. When used in this mode, the client HAS to parse the tuples, rather than assuming that the pipe can be read literally. So that means we also have to teach the consumer of libvirt_iohelper how to read tuples off the pipe, at which point it then knows whether to send a regular VIR_NET_STREAM or the compact VIR_NET_STREAM_SKIP.
I know. I had this approach in mind. But before spending any time on it, I wanted to make sure my design of sparse streams is good. Moreover, this patch set is long enough as it is. My plan was to implement this approach as soon as this patch set is merged, so that we can enable the sparse streams. Michal

While virStreamInData() should be a small and quick function, our implementation seeks multiple times. Moreover, it is called even when we already know that we are in data. If we track its return values and do some basic arithmetic with them, we can call virStreamInData() only at the boundaries of data sections and holes, and nowhere else.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 daemon/stream.c | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/daemon/stream.c b/daemon/stream.c
index 5bf6f23..179c5eb 100644
--- a/daemon/stream.c
+++ b/daemon/stream.c
@@ -54,6 +54,7 @@ struct daemonClientStream {
 
     bool tx;
     bool seekable;
+    size_t dataLen; /* How much data is there remaining until we see a hole */
 
     daemonClientStreamPtr next;
 };
@@ -792,7 +793,7 @@ daemonStreamHandleRead(virNetServerClientPtr client,
     if (!(msg = virNetMessageNew(false)))
         goto cleanup;
 
-    if (stream->seekable) {
+    if (stream->seekable && !stream->dataLen) {
         /* Handle skip. We want to send some data to the client. But we might
          * be in a hole. Seek to next data. But if we are in data already, just
          * carry on. */
@@ -836,10 +837,13 @@ daemonStreamHandleRead(virNetServerClientPtr client,
             }
         }
 
-        if (offset < bufferLen)
-            bufferLen = offset;
+        stream->dataLen = offset;
     }
 
+    if (stream->seekable &&
+        bufferLen > stream->dataLen)
+        bufferLen = stream->dataLen;
+
     rv = virStreamRecv(stream->st, buffer, bufferLen);
     if (rv == -2) {
         /* Should never get this, since we're only called when we know
@@ -854,6 +858,8 @@ daemonStreamHandleRead(virNetServerClientPtr client,
             goto cleanup;
         msg = NULL;
     } else {
+        stream->dataLen -= rv;
+
         stream->tx = false;
         if (rv == 0)
             stream->recvEOF = true;
-- 
2.8.1
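The effect of caching the remaining data length can be shown with a small in-memory model. Everything here (struct extent, model_in_data(), transfer()) is hypothetical: model_in_data() plays the role of virStreamInData() and counts how often it runs, and the cache flag mimics the new dataLen field.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a sparse file: a list of data/hole extents. */
struct extent { int in_data; size_t len; };

struct model {
    const struct extent *ext;
    size_t n_ext;
    size_t pos;            /* current offset of the reader */
    size_t indata_calls;   /* how often the "expensive" query ran */
};

/* Plays the role of virStreamInData(): is @pos in data, and how many
 * bytes are left in the current extent (0 at EOF)? */
static void model_in_data(struct model *m, int *in_data, size_t *left)
{
    size_t off = m->pos;

    m->indata_calls++;
    for (size_t i = 0; i < m->n_ext; i++) {
        if (off < m->ext[i].len) {
            *in_data = m->ext[i].in_data;
            *left = m->ext[i].len - off;
            return;
        }
        off -= m->ext[i].len;
    }
    *in_data = 0;
    *left = 0;
}

/* Send everything in chunks of at most @buflen bytes and return the number
 * of data bytes sent. With @cache set, the remaining data length is kept
 * (like stream->dataLen) and the query only runs at extent boundaries. */
static size_t transfer(struct model *m, size_t buflen, int cache)
{
    size_t sent = 0, data_len = 0;

    for (;;) {
        if (!cache || data_len == 0) {
            int in_data;
            size_t left;

            model_in_data(m, &in_data, &left);
            if (left == 0)
                break;              /* EOF */
            if (!in_data) {
                m->pos += left;     /* would become a skip message */
                continue;
            }
            data_len = left;
        }

        size_t chunk = buflen < data_len ? buflen : data_len;
        m->pos += chunk;            /* would be virStreamRecv() + send */
        sent += chunk;
        data_len -= chunk;
    }
    return sent;
}
```

With a 10000-byte data extent, a 50000-byte hole and a 3000-byte data extent, and 4096-byte chunks, the uncached loop queries six times (once per chunk, plus the hole and EOF), while the cached loop queries four times, once per extent boundary.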

Now that we have everything prepared, let's enable the feature for these two APIs.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/storage/storage_backend.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c
index f92d074..ec066ca 100644
--- a/src/storage/storage_backend.c
+++ b/src/storage/storage_backend.c
@@ -2005,7 +2005,8 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     int ret = -1;
     int has_snap = 0;
 
-    virCheckFlags(0, -1);
+    virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1);
+
     /* if volume has target format VIR_STORAGE_FILE_PLOOP
      * we need to restore DiskDescriptor.xml, according to
      * new contents of volume. This operation will be perfomed
@@ -2052,7 +2053,8 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
     int ret = -1;
     int has_snap = 0;
 
-    virCheckFlags(0, -1);
+    virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1);
+
     if (vol->target.format == VIR_STORAGE_FILE_PLOOP) {
         has_snap = virStorageBackendPloopHasSnapshots(vol->target.path);
         if (has_snap < 0) {
-- 
2.8.1
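Why a new flag is enough for compatibility: the storage backends validate flags with virCheckFlags(), which fails on any bit it does not know. An older daemon therefore rejects VIR_STORAGE_VOL_{UPLOAD,DOWNLOAD}_SPARSE_STREAM with an error instead of silently sending a non-sparse stream. Below is a simplified sketch of that check; check_flags() and SKETCH_SPARSE_STREAM are hypothetical, and the real macro also reports the function name and returns from its caller.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical flag bit standing in for the real
 * VIR_STORAGE_VOL_{UPLOAD,DOWNLOAD}_SPARSE_STREAM values. */
enum { SKETCH_SPARSE_STREAM = 1 << 0 };

/* Same idea as virCheckFlags(): any bit outside @supported is an error,
 * so a driver that predates a flag refuses it instead of ignoring it. */
static int check_flags(unsigned int flags, unsigned int supported)
{
    unsigned int unknown = flags & ~supported;

    if (unknown) {
        fprintf(stderr, "unsupported flags (0x%x)\n", unknown);
        return -1;
    }
    return 0;
}
```

Before this patch the backends behave like check_flags(flags, 0); after it they behave like check_flags(flags, SKETCH_SPARSE_STREAM).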

On 28.04.2016 12:04, Michal Privoznik wrote:
Notes? Oh, patches 01-03 have been ACKed already, but are not pushed yet because of the freeze. But since this feature builds on top of them, I'm sending them too.
Just a notice that I've pushed those patches, so when applying this patch set, just drop them and start from 04.
Also the whole patch set is accessible at my github:
Michal

On 04/28/2016 06:04 AM, Michal Privoznik wrote:
So, after a couple of sleepless nights and headaches, I'm proud to announce that I finally got this working.
What? Our regular streams that can be used to transfer disk images for domains are unaware of any sparseness. Therefore they have two limitations:
a) transferring a big but sparse image can take ages, as all the holes (interpreted by the kernel as '\0') have to go through our event loop.
b) the resulting volume is not sparse even if the source was.
How? I followed the proven approach that the Linux kernel takes. One way to look at our streams is just like read() and write() with different names: virStreamRecv() and virStreamSend(). They even have the same prototype (if 'int fd' is substituted with 'virStreamPtr'). Now, holes in files are created and detected via a third API: lseek(). Therefore I'm introducing a new virStreamSkip() API that mimics the missing primitive. Now, because our streams do not necessarily have to work over files (they are for generic data transfer), I had to let users register a callback that is called whenever the other side calls virStreamSkip().
So now that we have all three primitives, we can focus on making life easier for our users. Nobody is actually using bare virStreamSend() and virStreamRecv(); everybody uses our wrappers virStreamSendAll() and virStreamRecvAll(). With the approach described above, only virStreamSendAll() needs to be adjusted to become 'sparse file' aware. virStreamRecvAll() will only get the data to write (just like now), with the skip callback called automatically whenever needed. In order for virStreamSendAll() to skip holes, I'm introducing yet another callback: virStreamInDataFunc(). This callback helps us create a map of a file: before each virStreamSend() it checks whether we are in a data section or a hole, and calls virStreamSend() or virStreamSkip() respectively.
Do not worry - it will all become clear once you see the code.
Now the question is: how will users enable this feature? I mean, we have to take into account that we might be talking to an older daemon that does not know how to skip a hole. Or vice versa, an older client. The solution I came up with is to introduce flags to the APIs where sparse streams make sense. I guess it makes sense for volume upload and download, but almost certainly makes no sense for virDomainOpenConsole().
Code? From the user's POV, they just need to pass --sparse to the 'vol-upload' or 'vol-download' virsh commands. One layer down, at the programming level, they merely need to:

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(vol, st, ...);
    virStreamRecvAll(st, sinkFunc, &fd);

where:

    int skipFunc(virStreamPtr st,
                 unsigned long long offset,
                 void *opaque)
    {
        int *fd = opaque;

        return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
    }

And for uploading it's slightly more verbose - see patch 24.
Limitations? While testing this on my machine with XFS, I've noticed that the resulting map of a transferred file is not exactly the same as the source's. Checksums are the same though. After digging deeper I found this commit in the kernel:
http://git.kernel.org/cgit/linux/kernel/git/torvalds/linux.git/commit/?id=05...
The thing is, as we transfer the file, we are practically just seeking past EOF and thus creating holes. But if the hole is small enough, XFS uses its speculative preallocation and eventually fully allocates the blocks, even though we intended to create a hole. This does not occur when punching a hole into a file, though. Well, I guess the XFS developers have their reasons for that.
This behaviour has not been observed on EXT4.
Notes? Oh, patches 01-03 have been ACKed already, but are not pushed yet because of the freeze. But since this feature builds on top of them, I'm sending them too.
Also the whole patch set is accessible at my github:
https://github.com/zippy2/libvirt/tree/sparse_streams4
Michal Privoznik (27):
  Revert "rpc: Fix slow volume download (virsh vol-download)"
  virnetclientstream: Process stream messages later
  virStream{Recv,Send}All: Increase client buffer
  Introduce virStreamSkip
  Introduce virStreamRegisterSkip and virStreamSkipCallback
  Introduce virStreamInData and virStreamRegisterInData
  virNetClientStreamNew: Track origin stream
  Track if stream is seekable
  RPC: Introduce virNetStreamSkip
  Introduce VIR_NET_STREAM_SKIP message type
  Teach wireshark plugin about VIR_NET_STREAM_SKIP
  daemon: Implement VIR_NET_STREAM_SKIP handling
  daemon: Introduce virNetServerProgramSendStreamSkip
  virnetclientstream: Introduce virNetClientStreamSendSkip
  virnetclientstream: Introduce virNetClientStreamHandleSkip
  remote_driver: Implement virStreamSkip
  daemonStreamHandleRead: Wire up seekable stream
  virNetClientStream: Wire up VIR_NET_STREAM_SKIP
  virStreamSendAll: Wire up sparse streams
  fdstream: Implement seek
  gendispatch: Introduce @sparseflag for our calls
  Introduce virStorageVol{Download,Upload}Flags
  virsh: Implement sparse stream to vol-download
  virsh: Implement sparse stream to vol-upload
  fdstream: Suppress use of IO helper for sparse streams
  daemon: Don't call virStreamInData so often
  storage: Enable sparse streams for virStorageVol{Download,Upload}
 daemon/remote.c                      |   2 +-
 daemon/stream.c                      | 134 +++++++++++++--
 daemon/stream.h                      |   3 +-
 include/libvirt/libvirt-storage.h    |   9 +
 include/libvirt/libvirt-stream.h     |  47 ++++++
 src/datatypes.h                      |   8 +
 src/driver-stream.h                  |   5 +
 src/fdstream.c                       | 156 +++++++++++++++---
 src/fdstream.h                       |   3 +-
 src/libvirt-storage.c                |   4 +-
 src/libvirt-stream.c                 | 238 ++++++++++++++++++++++++++-
 src/libvirt_internal.h               |   7 +
 src/libvirt_private.syms             |   2 +
 src/libvirt_public.syms              |   7 +
 src/libvirt_remote.syms              |   2 +
 src/remote/remote_driver.c           |  41 ++++-
 src/remote/remote_protocol.x         |   2 +
 src/rpc/gendispatch.pl               |  21 ++-
 src/rpc/virnetclient.c               |   1 +
 src/rpc/virnetclientstream.c         | 308 ++++++++++++++++++++++++-----------
 src/rpc/virnetclientstream.h         |  10 +-
 src/rpc/virnetprotocol.x             |  16 +-
 src/rpc/virnetserverprogram.c        |  33 ++++
 src/rpc/virnetserverprogram.h        |   7 +
 src/storage/storage_backend.c        |  12 +-
 src/storage/storage_driver.c         |   4 +-
 src/virnetprotocol-structs           |   4 +
 tools/virsh-volume.c                 |  40 ++++-
 tools/virsh.c                        |  79 +++++++++
 tools/virsh.h                        |  12 ++
 tools/virsh.pod                      |   6 +-
 tools/wireshark/src/packet-libvirt.c |  40 +++++
 tools/wireshark/src/packet-libvirt.h |   2 +
 33 files changed, 1104 insertions(+), 161 deletions(-)
Read the code mostly for educational purposes, but also ran it through Coverity... First the Coverity notes:
Patch 11: dissect_xdr_stream_skip() - "start" is not initialized.
Patch 15: virNetClientStreamHandleSkip() - the if (!msg || ...) check then dereferences msg in the virReportError(). So it seems !msg needs its own check and message.
A couple of questions/thoughts about skipping in general. Is it possible to skip past some sort of EOF? IOW: what if the skip 'offset' is larger than the perceived maximum size of the file? I didn't dig line by line, but it's just a general thought along the lines of not-so-bright or, even worse, malicious code that now has a means to fill up a disk with a mostly empty file.
In Patch 23, your commit message indicates enabling skips for specific backends - perhaps better to note "local" vs "remote" type operations? For iSCSI, it's possible to configure a disk/lun such that the target file is found in /dev/disk/by-path (mode="host").
John
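One possible answer to the skip-past-EOF question, offered only as a sketch: let the receiving side bound every skip against a known limit, such as the volume capacity. The virshStreamSkip() from the vol-download patch has no such check. struct bounded_sink and bounded_skip() are hypothetical, and EFBIG is just one plausible errno to report.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callback data: destination FD plus the largest offset the
 * transfer is allowed to reach (for example the volume capacity). */
struct bounded_sink {
    int fd;
    off_t limit;
};

/* Skip callback shaped like virshStreamSkip() minus the virStreamPtr
 * argument; it refuses holes that would move the offset past @limit. */
static int bounded_skip(unsigned long long len, void *opaque)
{
    struct bounded_sink *s = opaque;
    off_t cur = lseek(s->fd, 0, SEEK_CUR);

    if (cur == (off_t) -1)
        return -1;
    if (cur > s->limit || len > (unsigned long long) (s->limit - cur)) {
        errno = EFBIG;
        return -1;
    }
    return lseek(s->fd, (off_t) len, SEEK_CUR) == (off_t) -1 ? -1 : 0;
}
```

The comparison is done on the remaining room (limit - cur) rather than on cur + len, so a huge len from the peer cannot overflow the offset arithmetic.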

On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
So, after a couple of sleepless nights and headaches, I'm proud to announce that I finally got this working.
What? Our regular streams that can be used to transfer disk images for domains are unaware of any sparseness. Therefore they have two limitations:
a) transferring a big but sparse image can take ages, as all the holes (interpreted by the kernel as '\0') have to go through our event loop.
b) the resulting volume is not sparse even if the source was.
How? I followed the proven approach that the Linux kernel takes. One way to look at our streams is just like read() and write() with different names: virStreamRecv() and virStreamSend(). They even have the same prototype (if 'int fd' is substituted with 'virStreamPtr'). Now, holes in files are created and detected via a third API: lseek(). Therefore I'm introducing a new virStreamSkip() API that mimics the missing primitive. Now, because our streams do not necessarily have to work over files (they are for generic data transfer), I had to let users register a callback that is called whenever the other side calls virStreamSkip().
So now that we have all three primitives, we can focus on making life easier for our users. Nobody is actually using bare virStreamSend() and virStreamRecv(); everybody uses our wrappers virStreamSendAll() and virStreamRecvAll(). With the approach described above, only virStreamSendAll() needs to be adjusted to become 'sparse file' aware. virStreamRecvAll() will only get the data to write (just like now), with the skip callback called automatically whenever needed. In order for virStreamSendAll() to skip holes, I'm introducing yet another callback: virStreamInDataFunc(). This callback helps us create a map of a file: before each virStreamSend() it checks whether we are in a data section or a hole, and calls virStreamSend() or virStreamSkip() respectively.
Do not worry - it will all become clear once you see the code.
Now the question is: how will users enable this feature? I mean, we have to take into account that we might be talking to an older daemon that does not know how to skip a hole. Or vice versa, an older client. The solution I came up with is to introduce flags to the APIs where sparse streams make sense. I guess it makes sense for volume upload and download, but almost certainly makes no sense for virDomainOpenConsole().
Code? From the user's POV, they just need to pass --sparse to the 'vol-upload' or 'vol-download' virsh commands. One layer down, at the programming level, they merely need to:

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(vol, st, ...);
    virStreamRecvAll(st, sinkFunc, &fd);

where:

    int skipFunc(virStreamPtr st,
                 unsigned long long offset,
                 void *opaque)
    {
        int *fd = opaque;

        return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
    }

And for uploading it's slightly more verbose - see patch 24.
While it looks ok-ish in the context of the RecvAll function, because you have skipFunc + recvFunc both being invoked asynchronously, this design feels quite odd when used in combination with the plain virStreamRecv(). Just for a minute let's consider

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(st, ...);

    while (1) {
        char buf[4096];
        virStreamRecv(st, buf, sizeof(buf));
        ..do something with buf..
    }

I think it is quite unpleasant to have the skipFunc be called out of band to the code dealing with the Recv. I think it is preferable that we have an explicit synchronous API for consuming the hole.

The same really applies from POV of the upload side - I think we should be able to synchronously push the hole in, rather than having the asynchronous InData callback.

IIUC, the reason you've chosen this async callback approach is because the virStream{Send,Recv,SendAll,RecvAll} methods are not extensible to cope with holes. I'm thinking though that we'd get a more attractive public API by creating hole friendly versions of virStream{Send,Recv,etc} instead of going the extra callback route.

The download side could be made to work if we had a flag for virStreamRecv that instructed it to only read data up to the next hole, and defined an explicit error code to use to indicate that we've hit a hole. This would be used thus:

    while (1) {
        char buf[4096];
        int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
        if (ret == -1) {
            virErrorPtr err = virGetLastError();
            if (err->code == VIR_ERR_STREAM_HOLE) {
                ret = virStreamHoleSize(st, buf);
                ...seek ret bytes in target...
            } else {
                return -1;
            }
        } else {
            ...write buf to target...
        }
    }

To make this work with virStreamRecvAll, we would need to add a virStreamSparseRecvAll() which had two callbacks, e.g.

    typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len);

    int virStreamSparseRecvAll(virStreamPtr st,
                               virStreamSinkFunc handler,
                               virStreamSinkHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
            if (ret == -1) {
                virErrorPtr err = virGetLastError();
                if (err->code == VIR_ERR_STREAM_HOLE) {
                    ret = virStreamHoleSize(st, buf);
                    holeHandler(st, ret);
                } else {
                    return -1;
                }
            } else {
                handler(st, buf, ret, opaque);
            }
        }
    }

Now considering the upload side of things, the virStreamSend function doesn't actually need changes, though it could do with a flags parameter for best practice. We just need the virStreamSkip() function you already added.

    while (1) {
        char buf[4096];
        if (..in hole...) {
            ..get hole size...
            virStreamSkip(st, len);
        } else {
            ...read N bytes...
            virStreamSend(st, buf, len);
        }
    }

The SendAll method is again more complicated as the callback we pass into it is insufficient to figure out if we have holes. We could add a virStreamSparseSendAll() which had two callbacks again:

    typedef int (*virStreamSourceHoleFunc)(virStreamPtr st);

This returns the length of the current hole, or 0 if not at a hole.
    int virStreamSparseSendAll(virStreamPtr st,
                               virStreamSourceFunc handler,
                               virStreamSourceHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = holeHandler(st);
            if (ret > 0) {
                virStreamSkip(st, ret);
            } else {
                ret = handler(st, buf, sizeof(buf), opaque);
                virStreamSend(st, buf, ret);
            }
        }
    }

So we would avoid the virStreamInData and virStreamRegisterInData and virStreamRegisterSkip and virStreamSkipCallback methods.

Regards,
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On 05.05.2016 18:10, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
<snip/>
While it looks ok-ish in the context of the RecvAll function, because you have skipFunc + recvFunc both being invoked asynchronously, this design feels quite odd when used in combination with the plain virStreamRecv().
Just for a minute let's consider

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(st, ...);

    while (1) {
        char buf[4096];
        virStreamRecv(st, buf, sizeof(buf));
        ..do something with buf..
    }
I think it is quite unpleasant to have the skipFunc be called out of band to the code dealing with the Recv.
I think it is preferable that we have an explicit synchronous API for consuming the hole.
The same really applies from POV of the upload side - I think we should be able to synchronously push the hole in, rather than having the asynchronous InData callback.
IIUC, the reason you've chosen this async callback approach is because the virStream{Send,Recv,SendAll,RecvAll} methods are not extensible to cope with holes.
I'm thinking though that we'd get a more attractive public API by creating hole friendly versions of virStream{Send,Recv,etc} instead of going the extra callback route.
The download side could be made to work if we had a flag for virStreamRecv that instructed it to only read data up to the next hole, and defined an explicit error code to use to indicate that we've hit a hole.
This would be used thus:
    while (1) {
        char buf[4096];
        int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
        if (ret == -1) {
            virErrorPtr err = virGetLastError();
            if (err->code == VIR_ERR_STREAM_HOLE) {
                ret = virStreamHoleSize(st, buf);
Could this be made even friendlier by returning a special value (say -2) in case of a hole?
                ...seek ret bytes in target...
            } else {
                return -1;
            }
        } else {
            ...write buf to target...
        }
    }
Okay, so imagine we have a STREAM_SKIP packet incoming: what should happen if it is processed by a bare virStreamRecv()? IOW, the user requests sparse streams but sticks to calling the old virStreamRecv(). One thing that I'm worried about here is that @flags of virStreamRecvFlags() would not be transferred to the sender side as one might expect.
To make this work with virStreamRecvAll, we would need to add a virStreamSparseRecvAll() which had two callbacks, e.g.

    typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len);

    int virStreamSparseRecvAll(virStreamPtr st,
                               virStreamSinkFunc handler,
                               virStreamSinkHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
            if (ret == -1) {
                virErrorPtr err = virGetLastError();
                if (err->code == VIR_ERR_STREAM_HOLE) {
                    ret = virStreamHoleSize(st, buf);
                    holeHandler(st, ret);
                } else {
                    return -1;
                }
            } else {
                handler(st, buf, ret, opaque);
            }
        }
    }
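A minimal, self-contained sketch of the two-callback download loop, using the special return value (-2) suggested earlier in this reply instead of an error code. Everything here is hypothetical: the in-memory sketch_stream stands in for a virStreamPtr, sketch_recv() for the proposed virStreamRecvFlags(..., VIR_STREAM_STOP_AT_HOLE), and sketch_hole_size() for the proposed virStreamHoleSize(); none of these names exist in libvirt.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory model of a sparse stream (not libvirt API):
 * a list of segments, each either data or a hole. */
#define SKETCH_RECV_HOLE (-2)   /* special value instead of an error code */

struct segment {
    int is_hole;
    const char *data;           /* NULL for holes */
    unsigned long long len;
};

struct sketch_stream {
    const struct segment *segs;
    size_t nsegs, cur;          /* number of segments, current index */
    unsigned long long off;     /* bytes consumed from the current one */
};

/* Returns bytes read without crossing into a hole, 0 at end of stream,
 * or SKETCH_RECV_HOLE when the next thing in the stream is a hole. */
static int sketch_recv(struct sketch_stream *st, char *buf, size_t nbytes)
{
    const struct segment *seg;
    unsigned long long left;

    assert(nbytes > 0);
    if (st->cur == st->nsegs)
        return 0;
    seg = &st->segs[st->cur];
    if (seg->is_hole)
        return SKETCH_RECV_HOLE;

    left = seg->len - st->off;
    if (nbytes > left)
        nbytes = (size_t) left;
    memcpy(buf, seg->data + st->off, nbytes);
    st->off += nbytes;
    if (st->off == seg->len) {
        st->cur++;
        st->off = 0;
    }
    return (int) nbytes;
}

/* Length of the hole at the current position; the hole is consumed. */
static unsigned long long sketch_hole_size(struct sketch_stream *st)
{
    assert(st->cur < st->nsegs && st->segs[st->cur].is_hole);
    return st->segs[st->cur++].len;
}

typedef int (*sink_func)(const char *buf, size_t len, void *opaque);
typedef int (*sink_hole_func)(unsigned long long len, void *opaque);

/* Two-callback loop in the spirit of the proposed virStreamSparseRecvAll(). */
static int sketch_sparse_recv_all(struct sketch_stream *st, sink_func handler,
                                  sink_hole_func hole_handler, void *opaque)
{
    char buf[4096];

    for (;;) {
        int ret = sketch_recv(st, buf, sizeof(buf));

        if (ret == 0)
            return 0;                   /* end of stream */
        if (ret == SKETCH_RECV_HOLE) {
            if (hole_handler(sketch_hole_size(st), opaque) < 0)
                return -1;
        } else if (ret < 0 || handler(buf, (size_t) ret, opaque) < 0) {
            return -1;
        }
    }
}

/* Example handlers that just tally data and hole bytes. */
struct totals {
    unsigned long long data, holes;
};

static int count_data(const char *buf, size_t len, void *opaque)
{
    (void) buf;
    ((struct totals *) opaque)->data += len;
    return 0;
}

static int count_hole(unsigned long long len, void *opaque)
{
    ((struct totals *) opaque)->holes += len;
    return 0;
}
```

With a 1 GiB hole in the stream, the hole handler runs once, instead of the data handler running roughly a quarter of a million times on 4 KiB buffers of zeros. Unlike the error-code variant, the caller never has to consult virGetLastError() to tell a hole from a failure.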
Now considering the upload side of things, the virStreamSend function doesn't actually need changes, though it could do with a flags parameter for best practice. We just need the virStreamSkip() function you already added.

    while (1) {
        char buf[4096];
        if (..in hole...) {
            ..get hole size...
            virStreamSkip(st, len);
        } else {
            ...read N bytes...
            virStreamSend(st, buf, len);
        }
    }
The SendAll method is again more complicated as the callback we pass into it is insufficient to figure out if we have holes. We could add a virStreamSparseSendAll() which had two callbacks again:

    typedef int (*virStreamSourceHoleFunc)(virStreamPtr st);

This returns the length of the current hole, or 0 if not at a hole.

    int virStreamSparseSendAll(virStreamPtr st,
                               virStreamSourceFunc handler,
                               virStreamSourceHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = holeHandler(st);
            if (ret > 0) {
                virStreamSkip(st, ret);
            } else {
                ret = handler(st, buf, sizeof(buf), opaque);
                virStreamSend(st, buf, ret);
            }
        }
    }
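The "..in hole..." step of the upload loop, and the hole callback of virStreamSparseSendAll(), can be backed by lseek() with SEEK_DATA/SEEK_HOLE, which are available on Linux and several other systems (on glibc they need _GNU_SOURCE). The sketch below assumes that. To stay runnable without a libvirt connection it copies into a local destination file descriptor instead of a stream: lseek() on the destination stands in for virStreamSkip() and write() for virStreamSend(). in_data() and sparse_copy() are hypothetical names, and the destination is assumed to start out empty.

```c
#define _GNU_SOURCE             /* for SEEK_DATA and SEEK_HOLE */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical hole probe: is @offset in data or in a hole, and how long
 * is that region?  *len == 0 means end of file.  Moves the file offset
 * of @fd as a side effect. */
static int in_data(int fd, off_t offset, int *is_data, off_t *len)
{
    struct stat sb;
    off_t next;

    if (fstat(fd, &sb) < 0)
        return -1;
    if (offset >= sb.st_size) {
        *is_data = 0;
        *len = 0;
        return 0;
    }
    next = lseek(fd, offset, SEEK_DATA);
    if (next == (off_t) -1) {
        if (errno != ENXIO)
            return -1;
        next = sb.st_size;              /* only a trailing hole is left */
    }
    if (next > offset) {                /* inside a hole */
        *is_data = 0;
        *len = next - offset;
        return 0;
    }
    next = lseek(fd, offset, SEEK_HOLE);
    if (next == (off_t) -1)
        return -1;
    *is_data = 1;
    *len = next - offset;
    return 0;
}

/* The upload loop in miniature, copying @src into an empty @dst:
 * a hole becomes lseek() on @dst (standing in for virStreamSkip()),
 * data is read and written (standing in for virStreamSend()). */
static int sparse_copy(int src, int dst)
{
    char buf[4096];
    off_t pos = 0, end;

    for (;;) {
        int is_data;
        off_t len;

        if (in_data(src, pos, &is_data, &len) < 0)
            return -1;
        if (len == 0)
            break;                      /* end of file */
        if (!is_data) {
            if (lseek(dst, len, SEEK_CUR) == (off_t) -1)
                return -1;
            pos += len;
            continue;
        }
        while (len > 0) {
            size_t want = len < (off_t) sizeof(buf) ? (size_t) len : sizeof(buf);
            ssize_t got = pread(src, buf, want, pos);

            /* a failed or empty read, or a short write, aborts the copy */
            if (got <= 0 || write(dst, buf, (size_t) got) != got)
                return -1;
            pos += got;
            len -= got;
        }
    }
    /* seeking past EOF does not grow a file, so a trailing hole
     * still needs ftruncate() */
    end = lseek(dst, 0, SEEK_CUR);
    return end == (off_t) -1 ? -1 : ftruncate(dst, end);
}
```

Since in_data() moves the source file offset, the copy reads with pread(). On a filesystem without native hole support, Linux typically reports the whole file as data, so the copy degrades to a plain, non-sparse one rather than failing.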
So we would avoid the virStreamInData and virStreamRegisterInData and virStreamRegisterSkip and virStreamSkipCallback methods
Okay. Makes sense. Let me see if I can reuse my RPC implementation, because that was the hardest part of all to implement.

Michal

On Mon, May 09, 2016 at 04:43:44PM +0200, Michal Privoznik wrote:
On 05.05.2016 18:10, Daniel P. Berrange wrote:
On Thu, Apr 28, 2016 at 12:04:47PM +0200, Michal Privoznik wrote:
<snip/>
While it looks ok-ish in the context of the RecvAll function, because you have skipFunc + recvFunc both being invoked asynchronously, this design feels quite odd when used in combination with the plain virStreamRecv().
Just for a minute let's consider

    st = virStreamNew(conn, 0);
    virStreamRegisterSkip(st, skipFunc, &fd);
    virStorageVolDownload(st, ...);

    while (1) {
        char buf[4096];
        virStreamRecv(st, buf, sizeof(buf));
        ..do something with buf..
    }
I think it is quite unpleasant to have the skipFunc be called out of band to the code dealing with the Recv.
I think it is preferable that we have an explicit synchronous API for consuming the hole.
The same really applies from POV of the upload side - I think we should be able to synchronously push the hole in, rather than having the asynchronous InData callback.
IIUC, the reason you've chosen this async callback approach is because the virStream{Send,Recv,SendAll,RecvAll} methods are not extensible to cope with holes.
I'm thinking though that we'd get a more attractive public API by creating hole friendly versions of virStream{Send,Recv,etc} instead of going the extra callback route.
The download side could be made to work if we had a flag for virStreamRecv that instructed it to only read data up to the next hole, and defined an explicit error code to use to indicate that we've hit a hole.
This would be used thus:
    while (1) {
        char buf[4096];
        int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
        if (ret == -1) {
            virErrorPtr err = virGetLastError();
            if (err->code == VIR_ERR_STREAM_HOLE) {
                ret = virStreamHoleSize(st, buf);
Could this be made even friendlier by returning a special value (say -2) in case of a hole?
                ...seek ret bytes in target...
            } else {
                return -1;
            }
        } else {
            ...write buf to target...
        }
    }
Okay, so imagine we have a STREAM_SKIP packet incoming: what should happen if it is processed by a bare virStreamRecv()? IOW, the user requests sparse streams but sticks to calling the old virStreamRecv().
Given that the app has to pass in the flag VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM (or similar), we could declare that it is an error to use the plain virStreamRecv when you have already asked for a sparse stream. Alternatively, we could just make virStreamRecv give back a buffer full of zeros :-)
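The zero-filling alternative is cheap to sketch: when a plain reader meets a hole, the stream keeps a count of hole bytes still owed to it and serves them as zeros. pending_hole and hole_as_zeros() are hypothetical names, not libvirt API. The legacy reader still gets a byte-exact image, but none of the speed-up, and its output is only sparse if the reader itself skips zero blocks.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-stream state (not libvirt API): bytes of a hole that
 * the sender announced but a plain, non-sparse reader has not yet read. */
struct pending_hole {
    unsigned long long left;
};

/* Fallback for plain virStreamRecv()-style callers: serve up to @nbytes
 * of the pending hole as zeros.  Returns the number of bytes produced,
 * 0 when no hole is pending. */
static size_t hole_as_zeros(struct pending_hole *h, char *buf, size_t nbytes)
{
    size_t n = nbytes;

    if (h->left < n)
        n = (size_t) h->left;
    memset(buf, 0, n);
    h->left -= n;
    return n;
}
```

A 10000-byte hole read through a 4096-byte buffer comes back as chunks of 4096, 4096 and 1808 zero bytes, after which the reader continues with the next data segment.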
One thing that I'm worried about here is that @flags of virStreamRecvFlags() would not be transferred to the sender side as one might expect.
Yep, that is true, but then everything about the stream APIs is a bit different from the rest of the API.
To make this work with virStreamRecvAll, we would need to add a virStreamSparseRecvAll() which had two callbacks, e.g.

    typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, unsigned long long len);

    int virStreamSparseRecvAll(virStreamPtr st,
                               virStreamSinkFunc handler,
                               virStreamSinkHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_STOP_AT_HOLE);
            if (ret == -1) {
                virErrorPtr err = virGetLastError();
                if (err->code == VIR_ERR_STREAM_HOLE) {
                    ret = virStreamHoleSize(st, buf);
                    holeHandler(st, ret);
                } else {
                    return -1;
                }
            } else {
                handler(st, buf, ret, opaque);
            }
        }
    }
Now considering the upload side of things, the virStreamSend function doesn't actually need changes, though it could do with a flags parameter for best practice. We just need the virStreamSkip() function you already added.

    while (1) {
        char buf[4096];
        if (..in hole...) {
            ..get hole size...
            virStreamSkip(st, len);
        } else {
            ...read N bytes...
            virStreamSend(st, buf, len);
        }
    }
The SendAll method is again more complicated as the callback we pass into it is insufficient to figure out if we have holes. We could add a virStreamSparseSendAll() which had two callbacks again:

    typedef int (*virStreamSourceHoleFunc)(virStreamPtr st);

This returns the length of the current hole, or 0 if not at a hole.

    int virStreamSparseSendAll(virStreamPtr st,
                               virStreamSourceFunc handler,
                               virStreamSourceHoleFunc holeHandler,
                               void *opaque)
    {
        while (1) {
            char buf[4096];
            int ret = holeHandler(st);
            if (ret > 0) {
                virStreamSkip(st, ret);
            } else {
                ret = handler(st, buf, sizeof(buf), opaque);
                virStreamSend(st, buf, ret);
            }
        }
    }
So we would avoid the virStreamInData and virStreamRegisterInData and virStreamRegisterSkip and virStreamSkipCallback methods
Okay. Makes sense. Let me see if I can reuse my RPC implementation, because that was the hardest part of all to implement.
Based on my understanding of your RPC design, I think it should still be ok as is.

Regards,
Daniel
participants (4): Daniel P. Berrange, Eric Blake, John Ferlan, Michal Privoznik