Use an I/O vector (iovec) instead of one huge memory buffer, as suggested
in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
calling memmove() on big buffers, and performance no longer degrades when
the source (virNetClientStreamQueuePacket()) is faster than the sink
(virNetClientStreamRecvPacket()).
---
src/rpc/virnetclientstream.c | 134 +++++++++++++++++++++++++----------------
1 files changed, 82 insertions(+), 52 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..18c6e8b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
* time by stopping consuming any incoming data
* off the socket....
*/
- char *incoming;
- size_t incomingOffset;
- size_t incomingLength;
+ struct iovec *incomingVec; /* I/O Vector to hold data */
+ size_t writeVec; /* Vectors produced */
+ size_t readVec; /* Vectors consumed */
bool incomingEOF;
virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
if (!st->cb)
return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset,
st->cbEvents);
+ VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec,
st->writeVec, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) &&
+ if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void
*opaque)
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- (st->incomingOffset || st->incomingEOF))
+ ((st->readVec < st->writeVec) || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents,
st->incomingOffset);
+ VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events,
st->cbEvents,
+ st->readVec, st->writeVec);
if (events) {
virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
virNetClientStreamPtr st = obj;
virResetError(&st->err);
- VIR_FREE(st->incoming);
+ VIR_FREE(st->incomingVec);
virObjectUnref(st->prog);
}
@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg)
{
int ret = -1;
- size_t need;
+ struct iovec iov;
+ char *base;
+ size_t piece, pieces, length, offset = 0, size = 1024*1024;
virObjectLock(st);
- need = msg->bufferLength - msg->bufferOffset;
- if (need) {
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
- }
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
- } else {
+ length = msg->bufferLength - msg->bufferOffset;
+
+ if (length == 0) {
st->incomingEOF = true;
+ goto end;
}
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
- st->incomingOffset, st->incomingLength,
- st->incomingEOF);
+ pieces = (length + size - 1) / size;
+ for (piece = 0; piece < pieces; piece++) {
+ if (size > length - offset)
+ size = length - offset;
+
+ if (VIR_ALLOC_N(base, size)) {
+ VIR_DEBUG("Allocation failed");
+ goto cleanup;
+ }
+
+ memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+ iov.iov_base = base;
+ iov.iov_len = size;
+ offset += size;
+
+ if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+ VIR_DEBUG("Append failed");
+ VIR_FREE(base);
+ goto cleanup;
+ }
+ VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
st->readVec, st->writeVec, size);
+ }
+
+ end:
virNetClientStreamEventTimerUpdate(st);
-
ret = 0;
cleanup:
+ VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+ st->readVec, st->writeVec, st->incomingEOF);
virObjectUnlock(st);
return ret;
}
@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
size_t nbytes,
bool nonblock)
{
- int rv = -1;
+ int ret = -1;
+ size_t partial, offset;
+
+ virObjectLock(st);
+
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
- virObjectLock(st);
- if (!st->incomingOffset && !st->incomingEOF) {
+
+ if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
virNetMessagePtr msg;
- int ret;
+ int rv;
if (nonblock) {
VIR_DEBUG("Non-blocking mode and no data available");
- rv = -2;
+ ret = -2;
goto cleanup;
}
@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("Dummy packet to wait for stream data");
virObjectUnlock(st);
- ret = virNetClientSendWithReplyStream(client, msg, st);
+ rv = virNetClientSendWithReplyStream(client, msg, st);
virObjectLock(st);
virNetMessageFree(msg);
- if (ret < 0)
+ if (rv < 0)
goto cleanup;
}
- VIR_DEBUG("After IO %zu", st->incomingOffset);
- if (st->incomingOffset) {
- int want = st->incomingOffset;
- if (want > nbytes)
- want = nbytes;
- memcpy(data, st->incoming, want);
- if (want < st->incomingOffset) {
- memmove(st->incoming, st->incoming + want, st->incomingOffset -
want);
- st->incomingOffset -= want;
+ offset = 0;
+ partial = nbytes;
+
+ while (st->incomingVec && (st->readVec < st->writeVec)) {
+ struct iovec *iov = st->incomingVec + st->readVec;
+
+ if (!iov || !iov->iov_base) {
+ VIR_DEBUG("NULL pointer");
+ goto cleanup;
+ }
+
+ if (partial < iov->iov_len) {
+ memcpy(data+offset, iov->iov_base, partial);
+ memmove(iov->iov_base, (char*)iov->iov_base+partial,
iov->iov_len-partial);
+ iov->iov_len -= partial;
+ offset += partial;
+ VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+ break;
} else {
- VIR_FREE(st->incoming);
- st->incomingOffset = st->incomingLength = 0;
+ memcpy(data+offset, iov->iov_base, iov->iov_len);
+ VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+ partial -= iov->iov_len;
+ offset += iov->iov_len;
+ VIR_FREE(iov->iov_base);
+ iov->iov_len = 0;
+ st->readVec++;
}
- rv = want;
- } else {
- rv = 0;
+
+ VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu",
offset, st->readVec, st->writeVec);
}
+ ret = offset;
virNetClientStreamEventTimerUpdate(st);
cleanup:
virObjectUnlock(st);
- return rv;
+ return ret;
}
--
1.7.1