[libvirt] [PATCH RFC 0/8] Sparse streams

** NOT TO BE MERGED UPSTREAM **

This is merely an RFC.

What's the problem?
We have APIs for transferring disk images from/to the host. The problem is that disk images can be sparse files. Our code, however, is unaware of that fact, so if, for instance, the disk is one big 8GB hole, all those bytes have to:
a) be read
b) come through our event loop.
This is obviously a very inefficient approach.

How to deal with it?
The way I propose (and this is actually what I'd like you to comment on) is to invent a set of new APIs. We need to make reading from and writing to a stream sparseness-aware. The new APIs are as follows:

  int virStreamSendOffset(virStreamPtr stream,
                          unsigned long long offset,
                          const char *data,
                          size_t nbytes);

  int virStreamRecvOffset(virStreamPtr stream,
                          unsigned long long *offset,
                          char *data,
                          size_t nbytes);

SendOffset() is fairly simple. It is given an offset to write @data at, so it basically lseek()s to @offset and write()s the data. RecvOffset() has a slightly more complicated design: it has to be aware that the @offset it is asked to read from may fall into a hole. If that's the case, it sets @offset to the new location where data starts.

Are there other ways possible?
Sure! If you have a specific one in mind, I am happy to discuss it. For instance, one idea I've heard (from Martin) was that instead of SendOffset() and RecvOffset() we could just invent our own variant of lseek().

What's left to be done?
Basically, I still don't have an RPC implementation. But before I dive into that I thought of sharing my approach with you, because it may turn out that a different approach is needed, in which case my work would be rendered useless.

Is there any bug attached?
You can bet!
https://bugzilla.redhat.com/show_bug.cgi?id=1282859

Michal Privoznik (8):
  fdstream: Realign
  Introduce virStream{Recv,Send}Offset
  Introduce VIR_STREAM_SPARSE flag
  fdstream: Implement virStreamSendOffset
  fdstream: Implement virStreamRecvOffset
  fdstreamtest: switch from boolean array to @flags
  fdstreamtest: Test virStreamRecvOffset
  fdstreamtest: test virStreamSendOffset

 include/libvirt/libvirt-stream.h |   9 +++
 src/driver-stream.h              |  13 ++++
 src/fdstream.c                   | 150 +++++++++++++++++++++++++++++++++++----
 src/internal.h                   |  20 ++++++
 src/libvirt-stream.c             | 119 +++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   6 ++
 tests/fdstreamtest.c             | 116 ++++++++++++++++++++++++------
 7 files changed, 397 insertions(+), 36 deletions(-)

--
2.4.10
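The RecvOffset() behaviour described in the cover letter (jump forward when @offset lands in a hole) maps naturally onto Linux lseek(2) with SEEK_DATA/SEEK_HOLE. The following is a minimal sketch, not part of this series: `next_data_extent()` and `make_sparse_tmpfile()` are hypothetical helpers, and it assumes a glibc/Linux system. On filesystems that do not track holes, SEEK_DATA simply returns the requested offset, so the whole file is reported as one data extent.

```c
#define _GNU_SOURCE /* for SEEK_DATA / SEEK_HOLE with glibc */
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: find the first data extent at or after @offset.
 * Returns 1 and fills *data_start / *data_len on success, 0 when only
 * hole/EOF remains (ENXIO), and -1 on any other error. */
static int
next_data_extent(int fd, off_t offset, off_t *data_start, off_t *data_len)
{
    off_t start, end;

    assert(offset >= 0);

    start = lseek(fd, offset, SEEK_DATA);
    if (start < 0)
        return errno == ENXIO ? 0 : -1;

    /* Every file has an implicit hole at EOF, so this finds the end
     * of the extent (at the latest, the file size). */
    end = lseek(fd, start, SEEK_HOLE);
    if (end < 0)
        return -1;

    *data_start = start;
    *data_len = end - start;
    return 1;
}

/* Hypothetical demo helper: an unlinked temp file holding @len bytes of
 * @data at @offset, with a hole (if supported) in front of it. */
static int
make_sparse_tmpfile(off_t offset, const char *data, size_t len)
{
    char path[] = "/tmp/sketch-holes-XXXXXX";
    int fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);
    if (pwrite(fd, data, len, offset) != (ssize_t) len) {
        close(fd);
        return -1;
    }
    return fd;
}
```

A reader built on this would copy `[start, start + len)` and then call `next_data_extent()` again from `start + len`, never touching the bytes inside holes.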

Some lines in this file are misaligned which fires up my OCD.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/fdstream.c | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/fdstream.c b/src/fdstream.c
index b8ea86e..a85cf9d 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -215,10 +215,10 @@ virFDStreamAddCallback(virStreamPtr st,
 }
 if ((fdst->watch = virEventAddHandle(fdst->fd,
- events,
- virFDStreamEvent,
- st,
- virFDStreamCallbackFree)) < 0) {
+ events,
+ virFDStreamEvent,
+ st,
+ virFDStreamCallbackFree)) < 0) {
 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
 _("cannot register file watch on stream"));
 goto cleanup;
@@ -624,7 +624,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
 */
 if ((st->flags & VIR_STREAM_NONBLOCK) &&
 ((!S_ISCHR(sb.st_mode) &&
- !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
+ !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
 int fds[2] = { -1, -1 };

 if ((oflags & O_ACCMODE) == O_RDWR) {
--
2.4.10

On Fri, Jan 29, 2016 at 02:26:52PM +0100, Michal Privoznik wrote:
ACK, just push this one now regardless. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

When dealing with sparse files we need to be able to jump over holes, as there's no added value in reading/writing them. For that, we need a new set of send and receive APIs that take an @offset argument. Sending data to a stream is easy: just say from which offset we are sending data. Reading is a bit trickier: we need a read function which can detect holes, so that when it is requested to read from one it sets @offset to a new value where data is.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |   8 +++
 src/driver-stream.h              |  13 +++++
 src/libvirt-stream.c             | 113 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   6 +++
 4 files changed, 140 insertions(+)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 831640d..5a2bde3 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -40,11 +40,19 @@ int virStreamRef(virStreamPtr st);
 int virStreamSend(virStreamPtr st,
                   const char *data,
                   size_t nbytes);
+int virStreamSendOffset(virStreamPtr stream,
+                        unsigned long long offset,
+                        const char *data,
+                        size_t nbytes);

 int virStreamRecv(virStreamPtr st,
                   char *data,
                   size_t nbytes);
+int virStreamRecvOffset(virStreamPtr stream,
+                        unsigned long long *offset,
+                        char *data,
+                        size_t nbytes);


 /**
  * virStreamSourceFunc:
diff --git a/src/driver-stream.h b/src/driver-stream.h
index 85b4e3b..5419b85 100644
--- a/src/driver-stream.h
+++ b/src/driver-stream.h
@@ -31,9 +31,20 @@ typedef int
                      size_t nbytes);

 typedef int
+(*virDrvStreamSendOffset)(virStreamPtr st,
+                          unsigned long long offset,
+                          const char *data,
+                          size_t nbytes);
+
+typedef int
 (*virDrvStreamRecv)(virStreamPtr st,
                     char *data,
                     size_t nbytes);
+typedef int
+(*virDrvStreamRecvOffset)(virStreamPtr st,
+                          unsigned long long *offset,
+                          char *data,
+                          size_t nbytes);

 typedef int
 (*virDrvStreamEventAddCallback)(virStreamPtr stream,
@@ -60,7 +71,9 @@ typedef virStreamDriver *virStreamDriverPtr;

 struct _virStreamDriver {
     virDrvStreamSend streamSend;
+    virDrvStreamSendOffset streamSendOffset;
     virDrvStreamRecv streamRecv;
+    virDrvStreamRecvOffset streamRecvOffset;
     virDrvStreamEventAddCallback streamEventAddCallback;
     virDrvStreamEventUpdateCallback streamEventUpdateCallback;
     virDrvStreamEventRemoveCallback streamEventRemoveCallback;
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index c16f586..1df188c 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -192,6 +192,58 @@ virStreamSend(virStreamPtr stream,


 /**
+ * virStreamSendOffset:
+ * @stream: pointer to the stream object
+ * @offset: <something>
+ * @data: buffer to write to stream
+ * @nbytes: size of @data buffer
+ *
+ * Sends some data down the pipe.
+ *
+ * Returns the number of bytes written, which may be less
+ * than requested.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if the outgoing transmit buffers are full &
+ * the stream is marked as non-blocking.
+ */
+int
+virStreamSendOffset(virStreamPtr stream,
+                    unsigned long long offset,
+                    const char *data,
+                    size_t nbytes)
+{
+    VIR_DEBUG("stream=%p, offset=%llu, data=%p, nbytes=%zu",
+              stream, offset, data, nbytes);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(data, error);
+
+    if (stream->driver &&
+        stream->driver->streamSendOffset) {
+        int ret;
+        ret = (stream->driver->streamSendOffset)(stream, offset, data, nbytes);
+        if (ret == -2)
+            return -2;
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(stream->conn);
+    return -1;
+}
+
+
+/**
  * virStreamRecv:
  * @stream: pointer to the stream object
  * @data: buffer to read into from stream
@@ -285,6 +337,67 @@ virStreamRecv(virStreamPtr stream,


 /**
+ * virStreamRecvOffset:
+ * @stream: pointer to the stream object
+ * @offset: <something>
+ * @data: buffer to read into from stream
+ * @nbytes: size of @data buffer
+ *
+ * Receive some data from stream. On return set offset to next location.
+ *
+ * Returns the number of bytes read, which may be less
+ * than requested.
+ *
+ * Returns 0 when either the end of the stream is reached, or
+ * there are no data to be sent at current @offset. In case of
+ * the former, the stream should be finished by calling
+ * virStreamFinish(). However, in case of the latter, @offset
+ * should be set to new position where interesting data is.
+ * Failing to do so will result in assumption that there is no
+ * data left.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if there is no data pending to be read & the
+ * stream is marked as non-blocking.
+ */
+int
+virStreamRecvOffset(virStreamPtr stream,
+                    unsigned long long *offset,
+                    char *data,
+                    size_t nbytes)
+{
+    VIR_DEBUG("stream=%p, offset=%p, data=%p, nbytes=%zu",
+              stream, offset, data, nbytes);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(offset, error);
+    virCheckNonNullArgGoto(data, error);
+
+    if (stream->driver &&
+        stream->driver->streamRecvOffset) {
+        int ret;
+        ret = (stream->driver->streamRecvOffset)(stream, offset, data, nbytes);
+        if (ret == -2)
+            return -2;
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(stream->conn);
+    return -1;
+}
+
+
+/**
  * virStreamSendAll:
  * @stream: pointer to the stream object
  * @handler: source callback for reading data from application
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index dd94191..4229df9 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -725,4 +725,10 @@ LIBVIRT_1.2.19 {
         virDomainRename;
 } LIBVIRT_1.2.17;

+LIBVIRT_1.3.1 {
+    global:
+        virStreamRecvOffset;
+        virStreamSendOffset;
+} LIBVIRT_1.2.19;
+
 # .... define new API here using predicted next version number ....
--
2.4.10
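To make the documented return-value contract of virStreamRecvOffset() concrete, here is a self-contained sketch against an in-memory stand-in. It assumes one particular reading of the doc comment: `*offset` is in/out; a positive return means that many bytes were read starting at `*offset` (left unchanged); 0 with `*offset` moved forward means a hole was skipped; 0 with `*offset` unchanged means the end of the stream. `struct mock_extent`, `mock_recv_offset()` and `mock_download()` are hypothetical and are not libvirt code; whether `offset` should be in/out at all is exactly what the review below questions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory sparse image: sorted, non-overlapping data
 * extents; every byte not covered by an extent is a hole. */
struct mock_extent {
    unsigned long long start;
    const char *bytes;
    size_t len;
};

/* Models the assumed RecvOffset semantics described above. */
static int
mock_recv_offset(const struct mock_extent *ext, size_t n_ext,
                 unsigned long long *offset, char *data, size_t nbytes)
{
    size_t i;

    for (i = 0; i < n_ext; i++) {
        unsigned long long end = ext[i].start + ext[i].len;

        if (*offset >= end)
            continue;                  /* extent lies entirely behind us */
        if (*offset < ext[i].start) {
            *offset = ext[i].start;    /* inside a hole: jump to the data */
            return 0;
        }
        size_t avail = (size_t) (end - *offset);
        size_t n = avail < nbytes ? avail : nbytes;
        memcpy(data, ext[i].bytes + (*offset - ext[i].start), n);
        return (int) n;
    }
    return 0;                          /* no data left: *offset untouched */
}

/* A sparse-aware client: copies only data extents into the pre-zeroed
 * @out and returns the number of data bytes transferred. */
static size_t
mock_download(const struct mock_extent *ext, size_t n_ext,
              char *out, size_t outlen)
{
    unsigned long long offset = 0;
    char buf[4];                       /* small on purpose: forces several reads */
    size_t copied = 0;

    for (;;) {
        unsigned long long before = offset;
        int got = mock_recv_offset(ext, n_ext, &offset, buf, sizeof(buf));

        if (got > 0) {
            assert(offset + got <= outlen);
            memcpy(out + offset, buf, got);
            offset += got;
            copied += got;
        } else if (offset == before) {
            break;                     /* 0 without a jump: end of stream */
        }
        /* otherwise 0 with offset moved forward: a hole was skipped */
    }
    return copied;
}
```

With this contract the caller never sees hole bytes; it only has to distinguish "skipped a hole" from "end of stream" by comparing `offset` before and after the call.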

On Fri, Jan 29, 2016 at 02:26:53PM +0100, Michal Privoznik wrote:
It is not entirely clear from these docs, but from the impl against 'fdstream', you seem to be treating 'offset' as an input and output parameter.

I don't think that is going to fly wrt the RPC protocol. The reads are explicitly asynchronous wrt the remote server transmission of data, i.e. once the stream is open, the server will push out the data in a continuous stream. There is no "read" operation requested against the server, so it is not possible to send an 'offset' across to the server. So the 'offset' really needs to be an output-only parameter.

I think it is important that we consider the design additions to the wire protocol at the same time as the API design we're considering.

From an RPC layer POV we currently have VIR_NET_STREAM packets to transmit the data payload. There is no offset information provided in these packets, as it is designed as a continuous stream of data. So from this POV adding offset to the send/recv methods in the API is not a good fit for the wire protocol.

We use the status field to distinguish between a packet with payload and a packet without data, i.e. VIR_NET_CONTINUE status indicates that there is a byte[] payload. We could introduce a new status field VIR_NET_SKIP to represent a hole in the stream, and the payload would simply be the size of the hole.

Regards,
Daniel
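The VIR_NET_SKIP idea can be sketched as follows: a skip packet carries no data bytes, only the hole length, which the receiver adds to its logical stream position (and can then seek over or expand). This is a hypothetical sketch: the `SKETCH_NET_*` enum only mirrors the status names from the discussion (VIR_NET_SKIP is a proposal here), its numeric values are illustrative, and it assumes the hole size is encoded as an 8-byte big-endian unsigned integer, the way XDR encodes an `unsigned hyper`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative only: names follow the thread, values are made up. */
enum sketch_stream_status {
    SKETCH_NET_OK,        /* end of stream */
    SKETCH_NET_ERROR,     /* stream aborted */
    SKETCH_NET_CONTINUE,  /* payload is a byte[] of data */
    SKETCH_NET_SKIP,      /* proposed: payload is the hole size */
};

/* Encode a hole length as 8 big-endian bytes (XDR unsigned hyper). */
static void
skip_payload_encode(uint64_t hole, unsigned char out[8])
{
    for (int i = 7; i >= 0; i--) {
        out[i] = (unsigned char) (hole & 0xff);
        hole >>= 8;
    }
}

static uint64_t
skip_payload_decode(const unsigned char in[8])
{
    uint64_t hole = 0;

    for (int i = 0; i < 8; i++)
        hole = (hole << 8) | in[i];
    return hole;
}

/* Receiver side: a CONTINUE packet advances the logical position by its
 * data length; a SKIP packet advances it by the decoded hole size even
 * though only 8 bytes crossed the wire. */
static uint64_t
advance_position(uint64_t pos, enum sketch_stream_status status,
                 const unsigned char *payload, size_t len)
{
    switch (status) {
    case SKETCH_NET_CONTINUE:
        return pos + len;
    case SKETCH_NET_SKIP:
        assert(len == 8);
        return pos + skip_payload_decode(payload);
    default:
        return pos;
    }
}
```

Under this encoding the 8GB hole from the cover letter costs one 8-byte payload instead of 8GB of zeroes through the event loop.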

On Mon, 2016-02-01 at 14:49 +0000, Daniel P. Berrange wrote:
Daniel, Michal:

I started looking at this topic last week after Daniel pointed me at Michal's thread back in December re: "Sparse image volDownload". I don't have any proposed patches yet; I'm still reading code and building my mental map thereof. I was happy to see Michal's RFC and Daniel's responses, as they help in this endeavor.

So, to check my understanding so far, I'd like to discuss Michal's proposed APIs and possible implementations in the context of Daniel's comment on the wire protocol, specifically the notion of a VIR_NET_SKIP packet to represent holes. I have also looked at the virsh volume upload/download use of virStream{Send,Recv}All() but will comment on that separately in response to Daniel's response to message 0/8.

I think that Michal's proposed APIs could work atop a wire protocol that sends holes and data chunks separately, based on a couple of different interpretations of the 'offset' parameter:

1) If 'offset' is interpreted as "offset from the beginning of the stream", then I think the stream would need to track the amount of data and hole sizes already sent, i.e. effectively the offset from the start of the stream of the next chunk to send/receive. Then for virStreamSendOffset() the 'offset' parameter would need to be >= the current "stream offset". If equal, just send the data; otherwise send a hole of size offset - "stream offset", followed by the data. This would require that the client also track, or be able to query, the cumulative data+holes sent in order to compute the offset.

2) Or, one could interpret 'offset' as relative to the data+holes already sent, constrained to >= 0. Greater than zero indicates a hole to send before any data (a hole could also be at the end of the stream data source). This means less bookkeeping for the client and the stream.

In either case, the added stream driver function would be "streamSendHole()" rather than streamSendOffset(). For compatibility, we only want to send a hole if the remote end supports it. If not, we'd need to expand the offset/hole to the equivalent number of zero data bytes somewhere in the stack. I haven't looked at how libvirt negotiates protocol support yet. The current virStreamSend() could be enhanced to scan data buffers for sufficiently long runs of zeroes and convert them to holes, again only if the remote end supports receiving holes.

Similar alternative interpretations exist for virStreamRecvOffset(). However, I think that for receiving holes we want a driver function that returns a hole along with any subsequent data, as we don't know where the holes will occur in the incoming stream. Or we could ask for a hole before each chunk of data, with the expectation that we might not get one. The receiving client might not need to track data+holes received, because it would be able to use SEEK_SET if 'offset' represents the offset from the start of the stream, or SEEK_CUR when it represents the offset from the data+holes received so far. Of course, this assumes that the data sink is capable of seeking at all. If not, the client would not want to use virStreamRecvOffset() unless it wants to expand holes on the wire into the corresponding run of zeros itself. We can do that in virStreamRecv() without changing the API.

Looking further down the stack:

In virNetClientCallDispatchStream(), the switch on client->msg.header.status will need a case added to handle VIR_NET_SKIP packets. I think this can be the same path as for VIR_NET_CONTINUE, that is, let virNetClientStreamQueuePacket() handle it. The latter function would need to recheck the msg.header.status and, in the case of _SKIP, insert a special "hole-iovec" in the incoming queue representing a hole, perhaps with a 0 or ~0 value for iov_base. Inserting an iovec to represent a hole preserves the sequencing of the stream and plays nice with virNetClientStreamRecvPacket(). On the client side, in the while() loop that consumes incoming iovecs, virNetClientStreamRecvPacket() could check for iovecs representing holes and handle them appropriately.
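Interpretation 1) above can be sketched as a thin layer that tracks the stream offset and turns an absolute-offset send into an optional hole followed by data. This is a hypothetical model of the bookkeeping only: `struct sketch_sender`, `sketch_send_offset()` and the `send_hole`/`send_data` callbacks stand in for a driver's streamSendHole()/streamSend() and are not libvirt APIs; the `wire_log` callbacks are a demo transport that just counts what would go on the wire.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

struct sketch_sender {
    uint64_t stream_offset;   /* data + holes already sent */
    int (*send_hole)(void *opaque, uint64_t len);                 /* hypothetical */
    int (*send_data)(void *opaque, const char *data, size_t len); /* hypothetical */
    void *opaque;
};

/* Send @len bytes that belong at absolute position @offset. Returns 0 on
 * success, -EINVAL if @offset would rewind the stream, or the callback's
 * negative error. */
static int
sketch_send_offset(struct sketch_sender *s, uint64_t offset,
                   const char *data, size_t len)
{
    int rc;

    assert(data != NULL || len == 0);

    if (offset < s->stream_offset)
        return -EINVAL;              /* a stream cannot go backwards */

    if (offset > s->stream_offset) {
        rc = s->send_hole(s->opaque, offset - s->stream_offset);
        if (rc < 0)
            return rc;
        s->stream_offset = offset;
    }

    if (len > 0) {
        rc = s->send_data(s->opaque, data, len);
        if (rc < 0)
            return rc;
        s->stream_offset += len;
    }
    return 0;
}

/* Demo transport: records what would go on the wire. */
struct wire_log {
    uint64_t holes;
    uint64_t data_bytes;
    int packets;
};

static int
log_hole(void *opaque, uint64_t len)
{
    struct wire_log *w = opaque;

    w->holes += len;
    w->packets++;
    return 0;
}

static int
log_data(void *opaque, const char *data, size_t len)
{
    struct wire_log *w = opaque;

    (void) data;
    w->data_bytes += len;
    w->packets++;
    return 0;
}
```

Interpretation 2) would drop `stream_offset` entirely and pass the hole length straight through, which is the "less bookkeeping" trade-off mentioned above.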
It appears that we can change the virNetClientStreamRecvPacket() API to return holes, e.g. by adding a 'size_t *hole' parameter. The function is listed in libvirt_remote.syms, which is private to libvirt according to the comments there. On encountering a "hole-iovec", virNetClientStreamRecvPacket() would terminate any partial data buffer and pass the received data up to the caller. If we return both together, returning the hole should probably be deferred until the next call, so that holes precede any accompanying data (they can then be used to compute seek offsets to that data).

If virNetClientStreamRecvPacket() has been called from the existing virStreamRecv() (e.g. with a NULL 'hole' parameter), it would expand as much of the hole as will fit into the data buffer as zeroes and adjust the hole-iovec to leave any remainder on the incoming queue, i.e. like it handles data packets that don't fit into the data buffer. We could enhance virStreamRecv() to receive and expand the holes, but virNetClientStreamRecvPacket() already has the logic to handle iovecs larger than the passed-in buffer.

...

I haven't looked closely at the local stream{Send,Recv} handlers yet, other than to note that they are virFDStream functions. Since these are using fds, I'm assuming they can be enhanced, or parallel APIs defined, to query whether the fd supports seeking at all and, in the case of receiving from a local FDStream, whether the fd supports SEEK_{DATA,HOLE}. I suppose the support for data/hole seeks could be hidden behind the API. When such seeks are not supported, the function could scan the data stream for sufficiently long runs of zeroes and return them as holes for Recv, or expand holes/offsets to runs of zeros for Send. Note that one could also scan the data chunks for sequences of zeroes worth transmitting as holes, to keep them off the wire and perhaps end up with a destination file that is "sparser" than the original.
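The fallback described above for a hole-unaware caller (expand as much of a queued hole as fits into the caller's buffer as zeroes, and leave the remainder queued) can be sketched like this. `struct sketch_chunk` is a hypothetical stand-in for a queued iovec that is either data or a hole; it is not libvirt's actual incoming-queue type, and a real implementation would also need to handle hole sizes larger than `size_t` on 32-bit hosts.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical queue entry: either @len bytes of data or a hole of @len bytes. */
struct sketch_chunk {
    int is_hole;
    const char *data;   /* unused for holes */
    size_t len;         /* bytes remaining in this chunk */
};

/* Fill @buf (capacity @cap) from the head chunk @c, expanding a hole into
 * zeroes. Consumed bytes are removed from the chunk; whatever does not fit
 * stays queued for the next call. Returns the number of bytes in @buf. */
static size_t
sketch_recv_expand(struct sketch_chunk *c, char *buf, size_t cap)
{
    size_t n = c->len < cap ? c->len : cap;

    assert(c->is_hole || c->data != NULL);

    if (c->is_hole) {
        memset(buf, 0, n);
    } else {
        memcpy(buf, c->data, n);
        c->data += n;
    }
    c->len -= n;
    return n;
}
```

Because holes and data chunks are consumed through the same path, an old-style virStreamRecv() caller would see exactly the byte sequence it sees today, just reconstructed on the receiving side.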
It also appears that the remote ends use FDStreams as sinks/sources, so they might be addressed by these or similar changes? Does that all make any sense? Regards, Lee

This flag creates a virStream that uses the sparse APIs instead of those meant for regular, dense streams. Therefore we must check for stream (non-)sparseness and deny the APIs meant for the other type.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 include/libvirt/libvirt-stream.h |  1 +
 src/internal.h                   | 20 ++++++++++++++++++++
 src/libvirt-stream.c             |  6 ++++++
 3 files changed, 27 insertions(+)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index 5a2bde3..f05e703 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -31,6 +31,7 @@

 typedef enum {
     VIR_STREAM_NONBLOCK = (1 << 0),
+    VIR_STREAM_SPARSE = (1 << 1),
 } virStreamFlags;

 virStreamPtr virStreamNew(virConnectPtr conn,
diff --git a/src/internal.h b/src/internal.h
index db26fb0..b99d9fd 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -493,6 +493,26 @@
             goto label; \
         } \
     } while (0)

+# define virCheckStreamSparseReturn(flags, retval) \
+    do { \
+        if ((flags) & VIR_STREAM_SPARSE) { \
+            virReportInvalidArg(flags, \
+                                _("%s function is not supported " \
+                                  "with sparse streams"), \
+                                __FUNCTION__); \
+            return retval; \
+        } \
+    } while (0)
+# define virCheckStreamNonSparseReturn(flags, retval) \
+    do { \
+        if (!((flags) & VIR_STREAM_SPARSE)) { \
+            virReportInvalidArg(flags, \
+                                _("%s function is not supported " \
+                                  "on dense streams"), \
+                                __FUNCTION__); \
+            return retval; \
+        } \
+    } while (0)
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 1df188c..5c6b83f 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -171,6 +171,7 @@ virStreamSend(virStreamPtr stream,

     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(data, error);
+    virCheckStreamSparseReturn(stream->flags, -1);

     if (stream->driver &&
         stream->driver->streamSend) {
@@ -223,6 +224,7 @@ virStreamSendOffset(virStreamPtr stream,

     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(data, error);
+    virCheckStreamNonSparseReturn(stream->flags, -1);

     if (stream->driver &&
         stream->driver->streamSendOffset) {
@@ -316,6 +318,7 @@ virStreamRecv(virStreamPtr stream,

     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(data, error);
+    virCheckStreamSparseReturn(stream->flags, -1);

     if (stream->driver &&
         stream->driver->streamRecv) {
@@ -377,6 +380,7 @@ virStreamRecvOffset(virStreamPtr stream,
     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(offset, error);
     virCheckNonNullArgGoto(data, error);
+    virCheckStreamNonSparseReturn(stream->flags, -1);

     if (stream->driver &&
         stream->driver->streamRecvOffset) {
@@ -451,6 +455,7 @@ virStreamSendAll(virStreamPtr stream,

     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(handler, cleanup);
+    virCheckStreamSparseReturn(stream->flags, -1);

     if (stream->flags & VIR_STREAM_NONBLOCK) {
         virReportError(VIR_ERR_OPERATION_INVALID, "%s",
@@ -544,6 +549,7 @@ virStreamRecvAll(virStreamPtr stream,

     virCheckStreamReturn(stream, -1);
     virCheckNonNullArgGoto(handler, cleanup);
+    virCheckStreamSparseReturn(stream->flags, -1);

     if (stream->flags & VIR_STREAM_NONBLOCK) {
         virReportError(VIR_ERR_OPERATION_INVALID, "%s",
--
2.4.10
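The intended rule in this patch is that the dense APIs (virStreamSend(), virStreamRecv() and the *All() helpers) reject streams created with VIR_STREAM_SPARSE, while the *Offset() APIs reject streams created without it. A minimal standalone model of that rule follows; the flag values are copied from the patch, while the `SKETCH_` names and `sketch_stream_api_allowed()` are hypothetical, and treating unknown flag bits as a caller bug is an assumption of this model.

```c
#include <assert.h>
#include <stdbool.h>

/* Flag values as defined in the patch. */
enum {
    SKETCH_STREAM_NONBLOCK = (1 << 0),
    SKETCH_STREAM_SPARSE   = (1 << 1),
};

/* Returns true when an API of the given kind may be used on a stream
 * created with @flags: dense APIs need a dense stream, *Offset APIs
 * need a sparse one. */
static bool
sketch_stream_api_allowed(unsigned int flags, bool api_is_sparse)
{
    bool stream_is_sparse;

    /* Unknown bits are treated as a caller bug in this model. */
    assert((flags & ~(unsigned int) (SKETCH_STREAM_NONBLOCK |
                                     SKETCH_STREAM_SPARSE)) == 0);

    stream_is_sparse = (flags & SKETCH_STREAM_SPARSE) != 0;
    return stream_is_sparse == api_is_sparse;
}
```

The NONBLOCK bit is orthogonal: it never changes which family of APIs is allowed.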

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/fdstream.c | 35 +++++++++++++++++++++++++++++++----
 1 file changed, 31 insertions(+), 4 deletions(-)

diff --git a/src/fdstream.c b/src/fdstream.c
index a85cf9d..403ddf6 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -353,10 +353,14 @@ virFDStreamAbort(virStreamPtr st)
     return virFDStreamCloseInt(st, true);
 }
 
-static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
+static int
+virFDStreamWriteInternal(virStreamPtr st,
+                         off_t offset,
+                         const char *bytes,
+                         size_t nbytes)
 {
     struct virFDStreamData *fdst = st->privateData;
-    int ret;
+    int ret = -1;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -376,14 +380,20 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
         if (fdst->length == fdst->offset) {
             virReportSystemError(ENOSPC, "%s",
                                  _("cannot write to stream"));
-            virMutexUnlock(&fdst->lock);
-            return -1;
+            goto cleanup;
         }
 
         if ((fdst->length - fdst->offset) < nbytes)
             nbytes = fdst->length - fdst->offset;
     }
 
+    if (offset != (off_t) -1 &&
+        lseek(fdst->fd, offset, SEEK_SET) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("unable to set stream position"));
+        goto cleanup;
+    }
+
  retry:
     ret = write(fdst->fd, bytes, nbytes);
     if (ret < 0) {
@@ -395,15 +405,31 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
             ret = -1;
             virReportSystemError(errno, "%s",
                                  _("cannot write to stream"));
+            goto cleanup;
         }
     } else if (fdst->length) {
         fdst->offset += ret;
     }
 
+ cleanup:
     virMutexUnlock(&fdst->lock);
     return ret;
 }
 
+static int
+virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
+{
+    return virFDStreamWriteInternal(st, (off_t) -1, bytes, nbytes);
+}
+
+static int
+virFDStreamWriteOffset(virStreamPtr st,
+                       unsigned long long offset,
+                       const char *bytes,
+                       size_t nbytes)
+{
+    return virFDStreamWriteInternal(st, offset, bytes, nbytes);
+}
 
 static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 {
@@ -457,6 +483,7 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 
 static virStreamDriver virFDStreamDrv = {
     .streamSend = virFDStreamWrite,
+    .streamSendOffset = virFDStreamWriteOffset,
     .streamRecv = virFDStreamRead,
     .streamFinish = virFDStreamClose,
     .streamAbort = virFDStreamAbort,
-- 
2.4.10
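What virFDStreamWriteOffset() does for each chunk (an lseek() to @offset followed by write()) can be tried on its own. The sketch below assumes a POSIX system and writable /tmp. It writes two 256-byte blocks with an 8192-byte gap between them, the same layout the tests later in this series use. `write_at()` and `make_gapped_file()` are hypothetical helpers, not libvirt code. The never-written gap reads back as zeros. Whether it also occupies no disk blocks is up to the filesystem.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper mirroring virFDStreamWriteInternal(): an @offset of
 * (off_t) -1 means "write at the current position", anything else is an
 * absolute position set with lseek(SEEK_SET) first. Returns @len or -1. */
static ssize_t
write_at(int fd, off_t offset, const char *buf, size_t len)
{
    size_t done = 0;

    if (offset != (off_t) -1 && lseek(fd, offset, SEEK_SET) < 0)
        return -1;

    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        done += (size_t) n;
    }
    return (ssize_t) done;
}

/* Builds an unlinked temp file holding @block bytes of 'A' at offset 0
 * and @block bytes of 'B' at offset @block + @gap; the bytes in between
 * are never written. Returns the fd or -1. */
static int
make_gapped_file(size_t block, size_t gap)
{
    char path[] = "/tmp/sparse-demo-XXXXXX";
    char *buf = NULL;
    int fd;

    if ((fd = mkstemp(path)) < 0)
        return -1;
    unlink(path);

    if (!(buf = malloc(block)))
        goto error;

    memset(buf, 'A', block);
    if (write_at(fd, 0, buf, block) != (ssize_t) block)
        goto error;

    memset(buf, 'B', block);
    if (write_at(fd, (off_t) (block + gap), buf, block) != (ssize_t) block)
        goto error;

    free(buf);
    return fd;

 error:
    free(buf);
    close(fd);
    return -1;
}
```

Comparing st_blocks with st_size after fstat() is one way to see whether the filesystem actually left the gap unallocated.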

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/fdstream.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 101 insertions(+), 4 deletions(-)

diff --git a/src/fdstream.c b/src/fdstream.c
index 403ddf6..20eda07 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -431,10 +431,15 @@ virFDStreamWriteOffset(virStreamPtr st,
     return virFDStreamWriteInternal(st, offset, bytes, nbytes);
 }
 
-static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
+static int
+virFDStreamReadInternal(virStreamPtr st,
+                        off_t *offset,
+                        char *bytes,
+                        size_t nbytes)
 {
     struct virFDStreamData *fdst = st->privateData;
-    int ret;
+    int ret = -1;
+    off_t cur, data, hole;
 
     if (nbytes > INT_MAX) {
         virReportSystemError(ERANGE, "%s",
@@ -450,10 +455,84 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
 
     virMutexLock(&fdst->lock);
 
+    if (offset) {
+        /* Firstly, seek to desired position. */
+        if ((cur = lseek(fdst->fd, *offset, SEEK_SET)) == (off_t) -1) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to set stream position"));
+            goto cleanup;
+        }
+
+        /* Now try to detect if @cur is in hole or in data. There are four
+         * options:
+         * 1) data == cur; @cur is in data
+         * 2) data > cur; @cur is in a hole, next data at @data
+         * 3) data < 0, errno = ENXIO; either @cur is in trailing hole or @cur
+         *    is beyond EOF.
+         * 4) data < 0, errno != ENXIO; we learned nothing
+         */
+
+        if ((data = lseek(fdst->fd, cur, SEEK_DATA)) == (off_t) -1) {
+            /* case 3 and 4 */
+            if (errno != ENXIO) {
+                virReportSystemError(errno, "%s",
+                                     _("unable to get data position"));
+            } else {
+                ret = 0;
+            }
+            goto cleanup;
+        }
+
+        if (data > cur) {
+            /* case 2 */
+            *offset = data;
+            fdst->offset = *offset;
+            ret = 0;
+            goto cleanup;
+        } else {
+            /* case 1 */
+        }
+
+        /* Now, we must ensure we will not read out of data boundaries.
+         * Get position of the next hole. Cases are the same as previously.
+         */
+
+        if ((hole = lseek(fdst->fd, cur, SEEK_HOLE)) == (off_t) -1) {
+            /* Interesting, so we are in data, but failed to seek to next hole.
+             * There's an implicit hole at EOF, if none is found earlier.
+             * This is obviously an error so we merge case 3 and 4. */
+            virReportSystemError(errno, "%s",
+                                 _("unable to get hole position"));
+            goto cleanup;
+        }
+
+        if (hole == cur) {
+            /* Interesting, so the code above ensures @cur is in data, but now
+             * we found out it's in hole too. This shouldn't happen, but it's
+             * safer to error out. */
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("stream is in both data (%llu) and hole (%llu)"),
+                           (unsigned long long) data,
+                           (unsigned long long) hole);
+            goto cleanup;
+        }
+
+        if (nbytes > (hole - cur))
+            nbytes = hole - cur;
+
+        /* We may possibly have moved to a hole. Restore original position. */
+        if ((cur = lseek(fdst->fd, *offset, SEEK_SET)) == (off_t) -1) {
+            virReportSystemError(errno, "%s",
+                                 _("unable to set stream position"));
+            goto cleanup;
+        }
+
+        fdst->offset = *offset;
+    }
+
     if (fdst->length) {
         if (fdst->length == fdst->offset) {
-            virMutexUnlock(&fdst->lock);
-            return 0;
+            ret = 0;
+            goto cleanup;
         }
 
         if ((fdst->length - fdst->offset) < nbytes)
@@ -471,20 +550,38 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
             ret = -1;
             virReportSystemError(errno, "%s",
                                  _("cannot read from stream"));
+            goto cleanup;
         }
     } else if (fdst->length) {
         fdst->offset += ret;
     }
 
+ cleanup:
     virMutexUnlock(&fdst->lock);
     return ret;
 }
 
+static int
+virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
+{
+    return virFDStreamReadInternal(st, NULL, bytes, nbytes);
+}
+
+static int
+virFDStreamReadOffset(virStreamPtr st,
+                      unsigned long long *offset,
+                      char *bytes,
+                      size_t nbytes)
+{
+    return virFDStreamReadInternal(st, (off_t *) offset, bytes, nbytes);
+}
+
 static virStreamDriver virFDStreamDrv = {
     .streamSend = virFDStreamWrite,
     .streamSendOffset = virFDStreamWriteOffset,
     .streamRecv = virFDStreamRead,
+    .streamRecvOffset = virFDStreamReadOffset,
     .streamFinish = virFDStreamClose,
     .streamAbort = virFDStreamAbort,
     .streamEventAddCallback = virFDStreamAddCallback,
-- 
2.4.10
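The four SEEK_DATA/SEEK_HOLE cases enumerated in virFDStreamReadInternal() can be exercised directly. This is a sketch assuming Linux with glibc, where SEEK_DATA and SEEK_HOLE need _GNU_SOURCE. The fallback values 3 and 4 are the Linux constants and are an assumption elsewhere. `classify()`, `enum seg_kind` and `make_demo_file()` are hypothetical names. A filesystem without hole support reports the whole file as data. So an offset inside the gap may legitimately come back as data rather than a hole.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#ifndef SEEK_DATA
/* Assumption: Linux values, for headers included without _GNU_SOURCE. */
# define SEEK_DATA 3
# define SEEK_HOLE 4
#endif

enum seg_kind {
    SEG_DATA,   /* case 1: @pos is in data, *len = bytes before next hole */
    SEG_HOLE,   /* case 2: @pos is in a hole, *next = where data resumes */
    SEG_EOF,    /* case 3: trailing hole or beyond EOF (ENXIO) */
    SEG_ERROR,  /* case 4: lseek() failed for another reason */
};

/* Hypothetical classifier following virFDStreamReadInternal()'s logic. */
static enum seg_kind
classify(int fd, off_t pos, off_t *next, off_t *len)
{
    off_t data, hole;

    if ((data = lseek(fd, pos, SEEK_DATA)) == (off_t) -1)
        return errno == ENXIO ? SEG_EOF : SEG_ERROR;

    if (data > pos) {
        *next = data;
        return SEG_HOLE;
    }

    /* data == pos. Every file has an implicit hole at EOF, so SEEK_HOLE
     * succeeds here; filesystems without hole support just return EOF. */
    if ((hole = lseek(fd, pos, SEEK_HOLE)) == (off_t) -1)
        return SEG_ERROR;
    *len = hole - pos;
    return SEG_DATA;
}

/* Demo helper: unlinked temp file with 4 KiB of 'x' at offset 0 and at
 * offset @second, nothing written in between. Returns the fd or -1. */
static int
make_demo_file(off_t second)
{
    char path[] = "/tmp/seek-demo-XXXXXX";
    char buf[4096];
    int fd;

    if ((fd = mkstemp(path)) < 0)
        return -1;
    unlink(path);
    memset(buf, 'x', sizeof(buf));
    if (pwrite(fd, buf, sizeof(buf), 0) != (ssize_t) sizeof(buf) ||
        pwrite(fd, buf, sizeof(buf), second) != (ssize_t) sizeof(buf)) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Unlike the patch, this sketch does not restore the file position after probing. The patch does that with a final lseek(SEEK_SET) before read(), which is needed because SEEK_DATA/SEEK_HOLE move the descriptor's offset.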

This is not a functional change; it merely prepares the environment for
the next patch.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tests/fdstreamtest.c | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/tests/fdstreamtest.c b/tests/fdstreamtest.c
index 56ba5d9..f9caebf 100644
--- a/tests/fdstreamtest.c
+++ b/tests/fdstreamtest.c
@@ -40,7 +40,8 @@ VIR_LOG_INIT("tests.fdstreamtest");
 
 #define PATTERN_LEN 256
 
-static int testFDStreamReadCommon(const char *scratchdir, bool blocking)
+static int
+testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
 {
     int fd = -1;
     char *file = NULL;
@@ -50,10 +51,9 @@ static int testFDStreamReadCommon(const char *scratchdir, bool blocking)
     virStreamPtr st = NULL;
     size_t i;
     virConnectPtr conn = NULL;
-    int flags = 0;
+    bool blocking = !(flags & VIR_STREAM_NONBLOCK);
 
-    if (!blocking)
-        flags |= VIR_STREAM_NONBLOCK;
+    virCheckFlags(VIR_STREAM_NONBLOCK, -1);
 
     if (!(conn = virConnectOpen("test:///default")))
         goto cleanup;
@@ -164,15 +164,15 @@ static int testFDStreamReadCommon(const char *scratchdir, bool blocking)
 
 static int testFDStreamReadBlock(const void *data)
 {
-    return testFDStreamReadCommon(data, true);
+    return testFDStreamReadCommon(data, 0);
 }
 static int testFDStreamReadNonblock(const void *data)
 {
-    return testFDStreamReadCommon(data, false);
+    return testFDStreamReadCommon(data, VIR_STREAM_NONBLOCK);
 }
 
-static int testFDStreamWriteCommon(const char *scratchdir, bool blocking)
+static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int flags)
 {
     int fd = -1;
     char *file = NULL;
@@ -182,10 +182,9 @@ static int testFDStreamWriteCommon(const char *scratchdir, bool blocking)
     virStreamPtr st = NULL;
     size_t i;
     virConnectPtr conn = NULL;
-    int flags = 0;
+    bool blocking = !(flags & VIR_STREAM_NONBLOCK);
 
-    if (!blocking)
-        flags |= VIR_STREAM_NONBLOCK;
+    virCheckFlags(VIR_STREAM_NONBLOCK, -1);
 
     if (!(conn = virConnectOpen("test:///default")))
         goto cleanup;
-- 
2.4.10

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tests/fdstreamtest.c | 62 ++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 51 insertions(+), 11 deletions(-)

diff --git a/tests/fdstreamtest.c b/tests/fdstreamtest.c
index f9caebf..5036cec 100644
--- a/tests/fdstreamtest.c
+++ b/tests/fdstreamtest.c
@@ -51,9 +51,11 @@ testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
     virStreamPtr st = NULL;
     size_t i;
     virConnectPtr conn = NULL;
+    unsigned long long streamOffset;
     bool blocking = !(flags & VIR_STREAM_NONBLOCK);
+    bool sparse = flags & VIR_STREAM_SPARSE;
 
-    virCheckFlags(VIR_STREAM_NONBLOCK, -1);
+    virCheckFlags(VIR_STREAM_NONBLOCK | VIR_STREAM_SPARSE, -1);
 
     if (!(conn = virConnectOpen("test:///default")))
         goto cleanup;
@@ -72,6 +74,10 @@ testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
         goto cleanup;
 
     for (i = 0; i < 10; i++) {
+        if (i && sparse &&
+            lseek(fd, 8192, SEEK_CUR) < 0)
+            goto cleanup;
+
         if (safewrite(fd, pattern, PATTERN_LEN) != PATTERN_LEN)
             goto cleanup;
     }
@@ -82,17 +88,23 @@ testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
     if (!(st = virStreamNew(conn, flags)))
         goto cleanup;
 
-    /* Start reading 1/2 way through first pattern
-     * and end 1/2 way through last pattern
+    /* Start reading 1/2 way through first pattern and end 1/2
+     * way through last pattern. For a sparse file, there is
+     * a hole between each pair of data blocks.
      */
+    streamOffset = PATTERN_LEN / 2;
     if (virFDStreamOpenFile(st, file,
-                            PATTERN_LEN / 2, PATTERN_LEN * 9,
+                            streamOffset,
+                            sparse ? PATTERN_LEN * 18 : PATTERN_LEN * 9,
                             O_RDONLY) < 0)
         goto cleanup;
 
     for (i = 0; i < 10; i++) {
         size_t offset = 0;
         size_t want;
+        bool data = true;
+        unsigned long long oldStreamOffset = streamOffset;
+
         if (i == 0)
             want = PATTERN_LEN / 2;
         else
@@ -101,7 +113,12 @@ testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
         while (want > 0) {
             int got;
         reread:
-            got = st->driver->streamRecv(st, buf + offset, want);
+            if (sparse)
+                got = st->driver->streamRecvOffset(st, &oldStreamOffset,
+                                                   buf + offset, want);
+            else
+                got = st->driver->streamRecv(st, buf + offset, want);
+
             if (got < 0) {
                 if (got == -2 && !blocking) {
                     usleep(20 * 1000);
@@ -112,16 +129,33 @@ testFDStreamReadCommon(const char *scratchdir, const unsigned int flags)
                 goto cleanup;
             }
             if (got == 0) {
-                /* Expect EOF 1/2 through last pattern */
-                if (i == 9 && want == (PATTERN_LEN / 2))
-                    break;
-                virFilePrintf(stderr, "Unexpected EOF block %zu want %zu\n",
-                              i, want);
-                goto cleanup;
+                if (sparse &&
+                    oldStreamOffset != streamOffset) {
+                    /* oldStreamOffset points to data position. Re-read. */
+                    if (data) {
+                        virFilePrintf(stderr, "Unexpected hole at offset %llu\n",
+                                      streamOffset);
+                    }
+                    data = !data;
+                    streamOffset = oldStreamOffset;
+                    goto reread;
+                } else {
+                    /* Expect EOF 1/2 through last pattern */
+                    if (i == 9 && want == (PATTERN_LEN / 2))
+                        break;
+                    virFilePrintf(stderr, "Unexpected EOF block %zu want %zu\n",
+                                  i, want);
+                    goto cleanup;
+                }
             }
+            if (i == 0)
+                streamOffset = 8192 + PATTERN_LEN;
+            else
+                streamOffset += 8192 + PATTERN_LEN;
             offset += got;
             want -= got;
         }
+
         if (i == 0) {
             if (memcmp(buf, pattern + (PATTERN_LEN / 2), PATTERN_LEN / 2) != 0) {
                 virFilePrintf(stderr, "Mismatched pattern data iteration %zu\n", i);
@@ -170,6 +204,10 @@ static int testFDStreamReadNonblock(const void *data)
 {
     return testFDStreamReadCommon(data, VIR_STREAM_NONBLOCK);
 }
+static int testFDStreamSparseReadBlock(const void *data)
+{
+    return testFDStreamReadCommon(data, VIR_STREAM_SPARSE);
+}
 
 
 static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int flags)
@@ -334,6 +372,8 @@ mymain(void)
         ret = -1;
     if (virtTestRun("Stream write non-blocking ", testFDStreamWriteNonblock, scratchdir) < 0)
         ret = -1;
+    if (virtTestRun("Sparse stream read blocking ", testFDStreamSparseReadBlock, scratchdir) < 0)
+        ret = -1;
 
     if (getenv("LIBVIRT_SKIP_CLEANUP") == NULL)
         virFileDeleteTree(scratchdir);
-- 
2.4.10

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 tests/fdstreamtest.c | 39 +++++++++++++++++++++++++++++++++++----
 1 file changed, 35 insertions(+), 4 deletions(-)

diff --git a/tests/fdstreamtest.c b/tests/fdstreamtest.c
index 5036cec..4fc88a7 100644
--- a/tests/fdstreamtest.c
+++ b/tests/fdstreamtest.c
@@ -220,9 +220,12 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
     virStreamPtr st = NULL;
     size_t i;
     virConnectPtr conn = NULL;
+    unsigned long long streamOffset;
+    unsigned long long streamLength;
     bool blocking = !(flags & VIR_STREAM_NONBLOCK);
+    bool sparse = flags & VIR_STREAM_SPARSE;
 
-    virCheckFlags(VIR_STREAM_NONBLOCK, -1);
+    virCheckFlags(VIR_STREAM_NONBLOCK | VIR_STREAM_SPARSE, -1);
 
     if (!(conn = virConnectOpen("test:///default")))
         goto cleanup;
@@ -243,8 +246,11 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
     /* Start writing 1/2 way through first pattern
      * and end 1/2 way through last pattern
      */
+    streamOffset = PATTERN_LEN / 2;
+    streamLength = sparse ? PATTERN_LEN * 8192 : PATTERN_LEN * 9;
     if (virFDStreamCreateFile(st, file,
-                              PATTERN_LEN / 2, PATTERN_LEN * 9,
+                              streamOffset,
+                              streamLength,
                               O_WRONLY, 0600) < 0)
         goto cleanup;
 
@@ -258,8 +264,11 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
         while (want > 0) {
             int got;
-        rewrite:
-            got = st->driver->streamSend(st, pattern + offset, want);
+        rewrite:
+            if (sparse)
+                got = st->driver->streamSendOffset(st, streamOffset, pattern + offset, want);
+            else
+                got = st->driver->streamSend(st, pattern + offset, want);
             if (got < 0) {
                 if (got == -2 && !blocking) {
                     usleep(20 * 1000);
@@ -272,6 +281,10 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
                               virGetLastErrorMessage());
                 goto cleanup;
             }
+            if (i == 0)
+                streamOffset = 8192 + PATTERN_LEN;
+            else
+                streamOffset += 8192 + PATTERN_LEN;
             offset += got;
             want -= got;
         }
@@ -286,6 +299,7 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
     if ((fd = open(file, O_RDONLY)) < 0)
         goto cleanup;
 
+    streamOffset = 0;
     for (i = 0; i < 10; i++) {
         size_t want;
         if (i == 9)
@@ -293,6 +307,12 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
         else
             want = PATTERN_LEN;
 
+        if (sparse &&
+            lseek(fd, streamOffset, SEEK_SET) < 0) {
+            virFilePrintf(stderr, "unable to seek");
+            goto cleanup;
+        }
+
         if (saferead(fd, buf, want) != want) {
             virFilePrintf(stderr, "Short read from data\n");
             goto cleanup;
@@ -321,6 +341,11 @@ static int testFDStreamWriteCommon(const char *scratchdir, const unsigned int fl
                 goto cleanup;
             }
         }
+
+        if (i == 0)
+            streamOffset = 8192 + PATTERN_LEN;
+        else
+            streamOffset += 8192 + PATTERN_LEN;
     }
 
     if (VIR_CLOSE(fd) < 0)
@@ -350,6 +375,10 @@ static int testFDStreamWriteNonblock(const void *data)
 {
     return testFDStreamWriteCommon(data, false);
 }
+static int testFDStreamSparseWriteBlock(const void *data)
+{
+    return testFDStreamWriteCommon(data, VIR_STREAM_SPARSE);
+}
 
 #define SCRATCHDIRTEMPLATE abs_builddir "/fakesysfsdir-XXXXXX"
 
@@ -374,6 +403,8 @@ mymain(void)
         ret = -1;
     if (virtTestRun("Sparse stream read blocking ", testFDStreamSparseReadBlock, scratchdir) < 0)
         ret = -1;
+    if (virtTestRun("Sparse stream write blocking ", testFDStreamSparseWriteBlock, scratchdir) < 0)
+        ret = -1;
 
     if (getenv("LIBVIRT_SKIP_CLEANUP") == NULL)
         virFileDeleteTree(scratchdir);
-- 
2.4.10

On Fri, Jan 29, 2016 at 02:26:51PM +0100, Michal Privoznik wrote:
> Are there other ways possible? Sure! If you have any specific in mind I am happy to discuss it. For instance one idea I've heard (from Martin) was instead of SendOffset() and RecvOffset() we may just invent our variant of lseek().

Also, StreamRecv (and StreamSend) could return the number of bytes read (or skipped) in a parameter, while the return value itself would be an enum saying whether the read ended up in a hole, hit the end of file, or succeeded. One more idea was to have a way of registering a callback that would be called with offsets and buffers, so that holes would be skipped that way.
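The enum-returning alternative can be sketched over an in-memory image. Here the byte count travels through an out parameter and the return value says whether the chunk was data, a hole, or EOF. Everything below is hypothetical: `stream_recv_result`, `struct segment`, `struct mem_stream` and `recv_chunk()` only illustrate the shape of such an API, and the error case is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum {
    STREAM_RECV_DATA,   /* *nread bytes were copied into @buf */
    STREAM_RECV_HOLE,   /* *nread bytes of hole were skipped, @buf untouched */
    STREAM_RECV_EOF,    /* nothing left; *nread == 0 */
} stream_recv_result;

/* Hypothetical source: an image described as data/hole segments. */
struct segment {
    const char *data;   /* NULL marks a hole */
    size_t len;
};

struct mem_stream {
    const struct segment *segs;
    size_t nsegs;
    size_t seg;         /* index of the current segment */
    size_t pos;         /* offset inside the current segment */
};

/* Receives the next chunk. Data is capped at @buflen; a hole is reported
 * in one go since no bytes have to be copied for it. */
static stream_recv_result
recv_chunk(struct mem_stream *st, char *buf, size_t buflen, size_t *nread)
{
    const struct segment *s;
    size_t avail, n;

    assert(st && buf && nread);

    /* Move past exhausted (or empty) segments. */
    while (st->seg < st->nsegs && st->pos == st->segs[st->seg].len) {
        st->seg++;
        st->pos = 0;
    }
    if (st->seg == st->nsegs) {
        *nread = 0;
        return STREAM_RECV_EOF;
    }

    s = &st->segs[st->seg];
    avail = s->len - st->pos;

    if (!s->data) {
        *nread = avail;
        st->pos += avail;
        return STREAM_RECV_HOLE;
    }

    n = avail < buflen ? avail : buflen;
    memcpy(buf, s->data + st->pos, n);
    st->pos += n;
    *nread = n;
    return STREAM_RECV_DATA;
}
```

A caller would write DATA chunks and seek forward by *nread on HOLE. In this sketch, a hole of any size costs one call instead of a read of every zero byte.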

On Fri, Jan 29, 2016 at 02:26:51PM +0100, Michal Privoznik wrote:
> What's left to be done? Basically, I still don't have RPC implementation. But before I dive into that I thought of sharing my approach with you - because it may turn out that a different approach is going to be needed and thus my work would render useless.

It would be interesting to see the virsh vol-upload/download client code updated to use the new APIs to deal with holes, so we can see how this API design looks from the POV of apps using libvirt.

Regards,
Daniel
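Purely as an illustration of the client side Daniel asks about, here is a hypothetical download loop written against the RecvOffset contract from patch 5. That contract is: positive return for data at *offset; 0 with *offset moved forward when *offset was in a hole; 0 with *offset unchanged at EOF or in a trailing hole. The remote end is mocked in memory. `mock_recv_offset()`, `download_sparse()` and the segment table are invented for this sketch and are not virsh or libvirt code. One thing the exercise brings out is that EOF and a trailing hole look identical to the caller. So the final file size has to come from elsewhere. A real client might use the volume capacity reported by virStorageVolGetInfo().

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical remote image: alternating data/hole segments. */
struct seg {
    const char *data;   /* NULL marks a hole */
    size_t len;
};

struct mock_src {
    const struct seg *segs;
    size_t nsegs;
};

/* Mimics the RecvOffset contract from patch 5:
 *   > 0  bytes read at *offset, never crossing into a hole
 *   0    with *offset moved forward: *offset was in a hole
 *   0    with *offset unchanged: EOF or trailing hole */
static int
mock_recv_offset(const struct mock_src *src, unsigned long long *offset,
                 char *buf, size_t nbytes)
{
    unsigned long long start = 0;
    size_t i;

    for (i = 0; i < src->nsegs; start += src->segs[i].len, i++) {
        const struct seg *s = &src->segs[i];
        size_t skip, n;

        if (*offset >= start + s->len)
            continue;
        if (!s->data) {
            if (i + 1 < src->nsegs)
                *offset = start + s->len;   /* data resumes here */
            return 0;
        }
        skip = (size_t) (*offset - start);
        n = s->len - skip < nbytes ? s->len - skip : nbytes;
        memcpy(buf, s->data + skip, n);
        return (int) n;
    }
    return 0;   /* at or past EOF */
}

/* Hypothetical client loop: write data where it belongs, skip holes
 * without touching the file, then extend the file to @size so that a
 * trailing hole survives. Partial writes are treated as errors. */
static int
download_sparse(const struct mock_src *src, int fd, unsigned long long size)
{
    char buf[4096];
    unsigned long long offset = 0;

    for (;;) {
        unsigned long long before = offset;
        int got = mock_recv_offset(src, &offset, buf, sizeof(buf));

        if (got < 0)
            return -1;
        if (got == 0) {
            if (offset != before)
                continue;   /* hole skipped, data resumes at @offset */
            break;          /* EOF or trailing hole */
        }
        if (pwrite(fd, buf, (size_t) got, (off_t) offset) != (ssize_t) got)
            return -1;
        offset += (unsigned long long) got;
    }
    return ftruncate(fd, (off_t) size);
}
```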

On Mon, 2016-02-01 at 14:50 +0000, Daniel P. Berrange wrote:
> It would be interesting to see the virsh vol-upload/download client code updated to use the new APIs to deal with holes, so we can see how this API design looks from the POV of apps using libvirt.
Still an RFC.

Below I've included an example of updated virsh vol-download client code that handles sparse files/streams, as mentioned in an earlier mail. This example assumes a different take on the stream APIs than discussed previously.

For this exercise, let's assume that we don't change the function prototypes of the existing virStream APIs; rather, their behavior will be slightly different for a "sparse stream". A client will need a way to tell libvirt that it wants/expects the sparse behavior. To that end, let's define a couple of new APIs:

  int virStreamSetSparse(virStreamPtr stream)

requests that sparse behavior be enabled for this stream. Returns 0 on success; -1 on error. Possible errors: maybe we don't support enabling sparseness after data has already been sent/received on the stream? Note: a client could request sparseness via a new flag to virStreamNew() as well, or instead. We will also want a:

  int virStreamIsSparse(virStreamPtr stream)

for use inside the stream driver, unless open coding a flags test is preferred?

Existing Streams [and FDStreams] APIs that behave differently for sparse streams:

  virStreamSend(virStreamPtr stream, const char *data, size_t nbytes)

will recognize a NULL @data as indicating that @nbytes contains the size of a hole in the data source, rather than as an error. The same will be true for the stream driver streamSend() handlers, remoteStreamSend() and virFDStreamWrite(), which take the same arguments as virStreamSend(). Internally, when called from remoteStreamSend() with status=VIR_NET_SKIP, virNetClientStreamSendPacket() will encode the hole length @nbytes as the payload of a VIR_STREAM_SKIP packet, as Daniel suggested earlier.

  virStreamRecv(virStreamPtr stream, char *data, size_t nbytes)

will, upon encountering a hole in the incoming data stream, return the negative value of the hole size. The same goes for the stream driver streamRecv handlers remoteStreamRecv() and virFDStreamRead().
All of these functions currently return bytes received [> 0], EOF [== 0], or -1 or -2 to indicate an error or no data on a non-blocking stream. The minimum hole size will be defined [much?] greater than 2, so a negative return value < (0 - VIR_STREAM_MIN_HOLE_SIZE) can indicate a hole. A macro to test the return value for a hole would be convenient.

Prior mail discussed how a VIR_STREAM_SKIP could be propagated up the stack as an iovec with a NULL iov_base to represent a hole. virNetClientStreamRecvPacket() would then return the hole as the large negative value in this model.

One limitation of this approach is that the return values are declared as 'int', which limits the size of the hole we can receive. This will require that multi-GB holes be broken up into ~2GB chunks. If this [or the use of large negative returns to represent holes] sounds too onerous, one can adopt an approach with new sparse stream APIs with an explicit @holesize out parameter for receive. I don't think this will affect the lower level implementation nor virsh's vol{Up,Down}load commands all that much.

Next, virsh's cmdVol{Upload,Download}() functions use virStream{Send,Recv}All(), respectively. These functions take a stream, a data source or sink handler function, and an opaque object representing the actual source or sink. virsh need only request sparse stream behavior and pass a "sparse stream aware" source or sink handler and object to use the existing APIs.

As a convenience, libvirt can provide support for sparse aware source/sink handlers layered on FDStreams as follows:

  typedef void *virStreamSparseFilePtr;

  virStreamSparseFilePtr virStreamSparseFileOpen(const char *path,
                                                 unsigned long long offset,
                                                 unsigned long long length,
                                                 int oflags)

This is a public wrapper around the libvirt private virFDStreamOpenFile() et al. that creates an FDStream associated with the specified file/path for use with sparse aware source/sink handlers. Open with O_RDONLY for sources, O_WRONLY+O_TRUNC... for sinks.
I don't know that offset and length are needed for source and sink, but I kept them above. We'll also need a:

  int virStreamSparseFileClose(virStreamSparseFilePtr sparseFile)

to clean things up. Libvirt can also provide [needs to, while FDStreams are libvirt private] sparse aware source and sink handlers:

  int virStreamSparseSource(virStreamPtr st [ATTRIBUTE_UNUSED ? TBD],
                            char *bytes, size_t nbytes, void *opaque)

This is a public wrapper around virFDStreamRead() that knows that the @opaque parameter is a sparse FDStream created by virStreamSparseFileOpen(). It will pass along @bytes and @nbytes to the sparse FDStream, which may return holes as large negative values.

  int virStreamSparseSink(virStreamPtr st [ATTRIBUTE_UNUSED ? TBD],
                          const char *bytes, size_t nbytes, void *opaque)

A public wrapper around virFDStreamWrite() that passes along its arguments to the FDStream indicated by @opaque.

One more thing before looking at the virsh cmdVol{Up,Down}load() code: we may want to keep the current, non-sparse behavior as the default and require a '--sparse' option to enable sparse behavior. The following code segments represent the sparse path after parameter parsing.

cmdVolDownload() sparse file/stream path:

!   virStreamSparseFilePtr sparseFile;

!   if (!(sparseFile = virStreamSparseFileOpen(file,
!                                              O_WRONLY|O_CREAT|O_EXCL,
!                                              0666))) {
        if (errno != EEXIST ||
!           !(sparseFile = virStreamSparseFileOpen(file,
!                                                  O_WRONLY|O_TRUNC,
!                                                  0666))) {
            vshError(ctl, _("cannot create %s"), file);
            goto cleanup;
        }
    } else {
        created = true;
    }

    if (!(st = virStreamNew(priv->conn, 0))) {
        vshError(ctl, _("cannot create a new stream"));
        goto cleanup;
    }

    if (virStorageVolDownload(vol, st, offset, length, 0) < 0) {
        vshError(ctl, _("cannot download from volume %s"), name);
        goto cleanup;
    }

!   if (virStreamRecvAll(st, virStreamSparseSink, sparseFile) < 0) {
        vshError(ctl, _("cannot receive data from volume %s"), name);
        goto cleanup;
    }

!   if (virStreamSparseFileClose(sparseFile) < 0) {
        vshError(ctl, _("cannot close file %s"), file);
        virStreamAbort(st);
        goto cleanup;
    }

    if (virStreamFinish(st) < 0) {
        vshError(ctl, _("cannot close volume %s"), name);
        goto cleanup;
    }

    ret = true;

 cleanup:
!   // cleanup will be different for sparseFile upload, too.

Changes to cmdVolUpload() will mirror those above, with a data source opened O_RDONLY.

I hope this is sufficient detail to explain this model: reusing the existing APIs with a couple of additions for sparse streams. I've started generating patches, but would like to settle on the APIs before going too far at that level. Changes to the stream driver functions will be fairly straightforward if we stick to sending and receiving holes explicitly, whatever the API. Handling hole detection and expansion for sources and sinks that don't support seeking, and when the remote end doesn't support the new protocol, may raise some interesting issues.

I'll be out of the office for a couple of weeks and so won't be able to work on this much during that time. Perhaps sparse stream support will be a done deal by the time I return.

Regards,
Lee
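Lee's encoding overloads the int return value. A positive value is data, 0 is EOF, and -1/-2 keep their current meaning. Anything below -VIR_STREAM_MIN_HOLE_SIZE is a hole of that size, so holes larger than INT_MAX have to be split. A minimal sketch of the "convenient macro" follows, together with the splitting of the cover letter's 8 GB hole into int-sized pieces. The threshold is not defined anywhere yet, so `SPARSE_MIN_HOLE_SIZE`, its value of 4096, and every other name here are assumptions. The sketch follows the strict "<" from the mail, which means an encodable hole must be strictly larger than the threshold. Because of that, the splitter never leaves a tail that is too small to encode.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

/* Assumed value: the proposal only says the minimum hole size will be
 * "[much?] greater than 2". */
#define SPARSE_MIN_HOLE_SIZE 4096

/* A recv()-style return value encodes a hole only below
 * -SPARSE_MIN_HOLE_SIZE, so 0 (EOF), -1 (error) and -2 (would block)
 * keep their current meaning. */
#define SPARSE_RET_IS_HOLE(ret) ((ret) < -SPARSE_MIN_HOLE_SIZE)
#define SPARSE_RET_HOLE_SIZE(ret) ((unsigned long long) -(long long) (ret))

/* Splits a hole of @size bytes (> SPARSE_MIN_HOLE_SIZE) into int-sized
 * encoded return values. Each piece is at most INT_MAX, and no piece is
 * left too small to be encoded as a hole. Returns the number of values
 * stored in @out, stopping early if @max is reached. */
static size_t
encode_hole(unsigned long long size, int *out, size_t max)
{
    size_t n = 0;

    assert(size > SPARSE_MIN_HOLE_SIZE);

    while (size > 0 && n < max) {
        unsigned long long chunk = size > INT_MAX ? INT_MAX : size;

        /* A tail of 1..SPARSE_MIN_HOLE_SIZE bytes could not be encoded;
         * shorten this chunk so the tail grows past the threshold. */
        if (size > chunk && size - chunk <= SPARSE_MIN_HOLE_SIZE)
            chunk -= SPARSE_MIN_HOLE_SIZE;

        out[n++] = -(int) chunk;
        size -= chunk;
    }
    return n;
}
```

With these assumptions, an 8 GiB hole becomes five return values: three of -INT_MAX, one of -(INT_MAX - 4096), and a final -4100. Their magnitudes add back up to exactly 8 GiB.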
participants (4)
- Daniel P. Berrange
- Lee Schermerhorn
- Martin Kletzander
- Michal Privoznik