[libvirt] [PATCH v2 00/32] Introduce sparse streams

v2 of: https://www.redhat.com/archives/libvir-list/2016-April/msg01869.html diff to v1: - Reworked public API side of the feature. Event approach is no longer used. Instead I've implemented synchronous APIs as Dan suggested. You can find the code at my github repo too: https://github.com/zippy2/libvirt/tree/sparse_streams2 BTW: here is a measurement I made: homer libvirt.git # time ./tools/virsh -c qemu+tcp://lisa/system vol-download /mnt/cdrom/dummy.img /tmp/dummy.img real 3m9.178s user 0m32.978s sys 1m7.758s homer libvirt.git # time ./tools/virsh -c qemu+tcp://lisa/system vol-download --sparse /mnt/cdrom/dummy.img /tmp/dummy.img real 0m1.409s user 0m0.017s sys 0m1.042s masina libvirt.git # ls -lhs /tmp/dummy.img 4.0K -rw-r--r-- 1 root root 21G May 23 17:53 /tmp/dummy.img Michal Privoznik (32): Introduce virStreamRecvFlags Implement virStreamRecvFlags to some drivers Introduce virStreamSkip Introduce virStreamHoleSize Introduce VIR_STREAM_RECV_STOP_AT_HOLE flag Introduce virStreamSparseRecvAll Introduce virStreamSparseSendAll Introduce virStreamInData virNetClientStreamNew: Track origin stream Track if stream is skippable RPC: Introduce virNetStreamSkip Introduce VIR_NET_STREAM_SKIP message type Teach wireshark plugin about VIR_NET_STREAM_SKIP daemon: Introduce virNetServerProgramSendStreamSkip virnetclientstream: Introduce virNetClientStreamSendSkip daemon: Implement VIR_NET_STREAM_SKIP handling virnetclientstream: Introduce virNetClientStreamHandleSkip remote_driver: Implement virStreamSkip virNetClientStreamRecvPacket: Introduce @flags argument Introduce virNetClientStreamHoleSize remote: Implement virStreamHoleSize virNetClientStream: Wire up VIR_NET_STREAM_SKIP remote_driver: Implement VIR_STREAM_RECV_STOP_AT_HOLE daemonStreamHandleRead: Wire up seekable stream fdstream: Implement seek gendispatch: Introduce @sparseflag for our calls Introduce virStorageVol{Download,Upload}Flags virsh: Implement sparse stream to vol-download virsh: Implement sparse stream to 
vol-upload daemon: Don't call virStreamInData so often fdstream: Suppress use of IO helper for sparse streams storage: Enable sparse streams for virStorageVol{Download,Upload} daemon/remote.c | 2 +- daemon/stream.c | 147 +++++++++++- daemon/stream.h | 3 +- include/libvirt/libvirt-storage.h | 9 + include/libvirt/libvirt-stream.h | 86 ++++++- src/driver-stream.h | 23 ++ src/esx/esx_stream.c | 16 +- src/fdstream.c | 117 ++++++++- src/libvirt-storage.c | 4 +- src/libvirt-stream.c | 454 +++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 3 + src/libvirt_private.syms | 1 + src/libvirt_public.syms | 10 + src/libvirt_remote.syms | 3 + src/remote/remote_driver.c | 89 ++++++- src/remote/remote_protocol.x | 2 + src/rpc/gendispatch.pl | 21 +- src/rpc/virnetclient.c | 1 + src/rpc/virnetclientstream.c | 203 +++++++++++++++- src/rpc/virnetclientstream.h | 17 +- src/rpc/virnetprotocol.x | 16 +- src/rpc/virnetserverprogram.c | 33 +++ src/rpc/virnetserverprogram.h | 7 + src/storage/storage_backend.c | 6 +- src/storage/storage_driver.c | 4 +- src/virnetprotocol-structs | 4 + tools/virsh-volume.c | 49 ++-- tools/virsh.c | 93 +++++++ tools/virsh.h | 17 ++ tools/virsh.pod | 6 +- tools/wireshark/src/packet-libvirt.c | 40 +++ tools/wireshark/src/packet-libvirt.h | 2 + 32 files changed, 1419 insertions(+), 69 deletions(-) -- 2.8.3

