There are two functions on the client that handle incoming stream
data. The first one, virNetClientStreamQueuePacket(), is a
low-level function that just processes the incoming stream data
from the socket and stores it in an internal structure. This
happens in the client event loop, so the shorter the callbacks
are, the better. The second function,
virNetClientStreamRecvPacket(), then handles copying data from
the internal structure into a client-provided buffer.
The change introduced in this commit does just that: a new queue
for incoming stream packets is introduced. Then, instead of
copying data into an intermediate internal buffer and then
copying it into the user buffer, incoming stream messages are
pushed onto the queue and the data is copied just once - in the
upper layer function virNetClientStreamRecvPacket(). In the end,
there's just one copy of the data and therefore a shorter event
loop callback. This should boost performance, which has proven
to be the case in my testing.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
1 file changed, 54 insertions(+), 52 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..34989a9 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,7 @@ struct _virNetClientStream {
* time by stopping consuming any incoming data
* off the socket....
*/
- char *incoming;
- size_t incomingOffset;
- size_t incomingLength;
+ virNetMessagePtr rx;
bool incomingEOF;
virNetClientStreamEventCallback cb;
@@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
if (!st->cb)
return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset,
st->cbEvents);
+ VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) &&
+ if (((st->rx || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
@@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void
*opaque)
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- (st->incomingOffset || st->incomingEOF))
+ (st->rx || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents,
st->incomingOffset);
+ VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events,
st->cbEvents, st->rx);
if (events) {
virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque;
@@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
virNetClientStreamPtr st = obj;
virResetError(&st->err);
- VIR_FREE(st->incoming);
+ while (st->rx) {
+ virNetMessagePtr msg = st->rx;
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
+ }
virObjectUnref(st->prog);
}
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg)
{
- int ret = -1;
- size_t need;
+ virNetMessagePtr tmp_msg;
+
+ VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
+
+ /* Unfortunately, we must allocate new message as the one we
+ * get in @msg is going to be cleared later in the process. */
+
+ if (!(tmp_msg = virNetMessageNew(false)))
+ return -1;
+
+ /* Copy header */
+ memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
+
+ /* Steal message buffer */
+ tmp_msg->buffer = msg->buffer;
+ tmp_msg->bufferLength = msg->bufferLength;
+ tmp_msg->bufferOffset = msg->bufferOffset;
+ msg->buffer = NULL;
+ msg->bufferLength = msg->bufferOffset = 0;
virObjectLock(st);
- need = msg->bufferLength - msg->bufferOffset;
- if (need) {
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
- }
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
- } else {
- st->incomingEOF = true;
- }
+ virNetMessageQueuePush(&st->rx, tmp_msg);
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
- st->incomingOffset, st->incomingLength,
- st->incomingEOF);
virNetClientStreamEventTimerUpdate(st);
- ret = 0;
-
- cleanup:
virObjectUnlock(st);
- return ret;
+ return 0;
}
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
bool nonblock)
{
int rv = -1;
+ size_t want;
+
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
virObjectLock(st);
- if (!st->incomingOffset && !st->incomingEOF) {
+ if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
goto cleanup;
}
- VIR_DEBUG("After IO %zu", st->incomingOffset);
- if (st->incomingOffset) {
- int want = st->incomingOffset;
- if (want > nbytes)
- want = nbytes;
- memcpy(data, st->incoming, want);
- if (want < st->incomingOffset) {
- memmove(st->incoming, st->incoming + want, st->incomingOffset -
want);
- st->incomingOffset -= want;
- } else {
- VIR_FREE(st->incoming);
- st->incomingOffset = st->incomingLength = 0;
+ VIR_DEBUG("After IO rx=%p", st->rx);
+ want = nbytes;
+ while (want && st->rx) {
+ virNetMessagePtr msg = st->rx;
+ size_t len = want;
+
+ if (len > msg->bufferLength - msg->bufferOffset)
+ len = msg->bufferLength - msg->bufferOffset;
+
+ if (!len)
+ break;
+
+ memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
+ want -= len;
+ msg->bufferOffset += len;
+
+ if (msg->bufferOffset == msg->bufferLength) {
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
}
- rv = want;
- } else {
- rv = 0;
}
+ rv = nbytes - want;
virNetClientStreamEventTimerUpdate(st);
--
2.8.1