On 04/15/2016 09:51 AM, Michal Privoznik wrote:
Currently we have two separate functions for handling read from
a stream. One is supposed to be low level and reads data into a
self-allocating chunk of memory. The other read function then
copies data over from the chunk into a user buffer. There are two
memcpy() calls involved even though a single one would be sufficient.
Moreover, since we are copying just data, we can't process
alternative stream packets in the latter function, like stream
seeks.
In my testing, this proved two times faster then implementation
s/then/than the/
which uses IO vectors.
Can I "assume" this testing covers the reverted patch scenario? IOW: I
think this needs
https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
be reopened...
Might have been "nice" to indicate/summarize what this algorithm does as
opposed to the other. I think you started at the end of the first
paragraph, but I'm not 100% sure - I guess it's easier for me if it's
explicitly said, such as:
In virNetClientStreamQueuePacket instead of ...
In virNetClientStreamRecvPacket instead of ...
instead of implicitly said if you know the code.
The functions are just tough to read without (more) knowledge (than I
have about them) of how they are designed to function. Since he had a
hand in the above bug, hopefully Martin can take a look at this patch.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
1 file changed, 54 insertions(+), 52 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..34989a9 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,7 @@ struct _virNetClientStream {
* time by stopping consuming any incoming data
* off the socket....
*/
- char *incoming;
- size_t incomingOffset;
- size_t incomingLength;
+ virNetMessagePtr rx;
bool incomingEOF;
virNetClientStreamEventCallback cb;
@@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
if (!st->cb)
return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset,
st->cbEvents);
+ VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) &&
+ if (((st->rx || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
@@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void
*opaque)
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- (st->incomingOffset || st->incomingEOF))
+ (st->rx || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents,
st->incomingOffset);
+ VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events,
st->cbEvents, st->rx);
if (events) {
virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque;
@@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
virNetClientStreamPtr st = obj;
virResetError(&st->err);
- VIR_FREE(st->incoming);
+ while (st->rx) {
+ virNetMessagePtr msg = st->rx;
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
+ }
virObjectUnref(st->prog);
}
@@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg)
{
- int ret = -1;
- size_t need;
+ virNetMessagePtr tmp_msg;
+
+ VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
+
+ /* Unfortunately, we must allocate new message as the one we
+ * get in @msg is going to be cleared later in the process. */
+
+ if (!(tmp_msg = virNetMessageNew(false)))
+ return -1;
+
+ /* Copy header */
+ memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
+
+ /* Steal message buffer */
+ tmp_msg->buffer = msg->buffer;
+ tmp_msg->bufferLength = msg->bufferLength;
+ tmp_msg->bufferOffset = msg->bufferOffset;
+ msg->buffer = NULL;
+ msg->bufferLength = msg->bufferOffset = 0;
virObjectLock(st);
- need = msg->bufferLength - msg->bufferOffset;
- if (need) {
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
- }
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
- } else {
- st->incomingEOF = true;
- }
+ virNetMessageQueuePush(&st->rx, tmp_msg);
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
- st->incomingOffset, st->incomingLength,
- st->incomingEOF);
virNetClientStreamEventTimerUpdate(st);
- ret = 0;
-
- cleanup:
virObjectUnlock(st);
- return ret;
+ return 0;
}
@@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
bool nonblock)
{
int rv = -1;
+ size_t want;
+
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
virObjectLock(st);
- if (!st->incomingOffset && !st->incomingEOF) {
+ if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
goto cleanup;
}
- VIR_DEBUG("After IO %zu", st->incomingOffset);
- if (st->incomingOffset) {
- int want = st->incomingOffset;
- if (want > nbytes)
- want = nbytes;
- memcpy(data, st->incoming, want);
- if (want < st->incomingOffset) {
- memmove(st->incoming, st->incoming + want, st->incomingOffset -
want);
- st->incomingOffset -= want;
- } else {
- VIR_FREE(st->incoming);
- st->incomingOffset = st->incomingLength = 0;
+ VIR_DEBUG("After IO rx=%p", st->rx);
+ want = nbytes;
+ while (want && st->rx) {
So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that
is 'expected'...
+ virNetMessagePtr msg = st->rx;
+ size_t len = want;
+
+ if (len > msg->bufferLength - msg->bufferOffset)
+ len = msg->bufferLength - msg->bufferOffset;
+
+ if (!len)
+ break;
+
+ memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
+ want -= len;
+ msg->bufferOffset += len;
+
+ if (msg->bufferOffset == msg->bufferLength) {
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
Nothing needs to be done with want here? I guess this shows my lack of
depth of understanding of these algorithms... Big black box that I hope
works without me needing to intervene!
John
}
- rv = want;
- } else {
- rv = 0;
}
+ rv = nbytes - want;
virNetClientStreamEventTimerUpdate(st);