Although we already have virStreamRecv, just like some other older APIs it is missing @flags argument. This means, the function is not that flexible and therefore we need virStreamRecvFlags. The latter is going to be needed when we will want it to stop at a hole in stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 5 ++++ src/driver-stream.h | 7 +++++ src/libvirt-stream.c | 60 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 6 ++++ 4 files changed, 78 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 831640d..bee2516 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,11 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +int virStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 85b4e3b..d4b0480 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -36,6 +36,12 @@ typedef int size_t nbytes); typedef int +(*virDrvStreamRecvFlags)(virStreamPtr st, + char *data, + size_t nbytes, + unsigned int flags); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -61,6 +67,7 @@ typedef virStreamDriver *virStreamDriverPtr; struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; + virDrvStreamRecvFlags streamRecvFlags; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 8384b37..80b2d47 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -286,6 +286,66 @@ virStreamRecv(virStreamPtr stream, /** + * virStreamRecvFlags: + * @stream: pointer to the stream object + * @data: buffer to read into from 
stream + * @nbytes: size of @data buffer + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Reads a series of bytes from the stream. This method may + * block the calling application for an arbitrary amount + * of time. + * + * This is just like virStreamRecv except this one has extra + * @flags. In fact, calling virStreamRecvFlags(stream, data, + * nbytes, 0) is equivalent to calling virStreamRecv(stream, + * data, nbytes) and vice versa. + * + * Returns 0 when the end of the stream is reached, at + * which time the caller should invoke virStreamFinish() + * to get confirmation of stream completion. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if there is no data pending to be read & the + * stream is marked as non-blocking. + */ +int +virStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) +{ + VIR_DEBUG("stream=%p, data=%p, nbytes=%zi flags=%x", + stream, data, nbytes, flags); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(data, error); + + if (stream->driver && + stream->driver->streamRecvFlags) { + int ret; + ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); + if (ret == -2) + return -2; + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 1e920d6..4cb0ce6 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -732,4 +732,10 @@ LIBVIRT_1.3.3 { virDomainSetPerfEvents; } LIBVIRT_1.2.19; +LIBVIRT_1.3.5 { + global: + virStreamRecvFlags; +} LIBVIRT_1.3.3; + + # .... 
define new API here using predicted next version number .... -- 2.8.3

We currently have three virStreamDriver-s in our tree: virFDStream, the remote driver and the ESX driver. For now, support in the remote and ESX drivers is sufficient, because the virFDStream implementation is going to be supplied later as it needs to be slightly different. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/esx/esx_stream.c | 16 +++++++++++++++- src/remote/remote_driver.c | 21 +++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c index fb9abbc..b820b38 100644 --- a/src/esx/esx_stream.c +++ b/src/esx/esx_stream.c @@ -252,12 +252,17 @@ esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes) } static int -esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) +esxStreamRecvFlags(virStreamPtr stream, + char *data, + size_t nbytes, + unsigned int flags) { int result = -1; esxStreamPrivate *priv = stream->privateData; int status; + virCheckFlags(0, -1); + if (nbytes == 0) return 0; @@ -317,6 +322,14 @@ esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes) return result; } +static int +esxStreamRecv(virStreamPtr stream, + char *data, + size_t nbytes) +{ + return esxStreamRecvFlags(stream, data, nbytes, 0); +} + static void esxFreeStreamPrivate(esxStreamPrivate **priv) { @@ -369,6 +382,7 @@ esxStreamAbort(virStreamPtr stream) virStreamDriver esxStreamDriver = { .streamSend = esxStreamSend, .streamRecv = esxStreamRecv, + .streamRecvFlags = esxStreamRecvFlags, /* FIXME: streamAddCallback missing */ /* FIXME: streamUpdateCallback missing */ /* FIXME: streamRemoveCallback missing */ diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index e3cf5fb..883de13 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5085,15 +5085,19 @@ remoteStreamSend(virStreamPtr st, static int -remoteStreamRecv(virStreamPtr st, - char *data, - size_t nbytes) +remoteStreamRecvFlags(virStreamPtr st, + char *data, + size_t nbytes, +
unsigned int flags) { - VIR_DEBUG("st=%p data=%p nbytes=%zu", st, data, nbytes); + VIR_DEBUG("st=%p data=%p nbytes=%zu flags=%x", + st, data, nbytes, flags); struct private_data *priv = st->conn->privateData; virNetClientStreamPtr privst = st->privateData; int rv; + virCheckFlags(0, -1); + if (virNetClientStreamRaiseError(privst)) return -1; @@ -5115,6 +5119,14 @@ remoteStreamRecv(virStreamPtr st, return rv; } +static int +remoteStreamRecv(virStreamPtr st, + char *data, + size_t nbytes) +{ + return remoteStreamRecv(st, data, nbytes); +} + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5287,6 +5299,7 @@ remoteStreamAbort(virStreamPtr st) static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, + .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, -- 2.8.3

This API can be used to tell the other side of the stream to skip some bytes in the stream. This can be used to create a sparse file on the receiving side of a stream. It takes just one argument @length, which says how big the hole is. Since our streams are not rewindable like regular files, we don't need @whence argument like seek(2) has. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 3 +++ src/driver-stream.h | 5 ++++ src/libvirt-stream.c | 57 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 66 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index bee2516..4e0a599 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -50,6 +50,9 @@ int virStreamRecvFlags(virStreamPtr st, size_t nbytes, unsigned int flags); +int virStreamSkip(virStreamPtr st, + unsigned long long length); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index d4b0480..20ea13f 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -42,6 +42,10 @@ typedef int unsigned int flags); typedef int +(*virDrvStreamSkip)(virStreamPtr st, + unsigned long long length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -68,6 +72,7 @@ struct _virStreamDriver { virDrvStreamSend streamSend; virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; + virDrvStreamSkip streamSkip; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 80b2d47..55f3ef5 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -346,6 +346,63 @@ virStreamRecvFlags(virStreamPtr stream, /** + * virStreamSkip: + * @stream: pointer to the stream object + * @length: number of bytes to 
skip + * + * Skip @length bytes in the stream. This is useful when there's + * no actual data in the stream, just a hole. If that's the case, + * this API can be used to skip the hole properly instead of + * transmitting zeroes to the other side. + * + * An example using this with a hypothetical file upload API + * looks like: + * + * virStream st; + * + * while (1) { + * char buf[4096]; + * size_t len; + * if (..in hole...) { + * ..get hole size... + * virStreamSkip(st, len); + * } else { + * ...read len bytes... + * virStreamSend(st, buf, len); + * } + * } + * + * Returns 0 on success, + * -1 error + */ +int +virStreamSkip(virStreamPtr stream, + unsigned long long length) +{ + VIR_DEBUG("stream=%p, length=%llu", stream, length); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + + if (stream->driver && + stream->driver->streamSkip) { + int ret; + ret = (stream->driver->streamSkip)(stream, length); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 4cb0ce6..d781a5b 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -735,6 +735,7 @@ LIBVIRT_1.3.3 { LIBVIRT_1.3.5 { global: virStreamRecvFlags; + virStreamSkip; } LIBVIRT_1.3.3; -- 2.8.3

This function is basically a counterpart to virStreamSkip. If one side of a stream has called virStreamSkip(), the other side should call virStreamHoleSize() to get the size of the hole. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 3 +++ src/driver-stream.h | 5 +++++ src/libvirt-stream.c | 42 ++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 51 insertions(+) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 4e0a599..2ebda74 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -53,6 +53,9 @@ int virStreamRecvFlags(virStreamPtr st, int virStreamSkip(virStreamPtr st, unsigned long long length); +int virStreamHoleSize(virStreamPtr, + unsigned long long *length); + /** * virStreamSourceFunc: diff --git a/src/driver-stream.h b/src/driver-stream.h index 20ea13f..e196b6d 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -46,6 +46,10 @@ typedef int unsigned long long length); typedef int +(*virDrvStreamHoleSize)(virStreamPtr st, + unsigned long long *length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -73,6 +77,7 @@ struct _virStreamDriver { virDrvStreamRecv streamRecv; virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSkip streamSkip; + virDrvStreamHoleSize streamHoleSize; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 55f3ef5..3ac9e0d 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -403,6 +403,48 @@ virStreamSkip(virStreamPtr stream, /** + * virStreamHoleSize: + * @stream: pointer to the stream object + * @length: return location for the size of the hole + * + * This function is a counterpart to virStreamSkip().
That is, if + * one side of a stream has called virStreamSkip() the other side + * of the stream should call virStreamHoleSize() to retrieve the + * size of hole. If there's currently no hole in the stream, -1 + * is returned. + * + * Returns 0 on success, + * -1 on error + */ +int +virStreamHoleSize(virStreamPtr stream, + unsigned long long *length) +{ + VIR_DEBUG("stream=%p, length=%p", stream, length); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgReturn(length, -1); + + if (stream->driver && + stream->driver->streamHoleSize) { + int ret; + ret = (stream->driver->streamHoleSize)(stream, length); + if (ret < 0) + goto error; + return ret; + } + + virReportUnsupportedError(); + + error: + virDispatchError(stream->conn); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index d781a5b..6d37ddb 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -734,6 +734,7 @@ LIBVIRT_1.3.3 { LIBVIRT_1.3.5 { global: + virStreamHoleSize; virStreamRecvFlags; virStreamSkip; } LIBVIRT_1.3.3; -- 2.8.3

This flag is for the virStreamRecvFlags API. Its purpose is to stop reading from the stream when a hole is encountered, as holes are to be treated separately. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 4 ++++ src/libvirt-stream.c | 30 +++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 2ebda74..23fcc26 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -45,6 +45,10 @@ int virStreamRecv(virStreamPtr st, char *data, size_t nbytes); +typedef enum { + VIR_STREAM_RECV_STOP_AT_HOLE = (1 << 0), +} virStreamRecvFlagsValues; + int virStreamRecvFlags(virStreamPtr st, char *data, size_t nbytes, diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 3ac9e0d..1162d33 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -290,7 +290,7 @@ virStreamRecv(virStreamPtr stream, * @stream: pointer to the stream object * @data: buffer to read into from stream * @nbytes: size of @data buffer - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStreamRecvFlagsValues * * Reads a series of bytes from the stream. This method may * block the calling application for an arbitrary amount @@ -301,6 +301,29 @@ virStreamRecv(virStreamPtr stream, * nbytes, 0) is equivalent to calling virStreamRecv(stream, * data, nbytes) and vice versa. * + * If flag VIR_STREAM_RECV_STOP_AT_HOLE is set, this function + * will stop reading from stream if it has reached a hole. In + * that case, -3 is returned and virStreamHoleSize() should be + * called to get the hole size.
An example using this flag might + * look like this: + * + * while (1) { + * char buf[4096]; + * + * int ret = virStreamRecvFlags(st, buf, sizeof(buf), VIR_STREAM_RECV_STOP_AT_HOLE); + * if (ret < 0) { + * if (ret == -3) { + * unsigned long long len; + * ret = virStreamHoleSize(st, &len); + * ...seek len bytes in target... + * } else { + * return -1; + * } + * } else { + * ...write ret bytes of buf to target... + * } + * } + * * Returns 0 when the end of the stream is reached, at * which time the caller should invoke virStreamFinish() * to get confirmation of stream completion. @@ -311,6 +334,9 @@ virStreamRecv(virStreamPtr stream, * * Returns -2 if there is no data pending to be read & the * stream is marked as non-blocking. + * + * Returns -3 if there is a hole in the stream and the caller requested + * to stop at a hole. */ int virStreamRecvFlags(virStreamPtr stream, @@ -332,6 +358,8 @@ virStreamRecvFlags(virStreamPtr stream, ret = (stream->driver->streamRecvFlags)(stream, data, nbytes, flags); if (ret == -2) return -2; + if (ret == -3) + return -3; if (ret < 0) goto error; return ret; -- 2.8.3

This is just a wrapper over new functions that have been just introduced: virStreamRecvFlags(), virStreamHoleSize(). It's very similar to virStreamRecvAll() except it handles sparse streams well. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 28 ++++++++- src/libvirt-stream.c | 120 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 146 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index 23fcc26..e5f5126 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -102,9 +102,9 @@ int virStreamSendAll(virStreamPtr st, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSinkFunc callback is used together - * with the virStreamRecvAll function for libvirt to - * provide the data that has been received. + * The virStreamSinkFunc callback is used together with the + * virStreamRecvAll or virStreamSparseRecvAll functions for + * libvirt to provide the data that has been received. * * The callback will be invoked multiple times, * providing data in small chunks. The application @@ -127,6 +127,28 @@ int virStreamRecvAll(virStreamPtr st, virStreamSinkFunc handler, void *opaque); +/** + * virStreamSinkHoleFunc: + * @st: the stream object + * @length: stream hole size + * @opaque: optional application provided data + * + * This callback is used together with the virStreamSparseRecvAll + * function for libvirt to provide the size of a hole that + * occurred in the stream. 
+ * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, + unsigned long long length, + void *opaque); + +int virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque); + typedef enum { VIR_STREAM_EVENT_READABLE = (1 << 0), VIR_STREAM_EVENT_WRITABLE = (1 << 1), diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 1162d33..2e3e319 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -660,6 +660,126 @@ virStreamRecvAll(virStreamPtr stream, /** + * virStreamSparseRecvAll: + * @stream: pointer to the stream object + * @handler: sink callback for writing data to application + * @holeHandler: stream hole callback for skipping holes + * @opaque: application defined data + * + * Receive the entire data stream, sending the data to the + * requested data sink. This is simply a convenient alternative + * to virStreamRecv, for apps that do blocking-I/O. + * + * An example using this with a hypothetical file download + * API looks like: + * + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return write(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, unsigned long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY); + * + * virConnectDownloadSparseFile(conn, st); + * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) { + * ...report an error ... + * goto done; + * } + * if (virStreamFinish(st) < 0) + * ...report an error... + * virStreamFree(st); + * close(fd); + * + * Note that @opaque data is shared between both @handler and + * @holeHandler callbacks. + * + * Returns 0 if all the data was successfully received. 
The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call + * virStreamFree() + */ +int +virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE; + int ret = -1; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sinks cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int got, offset = 0; + unsigned long long holeLen; + got = virStreamRecvFlags(stream, bytes, want, flags); + if (got == -3) { + if (virStreamHoleSize(stream, &holeLen) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (holeHandler(stream, holeLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + } else { + if (got < 0) + goto cleanup; + } + if (got == 0) + break; + while (offset < got) { + int done; + done = (handler)(stream, bytes + offset, got - offset, opaque); + if (done < 0) { + virStreamAbort(stream); + goto cleanup; + } + offset += done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} + +/** * virStreamEventAddCallback: * @stream: pointer to the stream object * @events: set of events to monitor diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 6d37ddb..2e7a9a7 100644 --- a/src/libvirt_public.syms +++ 
b/src/libvirt_public.syms @@ -737,6 +737,7 @@ LIBVIRT_1.3.5 { virStreamHoleSize; virStreamRecvFlags; virStreamSkip; + virStreamSparseRecvAll; } LIBVIRT_1.3.3; -- 2.8.3

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-stream.h | 43 ++++++++++++++-- src/libvirt-stream.c | 104 +++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index e5f5126..9aa728e 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -69,9 +69,9 @@ int virStreamHoleSize(virStreamPtr, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSourceFunc callback is used together - * with the virStreamSendAll function for libvirt to - * obtain the data that is to be sent. + * The virStreamSourceFunc callback is used together with + * the virStreamSendAll and virStreamSparseSendAll functions + * for libvirt to obtain the data that is to be sent. * * The callback will be invoked multiple times, * fetching data in small chunks. The application @@ -95,6 +95,43 @@ int virStreamSendAll(virStreamPtr st, void *opaque); /** + * virStreamSourceHoleFunc: + * @st: the stream object + * @inData: are we in data section + * @length: how long is the section we are currently in + * @opaque: optional application provided data + * + * The virStreamSourceHoleFunc callback is used together + * with the virStreamSparseSendAll function for libvirt to + * obtain the length of the section the stream is currently in. + * + * Moreover, upon successful return, @length should be + * updated with how many bytes are left until the current + * section ends (be it a data section or a hole section) and if + * the stream is currently in a data section, @inData should + * be set to a non-zero value and vice versa. + * As a corner case, there's an implicit hole at the end of + * each file. If that's the case, @inData should be set to 0 + * as well as @length. + * Furthermore, this function should always leave the stream in + * a data section.
Either the one it was in prior + * to calling this function, or the one that follows the hole + * it is in. + * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSourceHoleFunc)(virStreamPtr st, + int *inData, + unsigned long long *length, + void *opaque); + +int virStreamSparseSendAll(virStreamPtr st, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + void *opaque); + +/** * virStreamSinkFunc: * * @st: the stream object diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 2e3e319..707c0ed 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -565,7 +565,111 @@ virStreamSendAll(virStreamPtr stream, } + /** + * virStreamSparseSendAll: + * @stream: pointer to the stream object + * @handler: source callback for reading data from application + * @holeHandler: source callback for determining holes + * @opaque: application defined data + * + * Send the entire stream, skipping holes reported by @holeHandler. + * + * The @opaque data is shared between @handler and @holeHandler. + * + * Returns 0 if all the data was successfully sent.
The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call + * virStreamFree() + */ +int virStreamSparseSendAll(virStreamPtr stream, + virStreamSourceFunc handler, + virStreamSourceHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + int ret = -1; + unsigned long long dataLen = 0; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sources cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int inData, got, offset = 0; + unsigned long long sectionLen; + + if (!dataLen) { + if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (!inData) { + if (sectionLen && virStreamSkip(stream, sectionLen) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (sectionLen) + continue; + + /* Here inData == 0 and sectionLen == 0 as well. + * This means we are in the trailing hole. Don't + * start a new iteration but let virStreamSend() + * close the stream gracefully.
*/ + } else { + dataLen = sectionLen; + } + } + + if (want > dataLen) + want = dataLen; + + got = (handler)(stream, bytes, want, opaque); + if (got < 0) { + virStreamAbort(stream); + goto cleanup; + } + if (got == 0) + break; + while (offset < got) { + int done; + done = virStreamSend(stream, bytes + offset, got - offset); + if (done < 0) + goto cleanup; + offset += done; + dataLen -= done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +}/** * virStreamRecvAll: * @stream: pointer to the stream object * @handler: sink callback for writing data to application diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 2e7a9a7..575bb7f 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -738,6 +738,7 @@ LIBVIRT_1.3.5 { virStreamRecvFlags; virStreamSkip; virStreamSparseRecvAll; + virStreamSparseSendAll; } LIBVIRT_1.3.3; -- 2.8.3

This is just an internal API that calls the corresponding function in the stream driver. This function will set @data=1 if the underlying file is in a data section, or @data=0 if it is in a hole. At any rate, @length is set to the number of bytes remaining in the section the file is currently in. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/driver-stream.h | 6 ++++++ src/libvirt-stream.c | 43 +++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 3 +++ src/libvirt_private.syms | 1 + 4 files changed, 53 insertions(+) diff --git a/src/driver-stream.h b/src/driver-stream.h index e196b6d..5d84b9a 100644 --- a/src/driver-stream.h +++ b/src/driver-stream.h @@ -50,6 +50,11 @@ typedef int unsigned long long *length); typedef int +(*virDrvStreamInData)(virStreamPtr st, + int *data, + unsigned long long *length); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, int events, virStreamEventCallback cb, @@ -78,6 +83,7 @@ struct _virStreamDriver { virDrvStreamRecvFlags streamRecvFlags; virDrvStreamSkip streamSkip; virDrvStreamHoleSize streamHoleSize; + virDrvStreamInData streamInData; virDrvStreamEventAddCallback streamEventAddCallback; virDrvStreamEventUpdateCallback streamEventUpdateCallback; virDrvStreamEventRemoveCallback streamEventRemoveCallback; diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index 707c0ed..2632d55 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -473,6 +473,49 @@ virStreamHoleSize(virStreamPtr stream, /** + * virStreamInData: + * @stream: stream + * @data: are we in data or hole + * @length: length to next section + * + * This function checks the underlying stream (typically a file) + * to learn whether the current stream position lies within a + * data section or a hole. Upon return @data is set to a nonzero + * value if the former is the case, or to zero if @stream is in a + * hole.
Moreover, @length is updated to tell caller how many + * bytes can be read from @stream until current section changes + * (from data to a hole or vice versa). + * + * As a special case, there's an implicit hole at EOF. In this + * situation this function should set @data = false, @length = 0 + * and return 0. + * + * Returns 0 on success, + * -1 otherwise + */ +int +virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *length) +{ + VIR_DEBUG("stream=%p, data=%p, length=%p", stream, data, length); + + /* No checks of arguments or error resetting. This is an + * internal function that just happen to live next to some + * public functions of ours. */ + + if (stream->driver->streamInData) { + int ret; + ret = (stream->driver->streamInData)(stream, data, length); + return ret; + } + + virReportUnsupportedError(); + return -1; +} + + +/** * virStreamSendAll: * @stream: pointer to the stream object * @handler: source callback for reading data from application diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 96439d8..0e945aa 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -294,4 +294,7 @@ virTypedParameterValidateSet(virConnectPtr conn, virTypedParameterPtr params, int nparams); +int virStreamInData(virStreamPtr stream, + int *data, + unsigned long long *lengtht); #endif diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index fb5b419..48d4258 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1017,6 +1017,7 @@ virStateCleanup; virStateInitialize; virStateReload; virStateStop; +virStreamInData; # locking/domain_lock.h -- 2.8.3

This has no real added value right now, but is going to be very helpful later. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 2 +- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 883de13..82083c3 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5614,7 +5614,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, memset(&args, 0, sizeof(args)); memset(&ret, 0, sizeof(ret)); - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, priv->counter))) goto done; @@ -6538,7 +6539,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, goto cleanup; } - if (!(netst = virNetClientStreamNew(priv->remoteProgram, + if (!(netst = virNetClientStreamNew(st, + priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, priv->counter))) goto cleanup; diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 5564b2e..a64920b 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1701,7 +1701,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 34989a9..8920395 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -36,6 +36,8 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { virObjectLockable parent; + 
virStreamPtr stream; /* Reverse pointer to parent stream */ + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,7 +135,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) } -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial) { @@ -145,6 +148,7 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL; + st->stream = stream; st->prog = prog; st->proc = proc; st->serial = serial; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a0d2be9..e278dab 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -32,7 +32,8 @@ typedef virNetClientStream *virNetClientStreamPtr; typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); -virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, + virNetClientProgramPtr prog, int proc, unsigned serial); -- 2.8.3

Even though there's no way to make stream skippable right now, it is going to be soon. We need to track this info so that we don't send virStreamSkip to a client that did not want it or vice versa. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/remote.c | 2 +- daemon/stream.c | 6 +++++- daemon/stream.h | 3 ++- src/remote/remote_driver.c | 6 ++++-- src/rpc/gendispatch.pl | 4 ++-- src/rpc/virnetclientstream.c | 6 +++++- src/rpc/virnetclientstream.h | 3 ++- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/daemon/remote.c b/daemon/remote.c index b2a420b..5e92e37 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -4957,7 +4957,7 @@ remoteDispatchDomainMigratePrepareTunnel3Params(virNetServerPtr server ATTRIBUTE if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) || !(stream = daemonCreateClientStream(client, st, remoteProgram, - &msg->header))) + &msg->header, false))) goto cleanup; if (virDomainMigratePrepareTunnel3Params(priv->conn, st, params, nparams, diff --git a/daemon/stream.c b/daemon/stream.c index bd0b5d2..e07a245 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -52,6 +52,8 @@ struct daemonClientStream { virNetMessagePtr rx; bool tx; + bool skippable; + daemonClientStreamPtr next; }; @@ -321,7 +323,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr header) + virNetMessageHeaderPtr header, + bool skippable) { daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -339,6 +342,7 @@ daemonCreateClientStream(virNetServerClientPtr client, stream->serial = header->serial; stream->filterID = -1; stream->st = st; + stream->skippable = skippable; return stream; } diff --git a/daemon/stream.h b/daemon/stream.h index cf76e71..bf5dc24 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -30,7 +30,8 @@ daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, 
virStreamPtr st, virNetServerProgramPtr prog, - virNetMessageHeaderPtr hdr); + virNetMessageHeaderPtr hdr, + bool seekable); int daemonFreeClientStream(virNetServerClientPtr client, daemonClientStream *stream); diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 82083c3..8141c2a 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5617,7 +5617,8 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, - priv->counter))) + priv->counter, + false))) goto done; if (virNetClientAddStream(priv->client, netst) < 0) { @@ -6542,7 +6543,8 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, - priv->counter))) + priv->counter, + false))) goto cleanup; if (virNetClientAddStream(priv->client, netst) < 0) { diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index a64920b..1003338 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1005,7 +1005,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header)))\n"; + print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1701,7 +1701,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; diff --git 
a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 8920395..fb83693 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -54,6 +54,8 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; + bool skippable; /* User requested skippable stream */ + virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -138,7 +140,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial) + unsigned serial, + bool skippable) { virNetClientStreamPtr st; @@ -152,6 +155,7 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, st->prog = prog; st->proc = proc; st->serial = serial; + st->skippable = skippable; virObjectRef(prog); diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index e278dab..0a5aafd 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -35,7 +35,8 @@ typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, virNetClientProgramPtr prog, int proc, - unsigned serial); + unsigned serial, + bool seekable); bool virNetClientStreamRaiseError(virNetClientStreamPtr st); -- 2.8.3

