On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
Sorry for missing this mail; it got buried somehow and I didn't get to it
until now since nobody pinged it. Sorry for the long wait.
No worries, and thank you for taking the time to review my patch. See the new
patch attached, as well as comments on the memory usage below.
The patch has been tested on the latest master (e46791e003444ce825feaf5bb2a16f778ee951e5).
The only thing I would mention with respect to how it works after this patch
is that it will consume some memory that's not needed, specifically
(sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be
worth it to do:
memmove(st->incomingVec, st->incomingVec + st->readVec,
st->writeVec - st->readVec);
VIR_SHRINK_N(st->incomingVec, st->readVec);
st->writeVec -= st->readVec;
st->readVec = 0;
Apart from that, it's definitely a *way* better approach. What do you
think of such a modification? It could go in place of
'st->readVec++', but preferably at the end of this function, so it's not
done after each MB read.
I totally agree and implemented freeing of the memory in the new
patch:
if (st->readVec >= 16) {
memmove(st->incomingVec, st->incomingVec + st->readVec,
sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
st->readVec = 0;
}
Now it only frees memory once at least 16 vector entries have been consumed,
to avoid calling memmove() all the time. A struct iovec is 16 bytes (on
64-bit Linux), so each shrink frees at least 256 bytes of vector entries, and
in my testing it usually frees everything or almost everything that was allocated.
Thanks again for taking the time to review and commit. I hope the new
patch below is formatted correctly.
From 2ae95c31568eb800c1c6df3641a8ecbdc95bf268 Mon Sep 17 00:00:00 2001
From: Ossi Herrala <oherrala(a)gmail.com>
Date: Mon, 20 Jul 2015 12:44:32 +0000
Subject: [PATCH] rpc: Fix slow volume download (virsh vol-download)
Use an I/O vector (iovec) instead of one huge memory buffer, as suggested
in
https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
calling memmove() on big buffers, so performance doesn't degrade when the
source (virNetClientStreamQueuePacket()) is faster than the sink
(virNetClientStreamRecvPacket()).
Resolves:
http://bugzilla.redhat.com/1026137
---
src/rpc/virnetclientstream.c | 152 +++++++++++++++++++++++++++---------------
1 files changed, 99 insertions(+), 53 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..1cc9002 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
* time by stopping consuming any incoming data
* off the socket....
*/
- char *incoming;
- size_t incomingOffset;
- size_t incomingLength;
+ struct iovec *incomingVec; /* I/O Vector to hold data */
+ size_t writeVec; /* Vectors produced */
+ size_t readVec; /* Vectors consumed */
bool incomingEOF;
virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
if (!st->cb)
return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset,
st->cbEvents);
+ VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec,
st->writeVec, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) &&
+ if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void
*opaque)
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- (st->incomingOffset || st->incomingEOF))
+ ((st->readVec < st->writeVec) || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents,
st->incomingOffset);
+ VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events,
st->cbEvents,
+ st->readVec, st->writeVec);
if (events) {
virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
virNetClientStreamPtr st = obj;
virResetError(&st->err);
- VIR_FREE(st->incoming);
+ VIR_FREE(st->incomingVec);
virObjectUnref(st->prog);
}
@@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg)
{
int ret = -1;
- size_t need;
+ struct iovec iov;
+ char *base;
+ size_t piece, pieces, length, offset = 0, size = 1024*1024;
virObjectLock(st);
- need = msg->bufferLength - msg->bufferOffset;
- if (need) {
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
- }
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
- } else {
+ length = msg->bufferLength - msg->bufferOffset;
+
+ if (length == 0) {
st->incomingEOF = true;
+ goto end;
}
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
- st->incomingOffset, st->incomingLength,
- st->incomingEOF);
- virNetClientStreamEventTimerUpdate(st);
+ pieces = (length + size - 1) / size;
+ for (piece = 0; piece < pieces; piece++) {
+ if (size > length - offset)
+ size = length - offset;
+
+ if (VIR_ALLOC_N(base, size)) {
+ VIR_DEBUG("Allocation failed");
+ goto cleanup;
+ }
+
+ memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+ iov.iov_base = base;
+ iov.iov_len = size;
+ offset += size;
+ if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+ VIR_DEBUG("Append failed");
+ VIR_FREE(base);
+ goto cleanup;
+ }
+ VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
+ st->readVec, st->writeVec, size);
+ }
+
+ end:
+ virNetClientStreamEventTimerUpdate(st);
ret = 0;
cleanup:
+ VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+ st->readVec, st->writeVec, st->incomingEOF);
virObjectUnlock(st);
return ret;
}
@@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
size_t nbytes,
bool nonblock)
{
- int rv = -1;
+ int ret = -1;
+ size_t partial, offset;
+
+ virObjectLock(st);
+
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
- virObjectLock(st);
- if (!st->incomingOffset && !st->incomingEOF) {
+
+ if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
virNetMessagePtr msg;
- int ret;
+ int rv;
if (nonblock) {
VIR_DEBUG("Non-blocking mode and no data available");
- rv = -2;
+ ret = -2;
goto cleanup;
}
@@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("Dummy packet to wait for stream data");
virObjectUnlock(st);
- ret = virNetClientSendWithReplyStream(client, msg, st);
+ rv = virNetClientSendWithReplyStream(client, msg, st);
virObjectLock(st);
virNetMessageFree(msg);
- if (ret < 0)
+ if (rv < 0)
goto cleanup;
}
- VIR_DEBUG("After IO %zu", st->incomingOffset);
- if (st->incomingOffset) {
- int want = st->incomingOffset;
- if (want > nbytes)
- want = nbytes;
- memcpy(data, st->incoming, want);
- if (want < st->incomingOffset) {
- memmove(st->incoming, st->incoming + want, st->incomingOffset -
want);
- st->incomingOffset -= want;
- } else {
- VIR_FREE(st->incoming);
- st->incomingOffset = st->incomingLength = 0;
+ offset = 0;
+ partial = nbytes;
+
+ while (st->incomingVec && (st->readVec < st->writeVec)) {
+ struct iovec *iov = st->incomingVec + st->readVec;
+
+ if (!iov || !iov->iov_base) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ "%s", _("NULL pointer encountered"));
+ goto cleanup;
}
- rv = want;
- } else {
- rv = 0;
+
+ if (partial < iov->iov_len) {
+ memcpy(data+offset, iov->iov_base, partial);
+ memmove(iov->iov_base, (char*)iov->iov_base+partial,
+ iov->iov_len-partial);
+ iov->iov_len -= partial;
+ offset += partial;
+ VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+ break;
+ }
+
+ memcpy(data+offset, iov->iov_base, iov->iov_len);
+ VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+ partial -= iov->iov_len;
+ offset += iov->iov_len;
+ VIR_FREE(iov->iov_base);
+ iov->iov_len = 0;
+ st->readVec++;
+
+ VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
+ offset, st->readVec, st->writeVec);
}
+ /* Shrink the I/O Vector buffer to free up memory. Do the
+ shrinking only when there is selected amount or more buffers to
+ free so it doesn't constantly memmove() and realloc() buffers.
+ */
+ if (st->readVec >= 16) {
+ memmove(st->incomingVec, st->incomingVec + st->readVec,
+ sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
+ VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
+ VIR_DEBUG("shrink removed %zu, left %zu", st->readVec,
st->writeVec);
+ st->readVec = 0;
+ }
+
+ ret = offset;
virNetClientStreamEventTimerUpdate(st);
cleanup:
virObjectUnlock(st);
- return rv;
+ return ret;
}
--
1.7.1
--
Ossi Herrala