[libvirt] [PATCH] rpc: RH1026137: Fix slow volume download (virsh vol-download)

Use I/O vector (iovec) instead of one huge memory buffer as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids doing memmove() on big buffers, and performance doesn't degrade if the source (virNetClientStreamQueuePacket()) is faster than the sink (virNetClientStreamRecvPacket()).
---
 src/rpc/virnetclientstream.c | 134 +++++++++++++++++++++++++----------------
 1 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..18c6e8b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    struct iovec *incomingVec; /* I/O Vector to hold data */
+    size_t writeVec; /* Vectors produced */
+    size_t readVec; /* Vectors consumed */
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        ((st->readVec < st->writeVec) || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
+              st->readVec, st->writeVec);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    VIR_FREE(st->incomingVec);
     virObjectUnref(st->prog);
 }
 
@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    size_t need;
+    struct iovec iov;
+    char *base;
+    size_t piece, pieces, length, offset = 0, size = 1024*1024;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
+    length = msg->bufferLength - msg->bufferOffset;
+
+    if (length == 0) {
         st->incomingEOF = true;
+        goto end;
     }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
+    pieces = (length + size - 1) / size;
+    for (piece = 0; piece < pieces; piece++) {
+        if (size > length - offset)
+            size = length - offset;
+
+        if (VIR_ALLOC_N(base, size)) {
+            VIR_DEBUG("Allocation failed");
+            goto cleanup;
+        }
+
+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+        iov.iov_base = base;
+        iov.iov_len = size;
+        offset += size;
+
+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+            VIR_DEBUG("Append failed");
+            VIR_FREE(base);
+            goto cleanup;
+        }
+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size);
+    }
+
+ end:
     virNetClientStreamEventTimerUpdate(st);
-
     ret = 0;
 
  cleanup:
+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int rv = -1;
+    int ret = -1;
+    size_t partial, offset;
+
+    virObjectLock(st);
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-    virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+
+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int ret;
+        int rv;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            rv = -2;
+            ret = -2;
             goto cleanup;
         }
 
@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        ret = virNetClientSendWithReplyStream(client, msg, st);
+        rv = virNetClientSendWithReplyStream(client, msg, st);
         virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (ret < 0)
+        if (rv < 0)
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
+    offset = 0;
+    partial = nbytes;
+
+    while (st->incomingVec && (st->readVec < st->writeVec)) {
+        struct iovec *iov = st->incomingVec + st->readVec;
+
+        if (!iov || !iov->iov_base) {
+            VIR_DEBUG("NULL pointer");
+            goto cleanup;
+        }
+
+        if (partial < iov->iov_len) {
+            memcpy(data+offset, iov->iov_base, partial);
+            memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial);
+            iov->iov_len -= partial;
+            offset += partial;
+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+            break;
         } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+            memcpy(data+offset, iov->iov_base, iov->iov_len);
+            VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+            partial -= iov->iov_len;
+            offset += iov->iov_len;
+            VIR_FREE(iov->iov_base);
+            iov->iov_len = 0;
+            st->readVec++;
        }
 
+        VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec);
     }
 
+    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return rv;
+    return ret;
 }
-- 
1.7.1
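The scheme the patch implements can be illustrated outside of libvirt. The following is a minimal sketch (not libvirt code; the names Queue, queue_packet, queue_recv, and CHUNK are invented for this example, and plain malloc/realloc stands in for libvirt's VIR_ALLOC_N/VIR_APPEND_ELEMENT): the producer splits incoming data into fixed-size pieces tracked by an iovec array (ceiling division, as in `pieces = (length + size - 1) / size`), and the consumer drains pieces in order, so a memmove() only ever touches the remainder of one small piece, never the whole buffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>

#define CHUNK 4  /* the patch uses 1 MiB pieces; tiny here for demonstration */

typedef struct {
    struct iovec *vec;
    size_t writeVec;  /* pieces produced */
    size_t readVec;   /* pieces consumed */
} Queue;

/* Producer side (cf. virNetClientStreamQueuePacket): split 'len' bytes
 * into CHUNK-sized pieces and append each as one iovec entry. */
static int queue_packet(Queue *q, const char *buf, size_t len)
{
    size_t pieces = (len + CHUNK - 1) / CHUNK;   /* ceiling division */
    size_t offset = 0;

    for (size_t p = 0; p < pieces; p++) {
        size_t size = len - offset < CHUNK ? len - offset : CHUNK;
        char *base = malloc(size);
        struct iovec *nvec;

        if (!base)
            return -1;
        memcpy(base, buf + offset, size);
        offset += size;

        nvec = realloc(q->vec, (q->writeVec + 1) * sizeof(*q->vec));
        if (!nvec) {
            free(base);
            return -1;
        }
        q->vec = nvec;
        q->vec[q->writeVec].iov_base = base;
        q->vec[q->writeVec].iov_len = size;
        q->writeVec++;
    }
    return 0;
}

/* Consumer side (cf. virNetClientStreamRecvPacket): copy out up to
 * 'nbytes', freeing whole pieces and shifting only within the one
 * piece that is read partially. Returns the number of bytes copied. */
static size_t queue_recv(Queue *q, char *data, size_t nbytes)
{
    size_t offset = 0, partial = nbytes;

    while (q->vec && q->readVec < q->writeVec) {
        struct iovec *iov = &q->vec[q->readVec];

        if (partial < iov->iov_len) {
            memcpy(data + offset, iov->iov_base, partial);
            memmove(iov->iov_base, (char *)iov->iov_base + partial,
                    iov->iov_len - partial);
            iov->iov_len -= partial;
            offset += partial;
            break;
        }
        memcpy(data + offset, iov->iov_base, iov->iov_len);
        partial -= iov->iov_len;
        offset += iov->iov_len;
        free(iov->iov_base);
        iov->iov_len = 0;
        q->readVec++;
    }
    return offset;
}
```

Queuing 10 bytes with CHUNK = 4 produces three pieces (4 + 4 + 2); a 6-byte read consumes the first piece whole and half of the second, and a subsequent large read returns the remaining 4 bytes.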

On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
> Use I/O vector (iovec) instead of one huge memory buffer as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids doing memmove() on big buffers, and performance doesn't degrade if the source (virNetClientStreamQueuePacket()) is faster than the sink (virNetClientStreamRecvPacket()).
Sorry to miss this mail, it got buried somehow and I haven't got to it until now since nobody pinged it. Sorry for the long wait then. I would remove the 'RH1026137: ' from the commit message and instead add a 'Resolves: http://bugzilla.redhat.com/1026137' or something similar here.
> ---
>  src/rpc/virnetclientstream.c | 134 +++++++++++++++++++++++++----------------
>  1 files changed, 82 insertions(+), 52 deletions(-)
>
> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> index b428f4b..18c6e8b 100644
> --- a/src/rpc/virnetclientstream.c
> +++ b/src/rpc/virnetclientstream.c
> @@ -49,9 +49,9 @@ struct _virNetClientStream {
>       * time by stopping consuming any incoming data
>       * off the socket....
>       */
> -    char *incoming;
> -    size_t incomingOffset;
> -    size_t incomingLength;
> +    struct iovec *incomingVec; /* I/O Vector to hold data */
> +    size_t writeVec; /* Vectors produced */
> +    size_t readVec; /* Vectors consumed */
>      bool incomingEOF;
>
>      virNetClientStreamEventCallback cb;
> @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>      if (!st->cb)
>          return;
>
> -    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
> +    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
>
> -    if (((st->incomingOffset || st->incomingEOF) &&
> +    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
>           (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>          VIR_DEBUG("Enabling event timer");
> @@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>
>      if (st->cb &&
>          (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
> -        (st->incomingOffset || st->incomingEOF))
> +        ((st->readVec < st->writeVec) || st->incomingEOF))
>          events |= VIR_STREAM_EVENT_READABLE;
>      if (st->cb &&
>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>          events |= VIR_STREAM_EVENT_WRITABLE;
>
> -    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
> +    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
> +              st->readVec, st->writeVec);
>      if (events) {
>          virNetClientStreamEventCallback cb = st->cb;
>          void *cbOpaque = st->cbOpaque;
> @@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
>      virNetClientStreamPtr st = obj;
>
>      virResetError(&st->err);
> -    VIR_FREE(st->incoming);
> +    VIR_FREE(st->incomingVec);
>      virObjectUnref(st->prog);
>  }
>
> @@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>                                    virNetMessagePtr msg)
>  {
>      int ret = -1;
> -    size_t need;
> +    struct iovec iov;
> +    char *base;
> +    size_t piece, pieces, length, offset = 0, size = 1024*1024;
>
>      virObjectLock(st);
> -    need = msg->bufferLength - msg->bufferOffset;
> -    if (need) {
> -        size_t avail = st->incomingLength - st->incomingOffset;
> -        if (need > avail) {
> -            size_t extra = need - avail;
> -            if (VIR_REALLOC_N(st->incoming,
> -                              st->incomingLength + extra) < 0) {
> -                VIR_DEBUG("Out of memory handling stream data");
> -                goto cleanup;
> -            }
> -            st->incomingLength += extra;
> -        }
> -        memcpy(st->incoming + st->incomingOffset,
> -               msg->buffer + msg->bufferOffset,
> -               msg->bufferLength - msg->bufferOffset);
> -        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
> -    } else {
> +    length = msg->bufferLength - msg->bufferOffset;
> +
> +    if (length == 0) {
>          st->incomingEOF = true;
> +        goto end;
>      }
>
> -    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
> -              st->incomingOffset, st->incomingLength,
> -              st->incomingEOF);
> +    pieces = (length + size - 1) / size;
> +    for (piece = 0; piece < pieces; piece++) {
> +        if (size > length - offset)
> +            size = length - offset;
> +
> +        if (VIR_ALLOC_N(base, size)) {
> +            VIR_DEBUG("Allocation failed");
> +            goto cleanup;
> +        }
> +
> +        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
> +        iov.iov_base = base;
> +        iov.iov_len = size;
> +        offset += size;
> +
> +        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
> +            VIR_DEBUG("Append failed");
> +            VIR_FREE(base);
> +            goto cleanup;
> +        }
> +        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size);
Long line, should be wrapped.
> +    }
> +
> + end:
>      virNetClientStreamEventTimerUpdate(st);
> -
>      ret = 0;
>
>   cleanup:
> +    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
> +              st->readVec, st->writeVec, st->incomingEOF);
>      virObjectUnlock(st);
>      return ret;
>  }
> @@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>                                   size_t nbytes,
>                                   bool nonblock)
>  {
> -    int rv = -1;
> +    int ret = -1;
> +    size_t partial, offset;
> +
> +    virObjectLock(st);
> +
>      VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>                st, client, data, nbytes, nonblock);
> -    virObjectLock(st);
> -    if (!st->incomingOffset && !st->incomingEOF) {
> +
> +    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
>          virNetMessagePtr msg;
> -        int ret;
> +        int rv;
>
>          if (nonblock) {
>              VIR_DEBUG("Non-blocking mode and no data available");
> -            rv = -2;
> +            ret = -2;
>              goto cleanup;
>          }
>
> @@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>
>          VIR_DEBUG("Dummy packet to wait for stream data");
>          virObjectUnlock(st);
> -        ret = virNetClientSendWithReplyStream(client, msg, st);
> +        rv = virNetClientSendWithReplyStream(client, msg, st);
>          virObjectLock(st);
>          virNetMessageFree(msg);
>
> -        if (ret < 0)
> +        if (rv < 0)
>              goto cleanup;
>      }
>
> -    VIR_DEBUG("After IO %zu", st->incomingOffset);
> -    if (st->incomingOffset) {
> -        int want = st->incomingOffset;
> -        if (want > nbytes)
> -            want = nbytes;
> -        memcpy(data, st->incoming, want);
> -        if (want < st->incomingOffset) {
> -            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
> -            st->incomingOffset -= want;
> +    offset = 0;
> +    partial = nbytes;
> +
> +    while (st->incomingVec && (st->readVec < st->writeVec)) {
> +        struct iovec *iov = st->incomingVec + st->readVec;
> +
> +        if (!iov || !iov->iov_base) {
> +            VIR_DEBUG("NULL pointer");
This should be virReportError(VIR_ERR_INTERNAL_ERROR, ...) or VIR_ERR_RPC.
> +            goto cleanup;
> +        }
> +
> +        if (partial < iov->iov_len) {
> +            memcpy(data+offset, iov->iov_base, partial);
> +            memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial);
Long line.
> +            iov->iov_len -= partial;
> +            offset += partial;
> +            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
> +            break;
>          } else {
You don't need to enclose this in an else body thanks to the break above.
> -            VIR_FREE(st->incoming);
> -            st->incomingOffset = st->incomingLength = 0;
> +            memcpy(data+offset, iov->iov_base, iov->iov_len);
> +            VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
> +            partial -= iov->iov_len;
> +            offset += iov->iov_len;
> +            VIR_FREE(iov->iov_base);
> +            iov->iov_len = 0;
> +            st->readVec++;
The only thing I would mention wrt how it works after this patch is that it will consume some memory that's not needed, precisely (sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be worth it to do:

    memmove(st->incomingVec, st->incomingVec + st->readVec,
            st->writeVec - st->readVec);
    VIR_SHRINK_N(st->incomingVec, st->readVec);
    st->writeVec -= st->readVec;
    st->readVec = 0;

Apart from that it's definitely a *way* better approach. What do you think of such modification? This wouldn't go instead of 'st->readVec++', but rather at the end of this function, so it's not done after each MB read.
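The compaction suggested above can be sketched in plain C (this is not libvirt code: compact_vec is an invented name, and an ordinary realloc() stands in for libvirt's VIR_SHRINK_N bookkeeping): the unread entries are shifted to the front of the array with a memmove() over only the iovec bookkeeping (16-byte structs, not the megabyte-sized data pieces they point to), and the unused tail is returned to the allocator.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>

/* Compact a partially consumed iovec array: move the (*writeVec -
 * *readVec) unread entries down over the consumed ones and shrink
 * the allocation to fit. */
static void compact_vec(struct iovec **vec, size_t *readVec, size_t *writeVec)
{
    size_t left = *writeVec - *readVec;
    struct iovec *nvec;

    if (*readVec == 0)
        return;

    /* Shift unread entries to the front of the array. */
    memmove(*vec, *vec + *readVec, sizeof(**vec) * left);

    /* Give the unused tail back to the allocator; if realloc fails
     * (or nothing is left), keeping the old larger block is still
     * correct, just less tidy. */
    if (left > 0 && (nvec = realloc(*vec, sizeof(**vec) * left)))
        *vec = nvec;

    *writeVec = left;
    *readVec = 0;
}
```

With 5 entries of which 3 have been consumed, compacting leaves a 2-entry array whose first element is the old fourth one.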
>          }
> -        rv = want;
> -    } else {
> -        rv = 0;
> +
> +        VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec);
Long line.
>      }
>
> +    ret = offset;
>      virNetClientStreamEventTimerUpdate(st);
>
>   cleanup:
>      virObjectUnlock(st);
> -    return rv;
> +    return ret;
>  }
Apart from the mentioned cosmetic changes this looks very nice.

Martin

> On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
> Sorry to miss this mail, it got buried somehow and I haven't got to it until now since nobody pinged it. Sorry for the long wait then.
No worries, and thank you for taking the time to review my patch. See the new patch attached, as well as comments on the memory usage below. The patch is tested on latest master (e46791e003444ce825feaf5bb2a16f778ee951e5).
> The only thing I would mention wrt how it works after this patch is that it will consume some memory that's not needed, precisely (sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be worth it to do:
>
>     memmove(st->incomingVec, st->incomingVec + st->readVec,
>             st->writeVec - st->readVec);
>     VIR_SHRINK_N(st->incomingVec, st->readVec);
>     st->writeVec -= st->readVec;
>     st->readVec = 0;
>
> Apart from that it's definitely a *way* better approach. What do you think of such modification? This wouldn't go instead of 'st->readVec++', but rather at the end of this function, so it's not done after each MB read.
I totally agree and implemented freeing of the memory in the new patch:

    if (st->readVec >= 16) {
        memmove(st->incomingVec, st->incomingVec + st->readVec,
                sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
        VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
        VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
        st->readVec = 0;
    }

Now it only frees memory in chunks of 16 to avoid doing memmove() all the time. The iovec is 16 bytes (on 64-bit Linux), so this frees 256 bytes at a time; in my testing that is usually everything, or almost everything, that is allocated.

Thanks again for taking the time to review and commit, and I hope the new patch below is formatted OK.
From 2ae95c31568eb800c1c6df3641a8ecbdc95bf268 Mon Sep 17 00:00:00 2001
From: Ossi Herrala <oherrala@gmail.com>
Date: Mon, 20 Jul 2015 12:44:32 +0000
Subject: [PATCH] rpc: Fix slow volume download (virsh vol-download)

Use I/O vector (iovec) instead of one huge memory buffer as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids doing memmove() on big buffers, and performance doesn't degrade if the source (virNetClientStreamQueuePacket()) is faster than the sink (virNetClientStreamRecvPacket()).

Resolves: http://bugzilla.redhat.com/1026137
---
 src/rpc/virnetclientstream.c | 152 +++++++++++++++++++++++++++--------------
 1 files changed, 99 insertions(+), 53 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..1cc9002 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
      * time by stopping consuming any incoming data
      * off the socket....
      */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    struct iovec *incomingVec; /* I/O Vector to hold data */
+    size_t writeVec; /* Vectors produced */
+    size_t readVec; /* Vectors consumed */
     bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
     if (!st->cb)
         return;
 
-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
 
-    if (((st->incomingOffset || st->incomingEOF) &&
+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        ((st->readVec < st->writeVec) || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
         events |= VIR_STREAM_EVENT_WRITABLE;
 
-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
+              st->readVec, st->writeVec);
     if (events) {
         virNetClientStreamEventCallback cb = st->cb;
         void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
     virNetClientStreamPtr st = obj;
 
     virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    VIR_FREE(st->incomingVec);
     virObjectUnref(st->prog);
 }
 
@@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
                                   virNetMessagePtr msg)
 {
     int ret = -1;
-    size_t need;
+    struct iovec iov;
+    char *base;
+    size_t piece, pieces, length, offset = 0, size = 1024*1024;
 
     virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }
-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
+    length = msg->bufferLength - msg->bufferOffset;
+
+    if (length == 0) {
         st->incomingEOF = true;
+        goto end;
     }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
-    virNetClientStreamEventTimerUpdate(st);
+    pieces = (length + size - 1) / size;
+    for (piece = 0; piece < pieces; piece++) {
+        if (size > length - offset)
+            size = length - offset;
+
+        if (VIR_ALLOC_N(base, size)) {
+            VIR_DEBUG("Allocation failed");
+            goto cleanup;
+        }
+
+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+        iov.iov_base = base;
+        iov.iov_len = size;
+        offset += size;
+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+            VIR_DEBUG("Append failed");
+            VIR_FREE(base);
+            goto cleanup;
+        }
+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
+                  st->readVec, st->writeVec, size);
+    }
+
+ end:
+    virNetClientStreamEventTimerUpdate(st);
     ret = 0;
 
  cleanup:
+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+              st->readVec, st->writeVec, st->incomingEOF);
     virObjectUnlock(st);
     return ret;
 }
@@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                  size_t nbytes,
                                  bool nonblock)
 {
-    int rv = -1;
+    int ret = -1;
+    size_t partial, offset;
+
+    virObjectLock(st);
+
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
-    virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+
+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
         virNetMessagePtr msg;
-        int ret;
+        int rv;
 
         if (nonblock) {
             VIR_DEBUG("Non-blocking mode and no data available");
-            rv = -2;
+            ret = -2;
             goto cleanup;
         }
 
@@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virObjectUnlock(st);
-        ret = virNetClientSendWithReplyStream(client, msg, st);
+        rv = virNetClientSendWithReplyStream(client, msg, st);
        virObjectLock(st);
         virNetMessageFree(msg);
 
-        if (ret < 0)
+        if (rv < 0)
             goto cleanup;
     }
 
-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
-            st->incomingOffset -= want;
-        } else {
-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+    offset = 0;
+    partial = nbytes;
+
+    while (st->incomingVec && (st->readVec < st->writeVec)) {
+        struct iovec *iov = st->incomingVec + st->readVec;
+
+        if (!iov || !iov->iov_base) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           "%s", _("NULL pointer encountered"));
+            goto cleanup;
         }
-        rv = want;
-    } else {
-        rv = 0;
+
+        if (partial < iov->iov_len) {
+            memcpy(data+offset, iov->iov_base, partial);
+            memmove(iov->iov_base, (char*)iov->iov_base+partial,
+                    iov->iov_len-partial);
+            iov->iov_len -= partial;
+            offset += partial;
+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+            break;
+        }
+
+        memcpy(data+offset, iov->iov_base, iov->iov_len);
+        VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+        partial -= iov->iov_len;
+        offset += iov->iov_len;
+        VIR_FREE(iov->iov_base);
+        iov->iov_len = 0;
+        st->readVec++;
+
+        VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
+                  offset, st->readVec, st->writeVec);
     }
 
+    /* Shrink the I/O Vector buffer to free up memory. Do the
+       shrinking only when there is selected amount or more buffers to
+       free so it doesn't constantly memmove() and realloc() buffers.
+     */
+    if (st->readVec >= 16) {
+        memmove(st->incomingVec, st->incomingVec + st->readVec,
+                sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
+        VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
+        VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
+        st->readVec = 0;
+    }
+
+    ret = offset;
     virNetClientStreamEventTimerUpdate(st);
 
  cleanup:
     virObjectUnlock(st);
-    return rv;
+    return ret;
 }
-- 
1.7.1

-- 
Ossi Herrala
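The size arithmetic in the mail above (16-byte iovecs, batches of 16 freeing 256 bytes of array bookkeeping) can be checked directly. This is a standalone sanity check, not libvirt code; the helper name shrink_bytes is invented, and the exact sizeof(struct iovec) is platform-dependent (16 bytes on typical 64-bit Linux: a void * plus a size_t).

```c
#include <assert.h>
#include <sys/uio.h>

/* Bytes of iovec bookkeeping released when 'entries' consumed vector
 * slots are shrunk away. Note this counts only the array entries; the
 * 1 MiB data pieces they point to are freed separately, one by one,
 * as each piece is fully consumed. */
static size_t shrink_bytes(size_t entries)
{
    return entries * sizeof(struct iovec);
}
```

On a 64-bit Linux build, shrink_bytes(16) evaluates to 256, matching the figure quoted in the mail.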

On Mon, Jul 20, 2015 at 05:42:11PM +0300, Ossi Herrala wrote:
> > On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
> > Sorry to miss this mail, it got buried somehow and I haven't got to it until now since nobody pinged it. Sorry for the long wait then.
>
> No worries, and thank you for taking the time to review my patch. See the new patch attached, as well as comments on the memory usage below.
>
> The patch is tested on latest master (e46791e003444ce825feaf5bb2a16f778ee951e5).
>
> > The only thing I would mention wrt how it works after this patch is that it will consume some memory that's not needed, precisely (sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be worth it to do:
> >
> >     memmove(st->incomingVec, st->incomingVec + st->readVec,
> >             st->writeVec - st->readVec);
> >     VIR_SHRINK_N(st->incomingVec, st->readVec);
> >     st->writeVec -= st->readVec;
> >     st->readVec = 0;
> >
> > Apart from that it's definitely a *way* better approach. What do you think of such modification? This wouldn't go instead of 'st->readVec++', but rather at the end of this function, so it's not done after each MB read.
>
> I totally agree and implemented freeing of the memory in the new patch:
>
>     if (st->readVec >= 16) {
>         memmove(st->incomingVec, st->incomingVec + st->readVec,
>                 sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
>         VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
>         VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
>         st->readVec = 0;
>     }
>
> Now it only frees memory in chunks of 16 to avoid doing memmove() all the time. The iovec is 16 bytes (on 64-bit Linux), so this frees 256 bytes at a time; in my testing that is usually everything, or almost everything, that is allocated.
>
> Thanks again for taking the time to review and commit, and I hope the new patch below is formatted OK.
I managed to apply it as needed. I like how it works now, and the freeing is fine, too. I tested it and it seems to be working perfectly: without this patch I got stuck at 1MiB/s locally after 11GiB in circa 1 minute; with this patch I managed to download 20GiB in less than a minute.

ACK && will push after the release since we're way after rc2 already. Again, sorry for the delays and thanks for the contribution!

Have a nice day,
Martin