This is going to be RPC representation for virStreamSkip. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetprotocol.x | 4 ++++ src/virnetprotocol-structs | 3 +++ 2 files changed, 7 insertions(+) diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index 9ce33b0..3623588 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -236,3 +236,7 @@ struct virNetMessageError { int int2; virNetMessageNetwork net; /* unused */ }; + +struct virNetStreamSkip { + unsigned hyper length; +}; diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index af4526c..a8cc603 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -42,3 +42,6 @@ struct virNetMessageError { int int2; virNetMessageNetwork net; }; +struct virNetStreamSkip { + uint64_t length; +}; -- 2.8.3

This is a special type of stream packet that is bidirectional and carries information on how many bytes both sides are skipping in the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 3 ++- src/rpc/virnetclient.c | 1 + src/rpc/virnetprotocol.x | 12 +++++++++++- src/virnetprotocol-structs | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index e07a245..82a99e4 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -287,7 +287,8 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED, virMutexLock(&stream->priv->lock); - if (msg->header.type != VIR_NET_STREAM) + if (msg->header.type != VIR_NET_STREAM && + msg->header.type != VIR_NET_STREAM_SKIP) goto cleanup; if (!virNetServerProgramMatches(stream->prog, msg)) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 3d59990..9c87ce0 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1157,6 +1157,7 @@ virNetClientCallDispatch(virNetClientPtr client) return virNetClientCallDispatchMessage(client); case VIR_NET_STREAM: /* Stream protocol */ + case VIR_NET_STREAM_SKIP: /* Stream skip protocol */ return virNetClientCallDispatchStream(client); default: diff --git a/src/rpc/virnetprotocol.x b/src/rpc/virnetprotocol.x index 3623588..3530694 100644 --- a/src/rpc/virnetprotocol.x +++ b/src/rpc/virnetprotocol.x @@ -143,6 +143,14 @@ const VIR_NET_MESSAGE_NUM_FDS_MAX = 32; * * status == VIR_NET_ERROR * remote_error Error information * + * - type == VIR_NET_STREAM_SKIP + * * status == VIR_NET_CONTINUE + * virNetStreamSkip skip data + * * status == VIR_NET_ERROR + * remote_error error information + * * status == VIR_NET_OK + * <empty> + * */ enum virNetMessageType { /* client -> server. args from a method call */ @@ -156,7 +164,9 @@ enum virNetMessageType { /* client -> server. args from a method call, with passed FDs */ VIR_NET_CALL_WITH_FDS = 4, /* server -> client.
reply/error from a method call, with passed FDs */ - VIR_NET_REPLY_WITH_FDS = 5 + VIR_NET_REPLY_WITH_FDS = 5, + /* either direction, stream skip data packet */ + VIR_NET_STREAM_SKIP = 6 }; enum virNetMessageStatus { diff --git a/src/virnetprotocol-structs b/src/virnetprotocol-structs index a8cc603..c31683f 100644 --- a/src/virnetprotocol-structs +++ b/src/virnetprotocol-structs @@ -6,6 +6,7 @@ enum virNetMessageType { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum virNetMessageStatus { VIR_NET_OK = 0, -- 2.8.3

Ideally, this would be generated, but to achieve that corresponding XDR definitions needed to go into a different .x file. But they belong just to the one that they are right now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/wireshark/src/packet-libvirt.c | 40 ++++++++++++++++++++++++++++++++++++ tools/wireshark/src/packet-libvirt.h | 2 ++ 2 files changed, 42 insertions(+) diff --git a/tools/wireshark/src/packet-libvirt.c b/tools/wireshark/src/packet-libvirt.c index aa1c323..aa49559 100644 --- a/tools/wireshark/src/packet-libvirt.c +++ b/tools/wireshark/src/packet-libvirt.c @@ -52,8 +52,11 @@ static int hf_libvirt_serial = -1; static int hf_libvirt_status = -1; static int hf_libvirt_stream = -1; static int hf_libvirt_num_of_fds = -1; +static int hf_libvirt_stream_skip_length = -1; +static int hf_libvirt_stream_skip = -1; int hf_libvirt_unknown = -1; static gint ett_libvirt = -1; +static gint ett_libvirt_stream_skip = -1; #define XDR_PRIMITIVE_DISSECTOR(xtype, ctype, ftype) \ static gboolean \ @@ -328,6 +331,28 @@ dissect_libvirt_payload_xdr_data(tvbuff_t *tvb, proto_tree *tree, gint payload_l dissect_libvirt_fds(tvb, start + payload_length, nfds); } +static gboolean +dissect_xdr_stream_skip(tvbuff_t *tvb, proto_tree *tree, XDR *xdrs, int hf) +{ + goffset start = VIR_HEADER_LEN; + proto_item *ti; + + ti = proto_tree_add_item(tree, hf_libvirt_stream_skip, tvb, start, -1, ENC_NA); + tree = proto_item_add_subtree(ti, ett_libvirt_stream_skip); + + hf = hf_libvirt_stream_skip_length; + if (!dissect_xdr_u_hyper(tvb, tree, xdrs, hf)) return FALSE; + proto_item_set_len(ti, xdr_getpos(xdrs) - start); + return TRUE; +} + + +static void +dissect_libvirt_stream_skip(tvbuff_t *tvb, proto_tree *tree, gint payload_length, guint32 status) +{ + proto_tree_add_item(tree, hf_libvirt_stream_skip_length, tvb, VIR_HEADER_LEN, -1, ENC_NA); +} + static void dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, guint32 prog, guint32 proc, guint32 type, guint32 status) 
@@ -348,6 +373,8 @@ dissect_libvirt_payload(tvbuff_t *tvb, proto_tree *tree, dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, VIR_ERROR_MESSAGE_DISSECTOR); } else if (type == VIR_NET_STREAM) { /* implicitly, status == VIR_NET_CONTINUE */ dissect_libvirt_stream(tvb, tree, payload_length); + } else if (type == VIR_NET_STREAM_SKIP) { + dissect_libvirt_payload_xdr_data(tvb, tree, payload_length, status, dissect_xdr_stream_skip); } else { goto unknown; } @@ -517,6 +544,18 @@ proto_register_libvirt(void) NULL, 0x0, NULL, HFILL} }, + { &hf_libvirt_stream_skip, + { "stream_skip", "libvirt.stream_skip", + FT_BYTES, BASE_NONE, + NULL, 0x0, + NULL, HFILL} + }, + { &hf_libvirt_stream_skip_length, + { "length", "libvirt.stream_skip.length", + FT_UINT64, BASE_DEC, + NULL, 0x0, + NULL, HFILL} + }, { &hf_libvirt_unknown, { "unknown", "libvirt.unknown", FT_BYTES, BASE_NONE, @@ -527,6 +566,7 @@ proto_register_libvirt(void) static gint *ett[] = { VIR_DYNAMIC_ETTSET + &ett_libvirt_stream_skip, &ett_libvirt }; diff --git a/tools/wireshark/src/packet-libvirt.h b/tools/wireshark/src/packet-libvirt.h index 5f99fdf..006aa6d 100644 --- a/tools/wireshark/src/packet-libvirt.h +++ b/tools/wireshark/src/packet-libvirt.h @@ -53,6 +53,7 @@ enum vir_net_message_type { VIR_NET_STREAM = 3, VIR_NET_CALL_WITH_FDS = 4, VIR_NET_REPLY_WITH_FDS = 5, + VIR_NET_STREAM_SKIP = 6, }; enum vir_net_message_status { @@ -76,6 +77,7 @@ static const value_string type_strings[] = { { VIR_NET_STREAM, "STREAM" }, { VIR_NET_CALL_WITH_FDS, "CALL_WITH_FDS" }, { VIR_NET_REPLY_WITH_FDS, "REPLY_WITH_FDS" }, + { VIR_NET_STREAM_SKIP, "STREAM_SKIP" }, { -1, NULL } }; -- 2.8.3

This is just a helper function that takes in a length value, encodes it into XDR and sends to client. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetserverprogram.c | 33 +++++++++++++++++++++++++++++++++ src/rpc/virnetserverprogram.h | 7 +++++++ 3 files changed, 41 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 1a88fff..b97b9b1 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -177,6 +177,7 @@ virNetServerProgramNew; virNetServerProgramSendReplyError; virNetServerProgramSendStreamData; virNetServerProgramSendStreamError; +virNetServerProgramSendStreamSkip; virNetServerProgramUnknownError; diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c index d1597f4..6d84056 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -548,6 +548,39 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, } +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length) +{ + virNetStreamSkip data; + + VIR_DEBUG("client=%p msg=%p length=%llu", client, msg, length); + + memset(&data, 0, sizeof(data)); + data.length = length; + + msg->header.prog = prog->program; + msg->header.vers = prog->version; + msg->header.proc = procedure; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = serial; + msg->header.status = VIR_NET_CONTINUE; + + if (virNetMessageEncodeHeader(msg) < 0) + return -1; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + return virNetServerClientSendMessage(client, msg); +} + + void virNetServerProgramDispose(void *obj ATTRIBUTE_UNUSED) { } diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 531fca0..eba2168 100644 --- a/src/rpc/virnetserverprogram.h +++ 
b/src/rpc/virnetserverprogram.h @@ -104,4 +104,11 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, const char *data, size_t len); +int virNetServerProgramSendStreamSkip(virNetServerProgramPtr prog, + virNetServerClientPtr client, + virNetMessagePtr msg, + int procedure, + unsigned int serial, + unsigned long long length); + #endif /* __VIR_NET_SERVER_PROGRAM_H__ */ -- 2.8.3

While the previous commit implemented a helper for sending a STREAM_SKIP packet for daemon, this is a client's counterpart. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 57 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index b97b9b1..16fde79 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,7 @@ virNetClientStreamQueuePacket; virNetClientStreamRaiseError; virNetClientStreamRecvPacket; virNetClientStreamSendPacket; +virNetClientStreamSendSkip; virNetClientStreamSetError; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index fb83693..d7d2b0e 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -431,6 +431,58 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } +int +virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length) +{ + virNetMessagePtr msg = NULL; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("st=%p length=%llu", st, length); + + if (!st->skippable) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("Skipping is not supported with this stream")); + return -1; + } + + memset(&data, 0, sizeof(data)); + data.length = length; + + if (!(msg = virNetMessageNew(false))) + return -1; + + virObjectLock(st); + + msg->header.prog = virNetClientProgramGetProgram(st->prog); + msg->header.vers = virNetClientProgramGetVersion(st->prog); + msg->header.status = VIR_NET_CONTINUE; + msg->header.type = VIR_NET_STREAM_SKIP; + msg->header.serial = st->serial; + msg->header.proc = st->proc; + + virObjectUnlock(st); + + if (virNetMessageEncodeHeader(msg) < 0) + goto cleanup; + + if (virNetMessageEncodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + goto cleanup; + + if (virNetClientSendNoReply(client, msg) < 
0) + goto cleanup; + + ret = 0; + cleanup: + virNetMessageFree(msg); + return ret; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 0a5aafd..a648b7c 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -61,6 +61,10 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock); +int virNetClientStreamSendSkip(virNetClientStreamPtr st, + virNetClientPtr client, + unsigned long long length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.8.3

Whenever a packet of the new type (VIR_NET_STREAM_SKIP) arrives at the daemon, call this function, which decodes it and calls virStreamSkip(). Otherwise a regular data stream packet has arrived, so continue processing it as before. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 11 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 82a99e4..f9c0ba1 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -29,6 +29,7 @@ #include "virlog.h" #include "virnetserverclient.h" #include "virerror.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_STREAMS @@ -653,6 +654,52 @@ daemonStreamHandleAbort(virNetServerClientPtr client, } +static int +daemonStreamHandleSkip(virNetServerClientPtr client, + daemonClientStream *stream, + virNetMessagePtr msg) +{ + int ret; + virNetStreamSkip data; + + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", + client, stream, msg->header.proc, msg->header.serial); + + /* Let's check if client plays nicely and advertised usage of + * sparse stream upfront.
*/ + if (!stream->skippable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream skip")); + return -1; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) + return -1; + + ret = virStreamSkip(stream->st, data.length); + + if (ret < 0) { + virNetMessageError rerr; + + memset(&rerr, 0, sizeof(rerr)); + + VIR_INFO("Stream skip failed"); + stream->closed = true; + virStreamEventRemoveCallback(stream->st); + virStreamAbort(stream->st); + + return virNetServerProgramSendReplyError(stream->prog, + client, + msg, + &rerr, + &msg->header); + } + + return 0; +} + /* * Called when the stream is signalled has being able to accept @@ -671,19 +718,31 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetMessagePtr msg = stream->rx; int ret; - switch (msg->header.status) { - case VIR_NET_OK: - ret = daemonStreamHandleFinish(client, stream, msg); - break; + if (msg->header.type == VIR_NET_STREAM_SKIP) { + /* Handle special case when client sent us skip. + * Otherwise just carry on with processing stream + * data. */ + ret = daemonStreamHandleSkip(client, stream, msg); + } else if (msg->header.type == VIR_NET_STREAM) { + switch (msg->header.status) { + case VIR_NET_OK: + ret = daemonStreamHandleFinish(client, stream, msg); + break; - case VIR_NET_CONTINUE: - ret = daemonStreamHandleWriteData(client, stream, msg); - break; + case VIR_NET_CONTINUE: + ret = daemonStreamHandleWriteData(client, stream, msg); + break; - case VIR_NET_ERROR: - default: - ret = daemonStreamHandleAbort(client, stream, msg); - break; + case VIR_NET_ERROR: + default: + ret = daemonStreamHandleAbort(client, stream, msg); + break; + } + } else { + virReportError(VIR_ERR_RPC, + _("Unexpected message type: %d"), + msg->header.type); + ret = -1; } if (ret > 0) -- 2.8.3

This is a function that handles an incoming STREAM_SKIP packet. Even though it is not wired up yet, it will be soon. At the beginning it does a couple of checks to verify that the server plays nicely and sent us a STREAM_SKIP packet only after we've enabled sparse streams. Then it decodes the message payload to see how big the hole is and adds its size to the stream's @skipLength counter. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 63 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index d7d2b0e..0e982ba 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -28,6 +28,7 @@ #include "virerror.h" #include "virlog.h" #include "virthread.h" +#include "libvirt_internal.h" #define VIR_FROM_THIS VIR_FROM_RPC @@ -55,6 +56,7 @@ struct _virNetClientStream { bool incomingEOF; bool skippable; /* User requested skippable stream */ + unsigned long long skipLength; /* Size of incoming hole in stream. */ virNetClientStreamEventCallback cb; void *cbOpaque; @@ -358,6 +360,67 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, return -1; } + +static int ATTRIBUTE_UNUSED +virNetClientStreamHandleSkip(virNetClientPtr client, + virNetClientStreamPtr st) +{ + virNetMessagePtr msg; + virNetStreamSkip data; + int ret = -1; + + VIR_DEBUG("client=%p st=%p", client, st); + + msg = st->rx; + memset(&data, 0, sizeof(data)); + + /* We should not be called unless there's VIR_NET_STREAM_SKIP + * message at the head of the list.
But doesn't hurt to check */ + if (!msg) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("No message in the queue")); + goto cleanup; + } + + if (msg->header.type != VIR_NET_STREAM_SKIP) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Invalid message prog=%d type=%d serial=%u proc=%d"), + msg->header.prog, + msg->header.type, + msg->header.serial, + msg->header.proc); + goto cleanup; + } + + /* Server should not send us VIR_NET_STREAM_SKIP unless we + * have requested so. But does not hurt to check ... */ + if (!st->skippable) { + virReportError(VIR_ERR_RPC, "%s", + _("Unexpected stream skip")); + goto cleanup; + } + + if (virNetMessageDecodePayload(msg, + (xdrproc_t) xdr_virNetStreamSkip, + &data) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Malformed stream skip packet")); + goto cleanup; + } + + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + st->skipLength += data.length; + + ret = 0; + cleanup: + if (ret < 0) { + /* Abort stream? */ + } + return ret; +} + + int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, -- 2.8.3

Now that we have RPC wrappers over VIR_NET_STREAM_SKIP we can start wiring them up. This commit wires up situation when a client wants to send a hole to daemon. To keep stream offsets synchronous, upon successful call on the daemon skip the same hole in local part of the stream. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 8141c2a..f0e85ce 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5127,6 +5127,34 @@ remoteStreamRecv(virStreamPtr st, return remoteStreamRecv(st, data, nbytes); } + +static int +remoteStreamSkip(virStreamPtr st, + unsigned long long length) +{ + VIR_DEBUG("st=%p length=%llu", st, length); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamSendSkip(privst, + priv->client, + length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5301,6 +5329,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecv = remoteStreamRecv, .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, + .streamSkip = remoteStreamSkip, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.8.3

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 3 ++- src/rpc/virnetclientstream.c | 10 +++++++--- src/rpc/virnetclientstream.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index f0e85ce..824fe03 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5109,7 +5109,8 @@ remoteStreamRecvFlags(virStreamPtr st, priv->client, data, nbytes, - (st->flags & VIR_STREAM_NONBLOCK)); + (st->flags & VIR_STREAM_NONBLOCK), + flags); VIR_DEBUG("Done %d", rv); diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 0e982ba..c8c89ec 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -425,13 +425,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock) + bool nonblock, + unsigned int flags) { int rv = -1; size_t want; - VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", - st, client, data, nbytes, nonblock); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", + st, client, data, nbytes, nonblock, flags); + + virCheckFlags(0, -1); + virObjectLock(st); if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index a648b7c..2835066 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -59,7 +59,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientPtr client, char *data, size_t nbytes, - bool nonblock); + bool nonblock, + unsigned int flags); int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, -- 2.8.3

