[libvirt] [PATCH 0/9] Slightly rework our streams

This is not the big patch set that enables sparse streams. Not just yet. I'm merely sending the first few patches, which prepare the environment for that. These can, however, go in independently of sparse streams.

Michal Privoznik (9):
  daemonClientStream: Use unsigned int to store stream @serial
  daemon stream: Prefer bool over unsigned int var:1
  daemon stream: Convert @tx in daemonClientStream to bool
  daemon stream: Remove useless empty lines from header file
  virNetClientCallDispatchStream: Update comment
  daemonStreamHandleRead: Rework to follow our coding pattern
  Revert "rpc: Fix slow volume download (virsh vol-download)"
  virnetclientstream: Process stream messages later
  virStream{Recv,Send}All: Increase client buffer

 daemon/stream.c               | 116 +++++++++++++++----------------
 daemon/stream.h               |   2 -
 src/libvirt-stream.c          |   5 +-
 src/rpc/virnetclient.c        |   6 +-
 src/rpc/virnetclientstream.c  | 158 +++++++++++++++---------------------------
 src/rpc/virnetserverprogram.c |  12 ++--
 src/rpc/virnetserverprogram.h |   4 +-
 7 files changed, 129 insertions(+), 174 deletions(-)

--
2.7.3

The stream serial number is the serial number of the RPC call that initiated a data transfer. And as such can never be negative. Moreover, when looking up internal state for a stream, the serial numbers are compared. But hey, the serial number in message header is unsigned too! Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 22 +++++++++++----------- src/rpc/virnetserverprogram.c | 12 ++++++------ src/rpc/virnetserverprogram.h | 4 ++-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index dfe0bf9..ce1e054 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -42,7 +42,7 @@ struct daemonClientStream { virStreamPtr st; int procedure; - int serial; + unsigned int serial; unsigned int recvEOF : 1; unsigned int closed : 1; @@ -92,11 +92,11 @@ daemonStreamUpdateEvents(daemonClientStream *stream) * fast stream, but slow client */ static void -daemonStreamMessageFinished(virNetMessagePtr msg ATTRIBUTE_UNUSED, +daemonStreamMessageFinished(virNetMessagePtr msg, void *opaque) { daemonClientStream *stream = opaque; - VIR_DEBUG("stream=%p proc=%d serial=%d", + VIR_DEBUG("stream=%p proc=%d serial=%u", stream, msg->header.proc, msg->header.serial); stream->tx = 1; @@ -293,7 +293,7 @@ daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED, msg->header.serial != stream->serial) goto cleanup; - VIR_DEBUG("Incoming client=%p, rx=%p, serial=%d, proc=%d, status=%d", + VIR_DEBUG("Incoming client=%p, rx=%p, serial=%u, proc=%d, status=%d", client, stream->rx, msg->header.proc, msg->header.serial, msg->header.status); @@ -324,7 +324,7 @@ daemonCreateClientStream(virNetServerClientPtr client, daemonClientStream *stream; daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); - VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p", + VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p", client, header->proc, header->serial, st); if (VIR_ALLOC(stream) < 0) @@ -360,7 +360,7 @@ int 
daemonFreeClientStream(virNetServerClientPtr client, if (stream->refs) return 0; - VIR_DEBUG("client=%p, proc=%d, serial=%d", + VIR_DEBUG("client=%p, proc=%d, serial=%u", client, stream->procedure, stream->serial); virObjectUnref(stream->prog); @@ -398,7 +398,7 @@ int daemonAddClientStream(virNetServerClientPtr client, daemonClientStream *stream, bool transmit) { - VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p, transmit=%d", + VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p, transmit=%d", client, stream->procedure, stream->serial, stream->st, transmit); daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); @@ -448,7 +448,7 @@ int daemonRemoveClientStream(virNetServerClientPtr client, daemonClientStream *stream) { - VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p", + VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p", client, stream->procedure, stream->serial, stream->st); daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client); daemonClientStream *curr = priv->streams; @@ -515,7 +515,7 @@ daemonStreamHandleWriteData(virNetServerClientPtr client, { int ret; - VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d, len=%zu, offset=%zu", + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u, len=%zu, offset=%zu", client, stream, msg->header.proc, msg->header.serial, msg->bufferLength, msg->bufferOffset); @@ -565,7 +565,7 @@ daemonStreamHandleFinish(virNetServerClientPtr client, { int ret; - VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d", + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", client, stream, msg->header.proc, msg->header.serial); stream->closed = 1; @@ -602,7 +602,7 @@ daemonStreamHandleAbort(virNetServerClientPtr client, daemonClientStream *stream, virNetMessagePtr msg) { - VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d", + VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", client, stream, msg->header.proc, msg->header.serial); virNetMessageError rerr; diff --git a/src/rpc/virnetserverprogram.c 
b/src/rpc/virnetserverprogram.c index a4d9295..311e344 100644 --- a/src/rpc/virnetserverprogram.c +++ b/src/rpc/virnetserverprogram.c @@ -147,9 +147,9 @@ virNetServerProgramSendError(unsigned program, virNetMessageErrorPtr rerr, int procedure, int type, - int serial) + unsigned int serial) { - VIR_DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d msg=%p rerr=%p", + VIR_DEBUG("prog=%d ver=%d proc=%d type=%d serial=%u msg=%p rerr=%p", program, version, procedure, type, serial, msg, rerr); virNetMessageSaveError(rerr); @@ -217,7 +217,7 @@ int virNetServerProgramSendStreamError(virNetServerProgramPtr prog, virNetMessagePtr msg, virNetMessageErrorPtr rerr, int procedure, - int serial) + unsigned int serial) { return virNetServerProgramSendError(prog->program, prog->version, @@ -282,7 +282,7 @@ int virNetServerProgramDispatch(virNetServerProgramPtr prog, memset(&rerr, 0, sizeof(rerr)); - VIR_DEBUG("prog=%d ver=%d type=%d status=%d serial=%d proc=%d", + VIR_DEBUG("prog=%d ver=%d type=%d status=%d serial=%u proc=%d", msg->header.prog, msg->header.vers, msg->header.type, msg->header.status, msg->header.serial, msg->header.proc); @@ -312,7 +312,7 @@ int virNetServerProgramDispatch(virNetServerProgramPtr prog, * stream packets after we closed down a stream. Just drop & ignore * these. 
*/ - VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d", + VIR_INFO("Ignoring unexpected stream data serial=%u proc=%d status=%d", msg->header.serial, msg->header.proc, msg->header.status); /* Send a dummy reply to free up 'msg' & unblock client rx */ virNetMessageClear(msg); @@ -510,7 +510,7 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, virNetServerClientPtr client, virNetMessagePtr msg, int procedure, - int serial, + unsigned int serial, const char *data, size_t len) { diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h index 0ccc372..531fca0 100644 --- a/src/rpc/virnetserverprogram.h +++ b/src/rpc/virnetserverprogram.h @@ -90,7 +90,7 @@ int virNetServerProgramSendStreamError(virNetServerProgramPtr prog, virNetMessagePtr msg, virNetMessageErrorPtr rerr, int procedure, - int serial); + unsigned int serial); int virNetServerProgramUnknownError(virNetServerClientPtr client, virNetMessagePtr msg, @@ -100,7 +100,7 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog, virNetServerClientPtr client, virNetMessagePtr msg, int procedure, - int serial, + unsigned int serial, const char *data, size_t len); -- 2.7.3
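A minimal sketch of the reasoning in the commit message above, using hypothetical struct and function names (this is not libvirt code): once the per-stream serial is unsigned, comparing it against the unsigned header serial no longer mixes signedness (a signed int would be converted to unsigned and trigger -Wsign-compare), and %u prints the full range. With %d, serials above INT_MAX are a conversion mismatch and in practice typically show up as negative numbers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for an RPC message header; like the real
 * header, the serial is unsigned. */
struct sketch_header {
    unsigned int serial;
};

/* Hypothetical per-stream state; @serial now has the same type as
 * the header field it is compared against. */
struct sketch_stream {
    unsigned int serial;
};

/* Plain unsigned == unsigned comparison, no signedness mixing. */
static bool
sketch_stream_matches(const struct sketch_stream *st,
                      const struct sketch_header *hdr)
{
    return hdr->serial == st->serial;
}

/* Format a serial the way a debug message would; %u is the correct
 * conversion for unsigned int. */
static void
sketch_format_serial(char *buf, size_t len, unsigned int serial)
{
    snprintf(buf, len, "serial=%u", serial);
}
```

A serial such as 3000000000u matches and prints as "serial=3000000000" rather than a negative value.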

There is no need for doing that since we have a bool type. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index ce1e054..2a9e6e4 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -44,8 +44,8 @@ struct daemonClientStream { int procedure; unsigned int serial; - unsigned int recvEOF : 1; - unsigned int closed : 1; + bool recvEOF; + bool closed; int filterID; @@ -198,7 +198,7 @@ daemonStreamEvent(virStreamPtr st, int events, void *opaque) virNetMessagePtr msg; events &= ~(VIR_STREAM_EVENT_HANGUP); stream->tx = 0; - stream->recvEOF = 1; + stream->recvEOF = true; if (!(msg = virNetMessageNew(false))) { daemonRemoveClientStream(client, stream); virNetServerClientClose(client); @@ -227,7 +227,7 @@ daemonStreamEvent(virStreamPtr st, int events, void *opaque) virNetMessageError rerr; memset(&rerr, 0, sizeof(rerr)); - stream->closed = 1; + stream->closed = true; virStreamEventRemoveCallback(stream->st); virStreamAbort(stream->st); if (events & VIR_STREAM_EVENT_HANGUP) @@ -538,7 +538,7 @@ daemonStreamHandleWriteData(virNetServerClientPtr client, memset(&rerr, 0, sizeof(rerr)); VIR_INFO("Stream send failed"); - stream->closed = 1; + stream->closed = true; return virNetServerProgramSendReplyError(stream->prog, client, msg, @@ -568,7 +568,7 @@ daemonStreamHandleFinish(virNetServerClientPtr client, VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u", client, stream, msg->header.proc, msg->header.serial); - stream->closed = 1; + stream->closed = true; virStreamEventRemoveCallback(stream->st); ret = virStreamFinish(stream->st); @@ -608,7 +608,7 @@ daemonStreamHandleAbort(virNetServerClientPtr client, memset(&rerr, 0, sizeof(rerr)); - stream->closed = 1; + stream->closed = true; virStreamEventRemoveCallback(stream->st); virStreamAbort(stream->st); @@ -755,7 +755,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, virNetMessagePtr 
msg; stream->tx = 0; if (ret == 0) - stream->recvEOF = 1; + stream->recvEOF = true; if (!(msg = virNetMessageNew(false))) ret = -1; -- 2.7.3
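One practical difference between a 1-bit unsigned bitfield and bool, shown with hypothetical struct names: assigning a nonzero value other than 1 to the bitfield keeps only the lowest bit, whereas a bool converts any nonzero value to true. The patch itself only assigns 0/1 (now false/true), so this is an illustration of why bool is the more natural type, not a bug the patch fixes.

```c
#include <assert.h>
#include <stdbool.h>

/* Old style: single-bit unsigned bitfields. */
struct sketch_flags_bits {
    unsigned int recvEOF : 1;
    unsigned int closed : 1;
};

/* New style: plain bools, as in the patch. */
struct sketch_flags_bool {
    bool recvEOF;
    bool closed;
};

/* Store an arbitrary int in the bitfield: the value is reduced
 * modulo 2, so only the lowest bit survives. */
static unsigned int
sketch_store_bits(int value)
{
    struct sketch_flags_bits f = { 0, 0 };
    f.closed = value;
    return f.closed;
}

/* Store an arbitrary int in the bool: any nonzero value is true. */
static bool
sketch_store_bool(int value)
{
    struct sketch_flags_bool f = { false, false };
    f.closed = value;
    return f.closed;
}
```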

This structure item is used as pure boolean. There's no need to hold whole integer for it. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index 2a9e6e4..a2a370c 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -50,7 +50,7 @@ struct daemonClientStream { int filterID; virNetMessagePtr rx; - int tx; + bool tx; daemonClientStreamPtr next; }; @@ -99,7 +99,7 @@ daemonStreamMessageFinished(virNetMessagePtr msg, VIR_DEBUG("stream=%p proc=%d serial=%u", stream, msg->header.proc, msg->header.serial); - stream->tx = 1; + stream->tx = true; daemonStreamUpdateEvents(stream); daemonFreeClientStream(NULL, stream); @@ -197,7 +197,7 @@ daemonStreamEvent(virStreamPtr st, int events, void *opaque) (events & VIR_STREAM_EVENT_HANGUP)) { virNetMessagePtr msg; events &= ~(VIR_STREAM_EVENT_HANGUP); - stream->tx = 0; + stream->tx = false; stream->recvEOF = true; if (!(msg = virNetMessageNew(false))) { daemonRemoveClientStream(client, stream); @@ -422,7 +422,7 @@ int daemonAddClientStream(virNetServerClientPtr client, } if (transmit) - stream->tx = 1; + stream->tx = true; virMutexLock(&priv->lock); stream->next = priv->streams; @@ -753,7 +753,7 @@ daemonStreamHandleRead(virNetServerClientPtr client, stream->serial); } else { virNetMessagePtr msg; - stream->tx = 0; + stream->tx = false; if (ret == 0) stream->recvEOF = true; if (!(msg = virNetMessageNew(false))) -- 2.7.3

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/daemon/stream.h b/daemon/stream.h index 2be40c7..cf76e71 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -26,8 +26,6 @@ # include "libvirtd.h" - - daemonClientStream * daemonCreateClientStream(virNetServerClientPtr client, virStreamPtr st, -- 2.7.3

After 434de30da545aea137 the status values are prefixed VIR_NET_ rather than REMOTE_. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclient.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index c68da6d..781e74c 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1063,9 +1063,9 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) VIR_DEBUG("Found call %p", thecall); /* Status is either - * - REMOTE_OK - no payload for streams - * - REMOTE_ERROR - followed by a remote_error struct - * - REMOTE_CONTINUE - followed by a raw data packet + * - VIR_NET_OK - no payload for streams + * - VIR_NET_ERROR - followed by a remote_error struct + * - VIR_NET_CONTINUE - followed by a raw data packet */ switch (client->msg.header.status) { case VIR_NET_CONTINUE: { -- 2.7.3
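For readers unfamiliar with the stream protocol, the updated comment describes a three-way dispatch on the packet status. A tiny sketch of that classification, using a hypothetical enum that mirrors the VIR_NET_ names (the numeric values and the function name are illustrative assumptions, not libvirt's definitions):

```c
#include <assert.h>
#include <string.h>

/* Hypothetical mirror of VIR_NET_OK, VIR_NET_ERROR and
 * VIR_NET_CONTINUE; values are illustrative only. */
enum sketch_status {
    SKETCH_NET_OK,
    SKETCH_NET_ERROR,
    SKETCH_NET_CONTINUE,
};

/* Describe what follows a stream packet with the given status. */
static const char *
sketch_stream_payload(enum sketch_status status)
{
    switch (status) {
    case SKETCH_NET_OK:
        return "none";          /* no payload for streams */
    case SKETCH_NET_ERROR:
        return "remote_error";  /* followed by a remote_error struct */
    case SKETCH_NET_CONTINUE:
        return "raw data";      /* followed by a raw data packet */
    }
    return "unknown";
}
```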

On 04/15/2016 09:51 AM, Michal Privoznik wrote:
After 434de30da545aea137 the status values are prefixed VIR_NET_ rather than REMOTE_.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclient.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
You should also update the comments in virNetServerProgramSendStreamData, which still use the REMOTE_ prefix (found via cscope).

John
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index c68da6d..781e74c 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1063,9 +1063,9 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) VIR_DEBUG("Found call %p", thecall);
/* Status is either - * - REMOTE_OK - no payload for streams - * - REMOTE_ERROR - followed by a remote_error struct - * - REMOTE_CONTINUE - followed by a raw data packet + * - VIR_NET_OK - no payload for streams + * - VIR_NET_ERROR - followed by a remote_error struct + * - VIR_NET_CONTINUE - followed by a raw data packet */ switch (client->msg.header.status) { case VIR_NET_CONTINUE: {

Usually, we have this 'if() goto cleanup;' pattern in our new code. It is going to be useful here too. Thing is, there was a memleak. If there has been an error in virNetServerProgramSendStreamError() or virNetServerProgramSendStreamData() created message was never freed. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 68 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index a2a370c..cf42a10 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -709,9 +709,12 @@ static int daemonStreamHandleRead(virNetServerClientPtr client, daemonClientStream *stream) { + virNetMessagePtr msg = NULL; + virNetMessageError rerr; char *buffer; size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; - int ret; + int ret = -1; + int rv; VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -728,50 +731,47 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!stream->tx) return 0; + memset(&rerr, 0, sizeof(rerr)); + if (VIR_ALLOC_N(buffer, bufferLen) < 0) return -1; - ret = virStreamRecv(stream->st, buffer, bufferLen); - if (ret == -2) { + if (!(msg = virNetMessageNew(false))) + goto cleanup; + + rv = virStreamRecv(stream->st, buffer, bufferLen); + if (rv == -2) { /* Should never get this, since we're only called when we know * we're readable, but hey things change... 
*/ - ret = 0; - } else if (ret < 0) { - virNetMessagePtr msg; - virNetMessageError rerr; - - memset(&rerr, 0, sizeof(rerr)); - - if (!(msg = virNetMessageNew(false))) - ret = -1; - else - ret = virNetServerProgramSendStreamError(remoteProgram, - client, - msg, - &rerr, - stream->procedure, - stream->serial); + } else if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; } else { - virNetMessagePtr msg; stream->tx = false; - if (ret == 0) + if (rv == 0) stream->recvEOF = true; - if (!(msg = virNetMessageNew(false))) - ret = -1; - if (msg) { - msg->cb = daemonStreamMessageFinished; - msg->opaque = stream; - stream->refs++; - ret = virNetServerProgramSendStreamData(remoteProgram, - client, - msg, - stream->procedure, - stream->serial, - buffer, ret); - } + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamData(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + buffer, rv) < 0) + goto cleanup; } + msg = NULL; + ret = 0; + cleanup: VIR_FREE(buffer); + virNetMessageFree(msg); return ret; } -- 2.7.3

On 04/15/2016 09:51 AM, Michal Privoznik wrote:
Usually, we have this 'if() goto cleanup;' pattern in our new code. It is going to be useful here too. Thing is, there was a memleak. If there has been an error in virNetServerProgramSendStreamError() or virNetServerProgramSendStreamData() created message was never freed.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- daemon/stream.c | 68 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 34 deletions(-)
diff --git a/daemon/stream.c b/daemon/stream.c index a2a370c..cf42a10 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -709,9 +709,12 @@ static int daemonStreamHandleRead(virNetServerClientPtr client, daemonClientStream *stream) { + virNetMessagePtr msg = NULL; + virNetMessageError rerr; char *buffer; size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; - int ret; + int ret = -1; + int rv;
VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d", client, stream, stream->tx, stream->closed); @@ -728,50 +731,47 @@ daemonStreamHandleRead(virNetServerClientPtr client, if (!stream->tx) return 0;
+ memset(&rerr, 0, sizeof(rerr)); + if (VIR_ALLOC_N(buffer, bufferLen) < 0) return -1;
- ret = virStreamRecv(stream->st, buffer, bufferLen); - if (ret == -2) { + if (!(msg = virNetMessageNew(false))) + goto cleanup; + + rv = virStreamRecv(stream->st, buffer, bufferLen); + if (rv == -2) { /* Should never get this, since we're only called when we know * we're readable, but hey things change... */
If for some reason rv == -2, you later set "msg = NULL", which leaks it (found by Coverity). I assume 'msg' gets 'eaten' by virNetServerProgramSendStreamError and virNetServerProgramSendStreamData, so "msg = NULL;" should be done only after a successful return from either of them.

John
- ret = 0; - } else if (ret < 0) { - virNetMessagePtr msg; - virNetMessageError rerr; - - memset(&rerr, 0, sizeof(rerr)); - - if (!(msg = virNetMessageNew(false))) - ret = -1; - else - ret = virNetServerProgramSendStreamError(remoteProgram, - client, - msg, - &rerr, - stream->procedure, - stream->serial); + } else if (rv < 0) { + if (virNetServerProgramSendStreamError(remoteProgram, + client, + msg, + &rerr, + stream->procedure, + stream->serial) < 0) + goto cleanup; } else { - virNetMessagePtr msg; stream->tx = false; - if (ret == 0) + if (rv == 0) stream->recvEOF = true; - if (!(msg = virNetMessageNew(false))) - ret = -1;
- if (msg) { - msg->cb = daemonStreamMessageFinished; - msg->opaque = stream; - stream->refs++; - ret = virNetServerProgramSendStreamData(remoteProgram, - client, - msg, - stream->procedure, - stream->serial, - buffer, ret); - } + msg->cb = daemonStreamMessageFinished; + msg->opaque = stream; + stream->refs++; + if (virNetServerProgramSendStreamData(remoteProgram, + client, + msg, + stream->procedure, + stream->serial, + buffer, rv) < 0) + goto cleanup; }
+ msg = NULL;
^^^^ If (rv == -2) this is leaked.
+ ret = 0; + cleanup: VIR_FREE(buffer); + virNetMessageFree(msg); return ret; }
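One way to address the leak pointed out above is to clear msg only on the paths where ownership was actually handed off, and let the rv == -2 path reach cleanup with msg still set. The sketch below models that with hypothetical types and a live-allocation counter. sketch_send() stands in for virNetServerProgramSendStreamError()/virNetServerProgramSendStreamData() under the assumption (stated in the review, not verified here) that they consume msg on success and leave it with the caller on failure. It is an illustration, not the code that was eventually committed.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical message type plus a counter of live allocations so
 * the ownership rules can be checked. */
struct sketch_msg { int payload; };
static int sketch_live_msgs;

static struct sketch_msg *
sketch_msg_new(void)
{
    struct sketch_msg *m = calloc(1, sizeof(*m));
    if (m)
        sketch_live_msgs++;
    return m;
}

static void
sketch_msg_free(struct sketch_msg *m)
{
    if (!m)
        return;
    sketch_live_msgs--;
    free(m);
}

/* Hypothetical sender: on success it takes ownership of @m (freeing
 * it here stands in for "queued, freed later"); on failure the
 * caller still owns @m. */
static int
sketch_send(struct sketch_msg *m, int fail)
{
    if (fail)
        return -1;
    sketch_msg_free(m);
    return 0;
}

/* @rv mimics virStreamRecv(): -2 = would block, < 0 = error,
 * >= 0 = bytes read. */
static int
sketch_handle_read(int rv, int send_fails)
{
    struct sketch_msg *msg = NULL;
    int ret = -1;

    if (!(msg = sketch_msg_new()))
        goto cleanup;

    if (rv == -2) {
        /* Nothing to send: keep @msg so cleanup frees it. */
        ret = 0;
        goto cleanup;
    } else if (rv < 0) {
        if (sketch_send(msg, send_fails) < 0)
            goto cleanup;
    } else {
        msg->payload = rv;
        if (sketch_send(msg, send_fails) < 0)
            goto cleanup;
    }

    /* Only reached after a successful hand-off. */
    msg = NULL;
    ret = 0;

 cleanup:
    sketch_msg_free(msg);
    return ret;
}
```

Every path, including rv == -2 and a failed send, leaves no live message behind.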

This reverts commit d9c9e138f22c48626f719f880920e04c639e0177. Unfortunately, things are going to be handled differently so this commit must go. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 152 +++++++++++++++---------------------------- 1 file changed, 53 insertions(+), 99 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 64e9cd2..b428f4b 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,9 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - struct iovec *incomingVec; /* I/O Vector to hold data */ - size_t writeVec; /* Vectors produced */ - size_t readVec; /* Vectors consumed */ + char *incoming; + size_t incomingOffset; + size_t incomingLength; bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents); + VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); - if ((((st->readVec < st->writeVec) || st->incomingEOF) && + if (((st->incomingOffset || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,14 +110,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - ((st->readVec < st->writeVec) || st->incomingEOF)) + (st->incomingOffset || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents, - st->readVec, st->writeVec); + VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); if 
(events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -162,7 +161,7 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incomingVec); + VIR_FREE(st->incoming); virObjectUnref(st->prog); } @@ -266,50 +265,38 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { int ret = -1; - struct iovec iov; - char *base; - size_t piece, pieces, length, offset = 0, size = 1024*1024; + size_t need; virObjectLock(st); + need = msg->bufferLength - msg->bufferOffset; + if (need) { + size_t avail = st->incomingLength - st->incomingOffset; + if (need > avail) { + size_t extra = need - avail; + if (VIR_REALLOC_N(st->incoming, + st->incomingLength + extra) < 0) { + VIR_DEBUG("Out of memory handling stream data"); + goto cleanup; + } + st->incomingLength += extra; + } - length = msg->bufferLength - msg->bufferOffset; - - if (length == 0) { + memcpy(st->incoming + st->incomingOffset, + msg->buffer + msg->bufferOffset, + msg->bufferLength - msg->bufferOffset); + st->incomingOffset += (msg->bufferLength - msg->bufferOffset); + } else { st->incomingEOF = true; - goto end; } - pieces = VIR_DIV_UP(length, size); - for (piece = 0; piece < pieces; piece++) { - if (size > length - offset) - size = length - offset; - - if (VIR_ALLOC_N(base, size)) { - VIR_DEBUG("Allocation failed"); - goto cleanup; - } - - memcpy(base, msg->buffer + msg->bufferOffset + offset, size); - iov.iov_base = base; - iov.iov_len = size; - offset += size; - - if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) { - VIR_DEBUG("Append failed"); - VIR_FREE(base); - goto cleanup; - } - VIR_DEBUG("Wrote piece of vector. 
readVec %zu, writeVec %zu size %zu", - st->readVec, st->writeVec, size); - } - - end: + VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", + st->incomingOffset, st->incomingLength, + st->incomingEOF); virNetClientStreamEventTimerUpdate(st); + ret = 0; cleanup: - VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d", - st->readVec, st->writeVec, st->incomingEOF); virObjectUnlock(st); return ret; } @@ -374,21 +361,17 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock) { - int ret = -1; - size_t partial, offset; - - virObjectLock(st); - + int rv = -1; VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); - - if ((st->readVec >= st->writeVec) && !st->incomingEOF) { + virObjectLock(st); + if (!st->incomingOffset && !st->incomingEOF) { virNetMessagePtr msg; - int rv; + int ret; if (nonblock) { VIR_DEBUG("Non-blocking mode and no data available"); - ret = -2; + rv = -2; goto cleanup; } @@ -404,66 +387,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - rv = virNetClientSendWithReplyStream(client, msg, st); + ret = virNetClientSendWithReplyStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); - if (rv < 0) + if (ret < 0) goto cleanup; } - offset = 0; - partial = nbytes; - - while (st->incomingVec && (st->readVec < st->writeVec)) { - struct iovec *iov = st->incomingVec + st->readVec; - - if (!iov || !iov->iov_base) { - virReportError(VIR_ERR_INTERNAL_ERROR, - "%s", _("NULL pointer encountered")); - goto cleanup; - } - - if (partial < iov->iov_len) { - memcpy(data+offset, iov->iov_base, partial); - memmove(iov->iov_base, (char*)iov->iov_base+partial, - iov->iov_len-partial); - iov->iov_len -= partial; - offset += partial; - VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len); - break; + VIR_DEBUG("After IO %zu", st->incomingOffset); + if (st->incomingOffset) { + 
int want = st->incomingOffset; + if (want > nbytes) + want = nbytes; + memcpy(data, st->incoming, want); + if (want < st->incomingOffset) { + memmove(st->incoming, st->incoming + want, st->incomingOffset - want); + st->incomingOffset -= want; + } else { + VIR_FREE(st->incoming); + st->incomingOffset = st->incomingLength = 0; } - - memcpy(data+offset, iov->iov_base, iov->iov_len); - VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len); - partial -= iov->iov_len; - offset += iov->iov_len; - VIR_FREE(iov->iov_base); - iov->iov_len = 0; - st->readVec++; - - VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu", - offset, st->readVec, st->writeVec); - } - - /* Shrink the I/O Vector buffer to free up memory. Do the - shrinking only when there is selected amount or more buffers to - free so it doesn't constantly memmove() and realloc() buffers. - */ - if (st->readVec >= 16) { - memmove(st->incomingVec, st->incomingVec + st->readVec, - sizeof(*st->incomingVec)*(st->writeVec - st->readVec)); - VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec); - VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec); - st->readVec = 0; + rv = want; + } else { + rv = 0; } - ret = offset; virNetClientStreamEventTimerUpdate(st); cleanup: virObjectUnlock(st); - return ret; + return rv; } -- 2.7.3
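The revert restores a single growable byte buffer. A simplified sketch of that scheme, with hypothetical names loosely mapped to incoming/incomingOffset/incomingLength: each queued packet is copied into the buffer (realloc plus memcpy), and each read copies data out and memmove()s the remainder to the front, so every byte is copied at least twice.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical single-buffer queue. */
struct sketch_inbuf {
    char *data;
    size_t used;   /* bytes currently queued (cf. incomingOffset) */
    size_t cap;    /* bytes allocated (cf. incomingLength) */
};

/* Append @len bytes, growing the buffer just enough (first copy). */
static int
sketch_inbuf_push(struct sketch_inbuf *b, const char *src, size_t len)
{
    if (len > b->cap - b->used) {
        size_t newcap = b->used + len;
        char *tmp = realloc(b->data, newcap);
        if (!tmp)
            return -1;
        b->data = tmp;
        b->cap = newcap;
    }
    memcpy(b->data + b->used, src, len);
    b->used += len;
    return 0;
}

/* Copy up to @nbytes to @dst (second copy) and shift any remainder
 * to the front; free the buffer once it is drained. */
static size_t
sketch_inbuf_pop(struct sketch_inbuf *b, char *dst, size_t nbytes)
{
    size_t want;

    if (!b->used)
        return 0;
    want = b->used < nbytes ? b->used : nbytes;
    memcpy(dst, b->data, want);
    if (want < b->used) {
        memmove(b->data, b->data + want, b->used - want);
        b->used -= want;
    } else {
        free(b->data);
        b->data = NULL;
        b->used = b->cap = 0;
    }
    return want;
}
```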

Currently we have two separate functions for handling read from a stream. One is supposed to be low level and reads data in this self allocating chunk of memory. The other read function then copies data over from the chunk into a user buffer. There are two memcpy() involved even though a single would be sufficient. Moreover, since we are copying just data, we can't process alternative stream packets in the latter function, like stream seeks. In my testing, this proved two times faster then implementation which uses IO vectors. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b..34989a9 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,7 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... 
*/ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + virNetMessagePtr rx; bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); - if (((st->incomingOffset || st->incomingEOF) && + if (((st->rx || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + (st->rx || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incoming); + while (st->rx) { + virNetMessagePtr msg = st->rx; + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + } virObjectUnref(st->prog); } @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { - int ret = -1; - size_t need; + virNetMessagePtr tmp_msg; + + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); + + /* Unfortunately, we must allocate new message as the one we + * get in @msg is going to be cleared later in the process. 
*/ + + if (!(tmp_msg = virNetMessageNew(false))) + return -1; + + /* Copy header */ + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); + + /* Steal message buffer */ + tmp_msg->buffer = msg->buffer; + tmp_msg->bufferLength = msg->bufferLength; + tmp_msg->bufferOffset = msg->bufferOffset; + msg->buffer = NULL; + msg->bufferLength = msg->bufferOffset = 0; virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { - st->incomingEOF = true; - } + virNetMessageQueuePush(&st->rx, tmp_msg); - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); virNetClientStreamEventTimerUpdate(st); - ret = 0; - - cleanup: virObjectUnlock(st); - return ret; + return 0; } @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, bool nonblock) { int rv = -1; + size_t want; + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, goto cleanup; } - VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - 
st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + VIR_DEBUG("After IO rx=%p", st->rx); + want = nbytes; + while (want && st->rx) { + virNetMessagePtr msg = st->rx; + size_t len = want; + + if (len > msg->bufferLength - msg->bufferOffset) + len = msg->bufferLength - msg->bufferOffset; + + if (!len) + break; + + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); + want -= len; + msg->bufferOffset += len; + + if (msg->bufferOffset == msg->bufferLength) { + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); } - rv = want; - } else { - rv = 0; } + rv = nbytes - want; virNetClientStreamEventTimerUpdate(st); -- 2.7.3
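The new scheme drops the intermediate buffer. virNetClientStreamQueuePacket steals the payload buffer into a fresh message and queues it on st->rx, and virNetClientStreamRecvPacket copies straight from the queued buffers into the caller's memory, freeing each message once it is fully consumed. A self-contained sketch of that idea with a hypothetical singly linked FIFO (this is not libvirt's virNetMessage API):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical queued message: owns its payload; @offset marks how
 * much has already been handed to the reader. */
struct sketch_msg {
    char *buffer;
    size_t length;
    size_t offset;
    struct sketch_msg *next;
};

/* Test helper: wrap a copy of @s in a new message. */
static struct sketch_msg *
sketch_msg_from(const char *s)
{
    struct sketch_msg *m = calloc(1, sizeof(*m));
    if (!m)
        return NULL;
    m->length = strlen(s);
    if (!(m->buffer = malloc(m->length ? m->length : 1))) {
        free(m);
        return NULL;
    }
    memcpy(m->buffer, s, m->length);
    return m;
}

/* Append @m at the tail (cf. virNetMessageQueuePush). */
static void
sketch_queue_push(struct sketch_msg **head, struct sketch_msg *m)
{
    m->next = NULL;
    while (*head)
        head = &(*head)->next;
    *head = m;
}

/* Copy up to @nbytes directly from queued buffers into @data: a
 * single copy per byte. Zero-length messages are simply dequeued
 * here; the patch instead stops at them. */
static size_t
sketch_queue_recv(struct sketch_msg **head, char *data, size_t nbytes)
{
    size_t want = nbytes;

    while (want && *head) {
        struct sketch_msg *m = *head;
        size_t len = m->length - m->offset;

        if (len > want)
            len = want;
        if (len)
            memcpy(data + (nbytes - want), m->buffer + m->offset, len);
        want -= len;
        m->offset += len;

        if (m->offset == m->length) {
            *head = m->next;   /* cf. virNetMessageQueueServe */
            free(m->buffer);
            free(m);
        }
    }
    return nbytes - want;
}
```

Keeping whole messages queued, rather than raw bytes, is also what would let the receiver inspect other packet types later, as the commit message notes.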

On 04/15/2016 09:51 AM, Michal Privoznik wrote:
Currently we have two separate functions for handling reads from a stream. One is supposed to be low-level and reads data into a self-allocating chunk of memory. The other read function then copies data over from the chunk into a user buffer. There are two memcpy() calls involved even though a single one would be sufficient. Moreover, since we are copying just data, we can't process alternative stream packets, such as stream seeks, in the latter function.
In my testing, this proved two times faster then implementation
s/then/than the/
which uses IO vectors.
Can I "assume" this testing covers the reverted patch scenario? IOW: I think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to be reopened...
Might have been "nice" to indicate/summarize what this algorithm does as opposed to the other. I think you started at the end of the first paragraph, but I'm not 100% sure - I guess it's easier for me if it's explicitly said, such as:
In virNetClientStreamQueuePacket instead of ...
In virNetClientStreamRecvPacket instead of ...
instead of implicitly said if you know the code.
The functions are just tough to read without (more) knowledge (than I have about them) of how they are designed to function. Since he had a hand in the above bug, hopefully Martin can take a look at this patch.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 52 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b..34989a9 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,7 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + virNetMessagePtr rx; bool incomingEOF;
virNetClientStreamEventCallback cb; @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) && + if (((st->rx || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + (st->rx || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj;
virResetError(&st->err); - VIR_FREE(st->incoming); + while (st->rx) { + virNetMessagePtr msg = st->rx; + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + } virObjectUnref(st->prog); }
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { - int ret = -1; - size_t need; + virNetMessagePtr tmp_msg; + + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); + + /* Unfortunately, we must allocate new message as the one we + * get in @msg is going to be cleared later in the process. */ + + if (!(tmp_msg = virNetMessageNew(false))) + return -1; + + /* Copy header */ + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); + + /* Steal message buffer */ + tmp_msg->buffer = msg->buffer; + tmp_msg->bufferLength = msg->bufferLength; + tmp_msg->bufferOffset = msg->bufferOffset; + msg->buffer = NULL; + msg->bufferLength = msg->bufferOffset = 0;
virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - }
- memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { - st->incomingEOF = true; - } + virNetMessageQueuePush(&st->rx, tmp_msg);
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); virNetClientStreamEventTimerUpdate(st);
- ret = 0; - - cleanup: virObjectUnlock(st); - return ret; + return 0; }
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, bool nonblock) { int rv = -1; + size_t want; + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret;
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, goto cleanup; }
- VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + VIR_DEBUG("After IO rx=%p", st->rx); + want = nbytes; + while (want && st->rx) {
So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that is 'expected'...
+ virNetMessagePtr msg = st->rx; + size_t len = want; + + if (len > msg->bufferLength - msg->bufferOffset) + len = msg->bufferLength - msg->bufferOffset; + + if (!len) + break; + + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); + want -= len; + msg->bufferOffset += len; + + if (msg->bufferOffset == msg->bufferLength) { + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg);
Nothing needs to be done with want here? I guess this shows my lack of depth of understanding of these algorithms... Big black box that I hope works without me needing to intervene!
John
} - rv = want; - } else { - rv = 0; } + rv = nbytes - want;
virNetClientStreamEventTimerUpdate(st);

On 20.04.2016 15:57, John Ferlan wrote:
On 04/15/2016 09:51 AM, Michal Privoznik wrote:
Currently we have two separate functions for handling reads from a stream. One is supposed to be low-level and reads data into a self-allocating chunk of memory. The other read function then copies data over from the chunk into a user buffer. There are two memcpy() calls involved even though a single one would be sufficient. Moreover, since we are copying just data, we can't process alternative stream packets, such as stream seeks, in the latter function.
In my testing, this proved two times faster then implementation
s/then/than the/
which uses IO vectors.
Can I "assume" this testing covers the reverted patch scenario? IOW: I think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to be reopened...
This proved to be two times faster than even the IO vectors implementation.
Might have been "nice" to indicate/summarize what this algorithm does as opposed to the other. I think you started at the end of the first paragraph, but I'm not 100% sure - I guess it's easier for me if it's explicitly said, such as:
In virNetClientStreamQueuePacket instead of ...
In virNetClientStreamRecvPacket instead of ...
instead of implicitly said if you know the code.
Something like this?
There are two functions on the client that handle incoming stream data. The first one virNetClientStreamQueuePacket() is a low level function that just process the incoming stream data from the socket and store it into an internal structure. This happens in the client event loop therefore the shorter the callbacks are, the better. The second function virNetClientStreamRecvPacket() then handles copying data from internal structure into a client provided buffer. Change introduced in this commit makes just that: new queue for incoming stream packets is introduced. Then instead of copying data into intermediate internal buffer and then copying them into user buffer, incoming stream messages are enqueued into the queue and data are copied just once - in the upper layer function virNetClientStreamRecvPacket(). In the end, there's just one copying of data and therefore shorter event loop callback. This should boost the performance which has proven to be the case in my testing.
Having said that, I don't think there's any need for reopening the bug since we are not hurting performance here.
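To make the queuing half of this design concrete, here is a minimal C sketch, assuming a heavily simplified message type: the payload buffer of an incoming message is moved into a newly allocated queue element instead of being memcpy()'d into a growing intermediate buffer. The `msg` struct, `queue_push()` and `queue_packet()` are hypothetical stand-ins for virNetMessage, virNetMessageQueuePush() and virNetClientStreamQueuePacket(), not libvirt's actual definitions; header copying and locking are left out.

```c
#include <stdlib.h>

/* Hypothetical, simplified stand-in for virNetMessage: only the
 * buffer bookkeeping and a link for the FIFO queue. */
typedef struct msg {
    char *buffer;
    size_t bufferLength;
    size_t bufferOffset;
    struct msg *next;
} msg;

/* Append @m at the tail of the FIFO whose head is *@head. */
static void queue_push(msg **head, msg *m)
{
    while (*head)
        head = &(*head)->next;
    *head = m;
}

/* Sketch of the "steal, don't copy" idea: ownership of @in's payload
 * moves into a new queue element, so no payload bytes are copied in
 * the event loop. Returns 0 on success, -1 on allocation failure. */
static int queue_packet(msg **rx, msg *in)
{
    msg *tmp = calloc(1, sizeof(*tmp));

    if (!tmp)
        return -1;

    /* Steal the buffer together with its length and read offset. */
    tmp->buffer = in->buffer;
    tmp->bufferLength = in->bufferLength;
    tmp->bufferOffset = in->bufferOffset;
    tmp->next = NULL;

    /* @in no longer owns the data, so whoever clears or frees @in
     * later cannot affect what was queued. */
    in->buffer = NULL;
    in->bufferLength = in->bufferOffset = 0;

    queue_push(rx, tmp);
    return 0;
}
```

In this sketch the per-packet cost inside the event loop is one small allocation plus a few pointer assignments, regardless of how large the payload is; the single copy into the caller's buffer is deferred to the read side.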
The functions are just tough to read without (more) knowledge (than I have about them) of how they are designed to function. Since he had a hand in the above bug, hopefully Martin can take a look at this patch.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 52 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b..34989a9 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,7 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + virNetMessagePtr rx; bool incomingEOF;
virNetClientStreamEventCallback cb; @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) && + if (((st->rx || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + (st->rx || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj;
virResetError(&st->err); - VIR_FREE(st->incoming); + while (st->rx) { + virNetMessagePtr msg = st->rx; + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + } virObjectUnref(st->prog); }
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { - int ret = -1; - size_t need; + virNetMessagePtr tmp_msg; + + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); + + /* Unfortunately, we must allocate new message as the one we + * get in @msg is going to be cleared later in the process. */ + + if (!(tmp_msg = virNetMessageNew(false))) + return -1; + + /* Copy header */ + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); + + /* Steal message buffer */ + tmp_msg->buffer = msg->buffer; + tmp_msg->bufferLength = msg->bufferLength; + tmp_msg->bufferOffset = msg->bufferOffset; + msg->buffer = NULL; + msg->bufferLength = msg->bufferOffset = 0;
virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - }
- memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { - st->incomingEOF = true; - } + virNetMessageQueuePush(&st->rx, tmp_msg);
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); virNetClientStreamEventTimerUpdate(st);
- ret = 0; - - cleanup: virObjectUnlock(st); - return ret; + return 0; }
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, bool nonblock) { int rv = -1; + size_t want; + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret;
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, goto cleanup; }
- VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + VIR_DEBUG("After IO rx=%p", st->rx); + want = nbytes; + while (want && st->rx) {
So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that is 'expected'...
Yes. On the client side, calling virStreamRecv() basically boils down to calling this function, and the return value of this function becomes the return value of the wrapper. As described in the docs, virStreamRecv() (and therefore virNetClientStreamRecvPacket()) returns the number of bytes read from the stream. If there's no incoming data, there's nothing to read, and therefore we should return 0.
+ virNetMessagePtr msg = st->rx; + size_t len = want; + + if (len > msg->bufferLength - msg->bufferOffset) + len = msg->bufferLength - msg->bufferOffset; + + if (!len) + break; + + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); + want -= len; + msg->bufferOffset += len; + + if (msg->bufferOffset == msg->bufferLength) { + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg);
Nothing needs to be done with want here? I guess this shows my lack of depth of understanding of these algorithms... Big black box that I hope works without me needing to intervene!
No. This does nothing more than: if the head of the linked list of incoming stream messages is fully read (*), pop the message at the head and move on to the next message in the queue (list). In that case I haven't copied any data to the user, therefore I should not change @want.
(*) It may happen that users read fewer bytes than there are in an incoming message. For instance, an incoming stream packet (message) can be 1024 bytes in size, but the user reads 1 byte at a time from the stream.
Hope my explanation makes it clear(-er) to you.
Michal
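The (*) case can be illustrated with a minimal C sketch of the draining loop, modelled on the virNetClientStreamRecvPacket() hunk quoted above: partial reads only advance bufferOffset, and a message is popped and freed once it has been fully consumed, so the pop itself never touches @want. The `msg` type, `queue_serve()` and `recv_from_queue()` are hypothetical stand-ins, not libvirt code; locking, socket I/O and error handling are omitted.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for virNetMessage, linked into a FIFO queue. */
typedef struct msg {
    char *buffer;
    size_t bufferLength;
    size_t bufferOffset;
    struct msg *next;
} msg;

/* Detach and return the head of the queue (the "pop" that
 * virNetMessageQueueServe() performs, per the discussion). */
static msg *queue_serve(msg **head)
{
    msg *m = *head;

    if (m) {
        *head = m->next;
        m->next = NULL;
    }
    return m;
}

/* Copy at most @nbytes from the queued messages into @data.
 * Returns the number of bytes copied, 0 if nothing is queued. */
static size_t recv_from_queue(msg **rx, char *data, size_t nbytes)
{
    size_t want = nbytes;           /* bytes still to be copied */

    while (want && *rx) {
        msg *m = *rx;
        size_t len = want;

        /* Never copy more than what is left in the head message. */
        if (len > m->bufferLength - m->bufferOffset)
            len = m->bufferLength - m->bufferOffset;

        /* An empty head message (e.g. a zero-length packet) is left
         * in place and stops the loop, as the quoted hunk does. */
        if (!len)
            break;

        memcpy(data + (nbytes - want), m->buffer + m->bufferOffset, len);
        want -= len;
        m->bufferOffset += len;

        /* Head fully consumed: pop and free it. Nothing is copied by
         * this step, so @want is left untouched. */
        if (m->bufferOffset == m->bufferLength) {
            queue_serve(rx);
            free(m->buffer);
            free(m);
        }
    }

    return nbytes - want;
}
```

With a single queued 1024-byte packet, calling recv_from_queue(&rx, buf, 1) repeatedly returns 1 each time and only the 1024th call pops the message; once the queue is empty the function returns 0.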

On 04/21/2016 10:28 AM, Michal Privoznik wrote:
On 20.04.2016 15:57, John Ferlan wrote:
On 04/15/2016 09:51 AM, Michal Privoznik wrote:
Currently we have two separate functions for handling reads from a stream. One is supposed to be low-level and reads data into a self-allocating chunk of memory. The other read function then copies data over from the chunk into a user buffer. There are two memcpy() calls involved even though a single one would be sufficient. Moreover, since we are copying just data, we can't process alternative stream packets, such as stream seeks, in the latter function.
In my testing, this proved two times faster then implementation
s/then/than the/
which uses IO vectors.
Can I "assume" this testing covers the reverted patch scenario? IOW: I think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to be reopened...
This proved to be two times faster than even the IO vectors implementation.
Might have been "nice" to indicate/summarize what this algorithm does as opposed to the other. I think you started at the end of the first paragraph, but I'm not 100% sure - I guess it's easier for me if it's explicitly said, such as:
In virNetClientStreamQueuePacket instead of ...
In virNetClientStreamRecvPacket instead of ...
instead of implicitly said if you know the code.
Something like this?
There are two functions on the client that handle incoming stream data. The first one virNetClientStreamQueuePacket() is a low level function that just process the incoming stream data from the socket and store it
...just processes ... and stores ...
into an internal structure. This happens in the client event loop therefore the shorter the callbacks are, the better. The second function virNetClientStreamRecvPacket() then handles copying data from internal structure into a client provided buffer. Change introduced in this
New paragraph before "Change"
commit makes just that: new queue for incoming stream packets is
...a new receive (rx) queue...
introduced. Then instead of copying data into intermediate internal buffer and then copying them into user buffer, incoming stream messages are enqueued into the queue and data are copied just once - in the upper
... are queue... ... data is copied...
layer function virNetClientStreamRecvPacket(). In the end, there's just one copying of data and therefore shorter event loop callback. This should boost the performance which has proven to be the case in my testing.
Having said that, I don't think there's any need for reopening the bug since we are not hurting performance here.
The only reason I suggested it is that I think the revert technically makes the previous changes essentially NULL and void. Since that commit was connected with a bug #, I just wanted to be sure we're covered "process wise"... It's not that important, though.
OK... I see. Instead of allocating and copying data from the incoming stream socket into a buffer, only for it to be copied again into the client buffer, we'll "steal" the entire buffer destined for the client from the incoming stream socket and then create a queue for the client to copy from. Seems OK to me...
ACK for 7-8
John
BTW: I know it's pre-existing, but virNetMessageQueueServe made me go look, and it's really a virNetMessageQueuePop to complement virNetMessageQueuePush (sigh)

These are wrappers over virStreamRecv and virStreamSend so that users have to care about nothing but writing data into / reading data from a sink (typically a file). Note that these wrappers are used exclusively on the client side, as the daemon takes a slightly different approach. Anyway, the wrappers allocate a buffer and use it for intermediate data storage until the data is passed to the stream to send, or to the client application. So far, we have been using a 64KB buffer. This is enough, but suboptimal, because the server can send messages of up to VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes (262120B, roughly 256KB). So if we make the buffer this big, a single message containing the data is sent instead of for, which is the current situation. This means lower overhead: each message contains a header which needs to be processed, each message takes roughly the same amount of time to process regardless of its size, fewer bytes need to be sent over the wire, and so on. Note that since the server will never send us a stream message bigger than VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX, there's no point in sizing the client buffer up past this threshold.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- src/libvirt-stream.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index c16f586..8384b37 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -23,6 +23,7 @@ #include "datatypes.h" #include "viralloc.h" #include "virlog.h" +#include "rpc/virnetprotocol.h" VIR_LOG_INIT("libvirt.stream"); @@ -330,7 +331,7 @@ virStreamSendAll(virStreamPtr stream, void *opaque) { char *bytes = NULL; - int want = 1024*64; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; VIR_DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque); @@ -423,7 +424,7 @@ virStreamRecvAll(virStreamPtr stream, void *opaque) { char *bytes = NULL; - int want = 1024*64; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; int ret = -1; VIR_DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque); -- 2.7.3
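The arithmetic behind "a single message instead of four" can be checked with a small C sketch, assuming each virStreamSend()/virStreamRecv() call in the *All wrappers corresponds to one stream message. `LEGACY_PAYLOAD_MAX` is a local constant holding the 262120-byte value quoted in the commit message, not libvirt's VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX macro itself, and `messages_needed()` is a hypothetical helper.

```c
#include <stddef.h>

/* Local stand-in for the value quoted in the commit message
 * (VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX = 262120 bytes). */
#define LEGACY_PAYLOAD_MAX ((size_t) 262120)
/* The previous client buffer size: 1024*64 bytes. */
#define OLD_CLIENT_BUFFER ((size_t) 1024 * 64)

/* Number of stream messages needed to move @total bytes when each
 * call handles at most @chunk bytes (ceiling division; @chunk must
 * be non-zero). */
static size_t messages_needed(size_t total, size_t chunk)
{
    return total / chunk + (total % chunk != 0);
}
```

For one full-size payload, messages_needed(LEGACY_PAYLOAD_MAX, OLD_CLIENT_BUFFER) is 4 (3 * 65536 = 196608 is too small, 4 * 65536 = 262144 suffices), while messages_needed(LEGACY_PAYLOAD_MAX, LEGACY_PAYLOAD_MAX) is 1. For a 1 MiB transfer the counts are 16 and 5, so the larger buffer saves 11 message headers.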

On 04/15/2016 09:51 AM, Michal Privoznik wrote:
These are wrappers over virStreamRecv and virStreamSend so that users have to care about nothing but writing data into / reading data from a sink (typically a file). Note that these wrappers are used exclusively on the client side, as the daemon takes a slightly different approach. Anyway, the wrappers allocate a buffer and use it for intermediate data storage until the data is passed to the stream to send, or to the client application. So far, we have been using a 64KB buffer. This is enough, but suboptimal, because the server can send messages of up to VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes (262120B, roughly 256KB). So if we make the buffer this big, a single message containing the data is sent instead of for,
^^^ I assume you mean, "s/for/four"
John

On 20.04.2016 15:58, John Ferlan wrote:
On 04/15/2016 09:51 AM, Michal Privoznik wrote:
These are wrappers over virStreamRecv and virStreamSend so that users have to care about nothing but writing data into / reading data from a sink (typically a file). Note that these wrappers are used exclusively on the client side, as the daemon takes a slightly different approach. Anyway, the wrappers allocate a buffer and use it for intermediate data storage until the data is passed to the stream to send, or to the client application. So far, we have been using a 64KB buffer. This is enough, but suboptimal, because the server can send messages of up to VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes (262120B, roughly 256KB). So if we make the buffer this big, a single message containing the data is sent instead of for,
^^^ I assume you mean, "s/for/four"
Er, yes. See? You understand this area of the code :)
Michal

On 04/15/2016 09:51 AM, Michal Privoznik wrote:
This is not the big patch set that enables sparse streams. Not just yet. I'm merely sending first few patches that prepare the environment for that. These can, however, go in independent of sparse streams.
Michal Privoznik (9): daemonClientStream: Use unsigned int to store stream @serial daemon stream: Prefer bool over unsigned int var:1 daemon stream: Convert @tx in daemonClientStream to bool daemon stream: Remove useless empty lines from header file virNetClientCallDispatchStream: Update comment daemonStreamHandleRead: Rework to follow our coding pattern Revert "rpc: Fix slow volume download (virsh vol-download)" virnetclientstream: Process stream messages later virStream{Recv,Send}All: Increase client buffer
daemon/stream.c | 116 +++++++++++++++---------------- daemon/stream.h | 2 - src/libvirt-stream.c | 5 +- src/rpc/virnetclient.c | 6 +- src/rpc/virnetclientstream.c | 158 +++++++++++++++--------------------------- src/rpc/virnetserverprogram.c | 12 ++-- src/rpc/virnetserverprogram.h | 4 +- 7 files changed, 129 insertions(+), 174 deletions(-)
I'm assuming for patches 2-3, the memory for the int -> bool conversions is all "local" to the client side...
I've made comments on a few patches... I'm not an expert in this space by any stretch - hopefully Martin can look at patch 8 (and since it goes with 7, they're a matched set).
ACK for at least 1-6... I'm OK with 7&8 (although I had a double check type question in 8 regarding the while loop and usage of 'want' as a loop control along with the st->rx that could have been Queue'd).
I'm OK with patch 9 as well - I think it's fine - a bit more memory for a bit less back and forth. Standard tradeoff to me ;-). Perhaps someone else may have agita over it though, so let's just be sure no one does before pushing that one.
John

On 20.04.2016 16:04, John Ferlan wrote:
Thank you, I've pushed 1-6 for now as I wait for more reviews on the rest. Michal
participants (2): John Ferlan, Michal Privoznik