This function fetches the sum of previously processed stream holes: if @length is non-NULL, the sum is stored there and the counter is reset. It returns 0. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt_remote.syms | 1 + src/rpc/virnetclientstream.c | 15 +++++++++++++++ src/rpc/virnetclientstream.h | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 16fde79..d843fc6 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -48,6 +48,7 @@ virNetClientStreamEOF; virNetClientStreamEventAddCallback; virNetClientStreamEventRemoveCallback; virNetClientStreamEventUpdateCallback; +virNetClientStreamHoleSize; virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index c8c89ec..37ce257 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -550,6 +550,21 @@ virNetClientStreamSendSkip(virNetClientStreamPtr st, } +int virNetClientStreamHoleSize(virNetClientPtr client ATTRIBUTE_UNUSED, + virNetClientStreamPtr st, + unsigned long long *length) +{ + /* Consume the accumulated hole size under the stream lock. */ + virObjectLock(st); + if (length) { + *length = st->skipLength; + st->skipLength = 0; + } + virObjectUnlock(st); + return 0; +} + + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 2835066..9caa091 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -66,6 +66,10 @@ int virNetClientStreamSendSkip(virNetClientStreamPtr st, virNetClientPtr client, unsigned long long length); +int virNetClientStreamHoleSize(virNetClientPtr client, + virNetClientStreamPtr st, + unsigned long long *length); + int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, int events, virNetClientStreamEventCallback cb, -- 2.8.3

Implement the streamHoleSize callback of the remote stream driver on top of virNetClientStreamHoleSize(). Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 824fe03..5507713 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5156,6 +5156,31 @@ remoteStreamSkip(virStreamPtr st, } +static int +remoteStreamHoleSize(virStreamPtr st, + unsigned long long *length) +{ + VIR_DEBUG("st=%p length=%p", st, length); + struct private_data *priv = st->conn->privateData; + virNetClientStreamPtr privst = st->privateData; + int rv; + + if (virNetClientStreamRaiseError(privst)) + return -1; + + remoteDriverLock(priv); + priv->localUses++; + remoteDriverUnlock(priv); + + rv = virNetClientStreamHoleSize(priv->client, privst, length); + + remoteDriverLock(priv); + priv->localUses--; + remoteDriverUnlock(priv); + return rv; +} + + struct remoteStreamCallbackData { virStreamPtr st; virStreamEventCallback cb; @@ -5331,6 +5356,7 @@ static virStreamDriver remoteStreamDrv = { .streamRecvFlags = remoteStreamRecvFlags, .streamSend = remoteStreamSend, .streamSkip = remoteStreamSkip, + .streamHoleSize = remoteStreamHoleSize, .streamFinish = remoteStreamFinish, .streamAbort = remoteStreamAbort, .streamEventAddCallback = remoteStreamEventAddCallback, -- 2.8.3

Whenever the server sends a client stream packet (either a regular one with actual data or a stream skip one), it is queued on @st->rx, so the list is a mixture of both types of stream packets. Now that we have all the helpers needed, we can wire their processing up. But since virNetClientStreamRecvPacket doesn't support the VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn every received skip into as many zero bytes as the skip requests. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 37ce257..f63072e 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -298,6 +298,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virObjectLock(st); + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP + * here just yet. We want in order processing! */ virNetMessageQueuePush(&st->rx, tmp_msg); virNetClientStreamEventTimerUpdate(st); @@ -361,7 +363,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, } -static int ATTRIBUTE_UNUSED +static int virNetClientStreamHandleSkip(virNetClientPtr client, virNetClientStreamPtr st) { @@ -437,6 +439,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virCheckFlags(0, -1); virObjectLock(st); + + reread: if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -468,8 +472,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, } VIR_DEBUG("After IO rx=%p", st->rx); + + while (st->rx && + st->rx->header.type == VIR_NET_STREAM_SKIP) { + /* Handle skip sent to us by server.
*/ + + if (virNetClientStreamHandleSkip(client, st) < 0) + goto cleanup; + } + + if (!st->rx && !st->incomingEOF && !st->skipLength) { + if (nonblock) { + VIR_DEBUG("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + + /* We have consumed all packets from incoming queue but those + * were only skip packets, no data. Read the stream again. */ + goto reread; + } + want = nbytes; - while (want && st->rx) { + + if (st->skipLength) { + /* Pretend skipLength zeroes was read from stream. */ + size_t len = want; + + if (len > st->skipLength) + len = st->skipLength; + + memset(data, 0, len); + st->skipLength -= len; + want -= len; + } + + while (want && + st->rx && + st->rx->header.type == VIR_NET_STREAM) { virNetMessagePtr msg = st->rx; size_t len = want; -- 2.8.3

Let remoteStreamRecvFlags() and virNetClientStreamRecvPacket() accept VIR_STREAM_RECV_STOP_AT_HOLE. When the flag is set and the stream is currently in a hole, virNetClientStreamRecvPacket() returns -3 instead of filling the buffer with zeroes. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/remote/remote_driver.c | 2 +- src/rpc/virnetclientstream.c | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 5507713..72f8f21 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5096,7 +5096,7 @@ remoteStreamRecvFlags(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); if (virNetClientStreamRaiseError(privst)) return -1; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index f63072e..e8ef3a3 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -436,7 +436,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d flags=%x", st, client, data, nbytes, nonblock, flags); - virCheckFlags(0, -1); + virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); virObjectLock(st); @@ -499,6 +499,15 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, /* Pretend skipLength zeroes was read from stream. */ size_t len = want; + /* Yes, pretend unless we are asked not to. */ + if (flags & VIR_STREAM_RECV_STOP_AT_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Stream is in a hole")); + rv = -3; + goto cleanup; + } + + + if (len > st->skipLength) len = st->skipLength; -- 2.8.3

Whenever the client is able to receive some data from a stream, daemonStreamHandleRead is called. But now the behaviour of this function needs to be changed a bit. Previously it just read data from the underlying file (or chardev or whatever) and sent it through the stream to the client. This model will not work any longer because it does not differentiate whether the underlying file is in a data or a hole section. Therefore, at the beginning of this function add code that checks this situation and acts accordingly. So after this, whenever we want to send some data we first check whether we are in a hole and, if so, skip it and inform the client about its size. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/daemon/stream.c b/daemon/stream.c index f9c0ba1..a8ad8f7 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -797,6 +797,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; int rv; + int inData = 0; + unsigned long long length = 0; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -821,6 +823,54 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; + if (stream->skippable) { + /* Handle skip. We want to send some data to the client. But we might + * be in a hole. Seek to next data. But if we are in data already, just + * carry on.
*/ + + rv = virStreamInData(stream->st, &inData, &length); + VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length); + + if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; + msg = NULL; + + /* We're done with this call */ + goto done; + } else { + if (!inData && length) { + stream->tx = false; + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamSkip(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + length) < 0) + goto cleanup; + + msg = NULL; + + /* Skip sent to the client; seek locally too to stay in sync. */ + if (virStreamSkip(stream->st, length) < 0) + goto cleanup; + /* We're done with this call */ + goto done; + } + } + + if (length < bufferLen) + bufferLen = length; + } + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -852,6 +902,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, msg = NULL; } + done: ret = 0; cleanup: VIR_FREE(buffer); -- 2.8.3

Implement the virStreamSkip and virStreamInData callbacks. These callbacks do no magic: they either skip a hole, or detect whether we are in a data section of a file or in a hole, and how many bytes can be read before the section changes. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/src/fdstream.c b/src/fdstream.c index 3e92577..18a2bc0 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -482,11 +482,122 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSkip(virStreamPtr st, + unsigned long long length) +{ + struct virFDStreamData *fdst = st->privateData; + off_t off; + int ret = -1; + + virMutexLock(&fdst->lock); + if (fdst->length && + length > fdst->length - fdst->offset) { + virReportSystemError(ENOSPC, "%s", + _("cannot write to stream")); + goto cleanup; + } + + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + if (fdst->length) + fdst->offset += length; + ret = 0; + cleanup: + virMutexUnlock(&fdst->lock); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + unsigned long long *length) +{ + struct virFDStreamData *fdst = st->privateData; + int fd; + off_t cur, data, hole; + int ret = -1; + + virMutexLock(&fdst->lock); + + fd = fdst->fd; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + virReportSystemError(errno, "%s", + _("Unable to get current position in stream")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF.
+ * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + virReportSystemError(errno, "%s", + _("Unable to seek to data")); + goto cleanup; + } + *inData = 0; + *length = 0; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *length = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek to hole")); + goto cleanup; + } else if (hole == data) { + /* We are in data, yet a hole starts right here: the file + * probably changed under us. errno is stale, don't use it. */ + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unable to seek to hole")); + goto cleanup; + } + /* case 2 */ + *length = hole - data; + } + + ret = 0; + cleanup: + /* At any rate, reposition back to where we started. */ + if (cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); + virMutexUnlock(&fdst->lock); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSkip = virFDStreamSkip, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = virFDStreamRemoveCallback -- 2.8.3

Now, not all APIs are going to support sparse streams. For some it makes no sense at all, e.g. virDomainOpenConsole() or virDomainOpenChannel(). For others, we will need a special flag to indicate that the client wants to enable sparse streams. Instead of having to write RPC dispatchers by hand, we can just annotate in our .x files that a certain flag to a certain RPC call enables this feature. For instance: /** * @generate: both * @readstream: 1 * @sparseflag: VIR_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_DOMAIN_SOME_API = XXX, Therefore, whenever the client calls virDomainSomeAPI(.., VIR_SPARSE_STREAM); the daemon will mark that down and send stream skips when possible. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/gendispatch.pl | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index 1003338..6675d22 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -281,6 +281,13 @@ while (<PROTOCOL>) { $calls{$name}->{streamflag} = "none"; } + if (exists $opts{sparseflag}) { + die "\@sparseflag requires stream" if $calls{$name}->{streamflag} eq "none"; + $calls{$name}->{sparseflag} = $opts{sparseflag}; + } else { + $calls{$name}->{sparseflag} = "none"; + } + $calls{$name}->{acl} = $opts{acl}; $calls{$name}->{aclfilter} = $opts{aclfilter}; @@ -963,6 +970,11 @@ elsif ($mode eq "server") { if ($call->{streamflag} ne "none") { print " virStreamPtr st = NULL;\n"; print " daemonClientStreamPtr stream = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = args->flags & $call->{sparseflag};\n"; + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1005,7 +1017,7 @@ elsif ($mode eq "server") { print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n"; print " goto cleanup;\n"; print "\n"; - print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, false)))\n"; + print " if
(!(stream = daemonCreateClientStream(client, st, remoteProgram, &msg->header, sparse)))\n"; print " goto cleanup;\n"; print "\n"; } @@ -1690,6 +1702,11 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print " virNetClientStreamPtr netst = NULL;\n"; + if ($call->{sparseflag} ne "none") { + print " const bool sparse = flags & $call->{sparseflag};\n"; + } else { + print " const bool sparse = false;\n"; + } } print "\n"; @@ -1701,7 +1718,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, false)))\n"; + print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; -- 2.8.3

These API flags tell whether the caller wants to use a sparse stream for storage transfer. At the same time, it's safe to enable them in the storage driver frontend and rely on our backends checking the flags. This way we can enable specific flags only on some specific backends, e.g. enable VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM for the filesystem backend but not for the iSCSI backend. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- include/libvirt/libvirt-storage.h | 9 +++++++++ src/libvirt-storage.c | 4 ++-- src/remote/remote_protocol.x | 2 ++ src/storage/storage_driver.c | 4 ++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h index db6f2b4..e98e8a5 100644 --- a/include/libvirt/libvirt-storage.h +++ b/include/libvirt/libvirt-storage.h @@ -337,11 +337,20 @@ virStorageVolPtr virStorageVolCreateXMLFrom (virStoragePoolPtr pool, const char *xmldesc, virStorageVolPtr clonevol, unsigned int flags); + +typedef enum { + VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolDownloadFlags; + int virStorageVolDownload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, unsigned long long length, unsigned int flags); +typedef enum { + VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM = 1 << 0, /* Use sparse stream */ +} virStorageVolUploadFlags; + int virStorageVolUpload (virStorageVolPtr vol, virStreamPtr stream, unsigned long long offset, diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c index 1ce6745..cd8ce16 100644 --- a/src/libvirt-storage.c +++ b/src/libvirt-storage.c @@ -1549,7 +1549,7 @@ virStorageVolCreateXMLFrom(virStoragePoolPtr pool, * @stream: stream to use as output * @offset: position in @vol to start reading from * @length: limit on amount of data to download - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolDownloadFlags * * Download the content of the volume as a stream.
If @length * is zero, then the remaining contents of the volume after @@ -1613,7 +1613,7 @@ virStorageVolDownload(virStorageVolPtr vol, * @stream: stream to use as input * @offset: position to start writing to * @length: limit on amount of data to upload - * @flags: extra flags; not used yet, so callers should always pass 0 + * @flags: bitwise-OR of virStorageVolUploadFlags * * Upload new content to the volume from a stream. This call * will fail if @offset + @length exceeds the size of the diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index bab8ef2..35f56d4 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -4753,6 +4753,7 @@ enum remote_procedure { /** * @generate: both * @writestream: 1 + * @sparseflag: VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM * @acl: storage_vol:data_write */ REMOTE_PROC_STORAGE_VOL_UPLOAD = 208, @@ -4760,6 +4761,7 @@ enum remote_procedure { /** * @generate: both * @readstream: 1 + * @sparseflag: VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM * @acl: storage_vol:data_read */ REMOTE_PROC_STORAGE_VOL_DOWNLOAD = 209, diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index eb5f688..a8ce96d 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -2161,7 +2161,7 @@ storageVolDownload(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; @@ -2323,7 +2323,7 @@ storageVolUpload(virStorageVolPtr obj, virStorageVolStreamInfoPtr cbdata = NULL; int ret = -1; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); if (!(vol = virStorageVolDefFromVol(obj, &pool, &backend))) return -1; -- 2.8.3

The command gained a new --sparse switch that does nothing more than enable the sparse streams feature for this command. Along with the switch, a new helper function is introduced: virshStreamSkip(). This is the callback that is called whenever the daemon sends us a hole. In the callback we reflect the hole in the underlying file by seeking as many bytes as told. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 12 ++++++++++-- tools/virsh.c | 8 ++++++++ tools/virsh.h | 3 +++ tools/virsh.pod | 3 ++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index d35fee1..27744c7 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -762,6 +762,10 @@ static const vshCmdOptDef opts_vol_download[] = { .type = VSH_OT_INT, .help = N_("amount of data to download") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; @@ -777,6 +781,7 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) unsigned long long offset = 0, length = 0; bool created = false; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -790,6 +795,9 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) if (vshCommandOptStringReq(ctl, cmd, "file", &file) < 0) goto cleanup; + if (vshCommandOptBool(cmd, "sparse")) + flags |= VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM; + if ((fd = open(file, O_WRONLY|O_CREAT|O_EXCL, 0666)) < 0) { if (errno != EEXIST || (fd = open(file, O_WRONLY|O_TRUNC, 0666)) < 0) { @@ -805,12 +813,12 @@ cmdVolDownload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } - if (virStorageVolDownload(vol, st, offset, length, 0) < 0) { + if (virStorageVolDownload(vol, st, offset, length, flags) < 0) { vshError(ctl, _("cannot download from volume %s"), name); goto cleanup; } - if (virStreamRecvAll(st, virshStreamSink, &fd) < 0) { + if (virStreamSparseRecvAll(st,
virshStreamSink, virshStreamSkip, &fd) < 0) { vshError(ctl, _("cannot receive data from volume %s"), name); goto cleanup; } diff --git a/tools/virsh.c b/tools/virsh.c index 2a807d9..2f0f46a 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -263,6 +263,14 @@ int virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED, return safewrite(*fd, bytes, nbytes); } +int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, + unsigned long long offset, void *opaque) +{ + int *fd = opaque; + + return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; +} + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index fd552bb..5c382ef 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -150,4 +150,7 @@ int virshDomainState(vshControl *ctl, virDomainPtr dom, int *reason); int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, void *opaque); +int virshStreamSkip(virStreamPtr st, + unsigned long long offset, void *opaque); + #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index 6844823..36a15f8 100644 --- a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3581,12 +3581,13 @@ regarding possible target volume and pool changes as a result of the pool refresh when the upload is attempted. =item B<vol-download> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Download the contents of a storage volume to I<local-file>. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume to download. +If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start reading the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be downloaded. A negative value is interpreted as -- 2.8.3

Similarly to the previous commit, implement the sparse streams feature for vol-upload. This is, however, a slightly different approach, because we must implement a function that tells us whether we are in a data section or in a hole. But there's no magic hidden in here. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- tools/virsh-volume.c | 37 +++++++++++++++-------- tools/virsh.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++ tools/virsh.h | 14 +++++++++ tools/virsh.pod | 3 +- 4 files changed, 125 insertions(+), 14 deletions(-) diff --git a/tools/virsh-volume.c b/tools/virsh-volume.c index 27744c7..d6434d5 100644 --- a/tools/virsh-volume.c +++ b/tools/virsh-volume.c @@ -659,18 +659,13 @@ static const vshCmdOptDef opts_vol_upload[] = { .type = VSH_OT_INT, .help = N_("amount of data to upload") }, + {.name = "sparse", + .type = VSH_OT_BOOL, + .help = N_("preserve sparseness of volume") + }, {.name = NULL} }; -static int -cmdVolUploadSource(virStreamPtr st ATTRIBUTE_UNUSED, - char *bytes, size_t nbytes, void *opaque) -{ - int *fd = opaque; - - return saferead(*fd, bytes, nbytes); -} - static bool cmdVolUpload(vshControl *ctl, const vshCmd *cmd) { @@ -682,6 +677,8 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) const char *name = NULL; unsigned long long offset = 0, length = 0; virshControlPtr priv = ctl->privData; + unsigned int flags = 0; + virshStreamCallbackData cbData; if (vshCommandOptULongLong(ctl, cmd, "offset", &offset) < 0) return false; @@ -700,19 +697,33 @@ cmdVolUpload(vshControl *ctl, const vshCmd *cmd) goto cleanup; } + cbData.ctl = ctl; + cbData.fd = fd; + + if (vshCommandOptBool(cmd, "sparse")) + flags |= VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM; + if (!(st = virStreamNew(priv->conn, 0))) { vshError(ctl, _("cannot create a new stream")); goto cleanup; } - if (virStorageVolUpload(vol, st, offset, length, 0) < 0) { + if (virStorageVolUpload(vol, st, offset, length, flags) < 0) { vshError(ctl, _("cannot upload to volume %s"), name); goto
cleanup; } - if (virStreamSendAll(st, cmdVolUploadSource, &fd) < 0) { - vshError(ctl, _("cannot send data to volume %s"), name); - goto cleanup; + if (flags & VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM) { + if (virStreamSparseSendAll(st, virshStreamSource, + virshStreamInData, &cbData) < 0) { + vshError(ctl, _("cannot send data to volume %s"), name); + goto cleanup; + } + } else { + if (virStreamSendAll(st, virshStreamSource, &cbData) < 0) { + vshError(ctl, _("cannot send data to volume %s"), name); + goto cleanup; + } } if (VIR_CLOSE(fd) < 0) { diff --git a/tools/virsh.c b/tools/virsh.c index 2f0f46a..12b53d0 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -263,6 +263,17 @@ int virshStreamSink(virStreamPtr st ATTRIBUTE_UNUSED, return safewrite(*fd, bytes, nbytes); } +int +virshStreamSource(virStreamPtr st ATTRIBUTE_UNUSED, + char *bytes, size_t nbytes, void *opaque) +{ + virshStreamCallbackDataPtr cbData = opaque; + int fd = cbData->fd; + + return saferead(fd, bytes, nbytes); +} + + int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, unsigned long long offset, void *opaque) { @@ -271,6 +282,80 @@ int virshStreamSkip(virStreamPtr st ATTRIBUTE_UNUSED, return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; } +int virshStreamInData(virStreamPtr st ATTRIBUTE_UNUSED, + int *inData, + unsigned long long *offset, + void *opaque) +{ + virshStreamCallbackDataPtr cbData = opaque; + vshControl *ctl = cbData->ctl; + int fd = cbData->fd; + off_t cur, data, hole; + int ret = -1; + + *inData = 0; + *offset = 0; + + /* Get current position */ + cur = lseek(fd, 0, SEEK_CUR); + if (cur == (off_t) -1) { + vshError(ctl, "%s", _("Unable to get current position in stream")); + goto cleanup; + } + + /* Now try to get data and hole offsets */ + data = lseek(fd, cur, SEEK_DATA); + + /* There are four options: + * 1) data == cur; @cur is in data + * 2) data > cur; @cur is in a hole, next data at @data + * 3) data < 0, errno = ENXIO; either @cur is in trailing hole, or @cur is beyond EOF.
+ * 4) data < 0, errno != ENXIO; we learned nothing + */ + + if (data == (off_t) -1) { + /* cases 3 and 4 */ + if (errno != ENXIO) { + vshError(ctl, "%s", _("Unable to seek to data")); + goto cleanup; + } + *inData = 0; + *offset = 0; + } else if (data > cur) { + /* case 2 */ + *inData = 0; + *offset = data - cur; + } else { + /* case 1 */ + *inData = 1; + + /* We don't know where does the next hole start. Let's + * find out. Here we get the same 4 possibilities as + * described above.*/ + hole = lseek(fd, data, SEEK_HOLE); + if (hole == (off_t) -1 || hole == data) { + /* cases 1, 3 and 4 */ + /* @cur was reported to be in data, yet SEEK_HOLE + * either failed or claims a hole starts right at + * @data. The file has probably changed under us, + * so there is nothing sensible to do but fail. */ + vshError(ctl, "%s", _("unable to seek to hole")); + goto cleanup; + } else { + /* case 2 */ + *offset = (hole - data); + } + } + + ret = 0; + cleanup: + /* If we were in data initially, reposition back. */ + if (*inData && cur != (off_t) -1) + ignore_value(lseek(fd, cur, SEEK_SET)); + return ret; +} + + /* --------------- * Command Connect * --------------- diff --git a/tools/virsh.h b/tools/virsh.h index 5c382ef..1362819 100644 --- a/tools/virsh.h +++ b/tools/virsh.h @@ -150,7 +150,21 @@ int virshDomainState(vshControl *ctl, virDomainPtr dom, int *reason); int virshStreamSink(virStreamPtr st, const char *bytes, size_t nbytes, void *opaque); +int virshStreamSource(virStreamPtr st, + char *bytes, size_t nbytes, void *opaque); + int virshStreamSkip(virStreamPtr st, unsigned long long offset, void *opaque); +typedef struct _virshStreamCallbackData virshStreamCallbackData; +typedef virshStreamCallbackData *virshStreamCallbackDataPtr; +struct _virshStreamCallbackData { + vshControl *ctl; + int fd; +}; + +int virshStreamInData(virStreamPtr st, + int *data, + unsigned long long *offset, + void *opaque); #endif /* VIRSH_H */ diff --git a/tools/virsh.pod b/tools/virsh.pod index 36a15f8..f10fee9 100644 ---
a/tools/virsh.pod +++ b/tools/virsh.pod @@ -3563,13 +3563,14 @@ the storage volume should be deleted as well. Not all storage drivers support this option, presently only rbd. =item B<vol-upload> [I<--pool> I<pool-or-uuid>] [I<--offset> I<bytes>] -[I<--length> I<bytes>] I<vol-name-or-key-or-path> I<local-file> +[I<--length> I<bytes>] [I<--sparse>] I<vol-name-or-key-or-path> I<local-file> Upload the contents of I<local-file> to a storage volume. I<--pool> I<pool-or-uuid> is the name or UUID of the storage pool the volume is in. I<vol-name-or-key-or-path> is the name or key or path of the volume where the file will be uploaded. +If I<--sparse> is specified, this command will preserve volume sparseness. I<--offset> is the position in the storage volume at which to start writing the data. The value must be 0 or larger. I<--length> is an upper bound of the amount of data to be uploaded. A negative value is interpreted -- 2.8.3

While virStreamInData() should be a small and quick function, our implementation seeks multiple times. Moreover, it is called even when we already know that we are in data. If we track its return values and do some basic math with them, we only need to call virStreamInData() right at the boundaries of a data section or a hole and nowhere else. The remaining data length is kept as unsigned long long, the type virStreamInData() reports, so that large data sections are not truncated on 32-bit hosts. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index a8ad8f7..22d7cf7 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -54,6 +54,7 @@ struct daemonClientStream { bool tx; bool skippable; + unsigned long long dataLen; /* Bytes of data left until the next hole */ daemonClientStreamPtr next; }; @@ -823,7 +824,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!(msg = virNetMessageNew(false))) goto cleanup; - if (stream->skippable) { + if (stream->skippable && !stream->dataLen) { /* Handle skip. We want to send some data to the client. But we might * be in a hole. Seek to next data. But if we are in data already, just * carry on. */ @@ -867,10 +868,13 @@ daemonStreamHandleRead(virNetServerClientPtr client, } } - if (length < bufferLen) - bufferLen = length; + stream->dataLen = length; } + if (stream->skippable && + bufferLen > stream->dataLen) + bufferLen = stream->dataLen; + rv = virStreamRecv(stream->st, buffer, bufferLen); if (rv == -2) { /* Should never get this, since we're only called when we know @@ -885,6 +889,8 @@ daemonStreamHandleRead(virNetServerClientPtr client, goto cleanup; msg = NULL; } else { + stream->dataLen -= rv; + stream->tx = false; if (rv == 0) stream->recvEOF = true; -- 2.8.3

NOT TO BE APPLIED UPSTREAM! This is just a hack so that reviewers can try sparse streams out. The problem is that if non-blocking streams are requested (which is the case for every single stream in the daemon), we spawn the IO helper and create a pipe to read data from it. But that means the fd that virFDStreamInData() sees is our end of the pipe, not the actual file. Therefore any lseek() fails (one simply can't seek on a pipe). The solution would be to introduce some protocol into our communication with the IO helper. But that's going to be yet another N patches and this patch set is long enough as is. So just for the sake of review, do this hack for now. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/fdstream.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index 18a2bc0..cf0c325 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -761,9 +761,9 @@ virFDStreamOpenFileInternal(virStreamPtr st, * support those we need to fork a helper process to do * the I/O so we just have a fifo. Or use AIO :-( */ - if ((st->flags & VIR_STREAM_NONBLOCK) && - ((!S_ISCHR(sb.st_mode) && - !S_ISFIFO(sb.st_mode)) || forceIOHelper)) { + if (0 && ((st->flags & VIR_STREAM_NONBLOCK) && + ((!S_ISCHR(sb.st_mode) && + !S_ISFIFO(sb.st_mode)) || forceIOHelper))) { int fds[2] = { -1, -1 }; if ((oflags & O_ACCMODE) == O_RDWR) { -- 2.8.3

NOT TO BE APPLIED UPSTREAM! Now that we have everything prepared, let's enable the sparse stream flags for these two APIs in the local storage backend (virStorageBackendVolUploadLocal and virStorageBackendVolDownloadLocal). This cannot go upstream just yet, though, because it relies on the previous (hack) commit. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/storage/storage_backend.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c index 3a23cd7..b2a9e3b 100644 --- a/src/storage/storage_backend.c +++ b/src/storage/storage_backend.c @@ -2005,7 +2005,8 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, int ret = -1; int has_snap = 0; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM, -1); + /* if volume has target format VIR_STORAGE_FILE_PLOOP * we need to restore DiskDescriptor.xml, according to * new contents of volume. This operation will be perfomed @@ -2051,7 +2052,8 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, int ret = -1; int has_snap = 0; - virCheckFlags(0, -1); + virCheckFlags(VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM, -1); + if (vol->target.format == VIR_STORAGE_FILE_PLOOP) { has_snap = virStorageBackendPloopHasSnapshots(vol->target.path); if (has_snap < 0) { -- 2.8.3
participants (1)
-
Michal Privoznik