On 04/21/2016 10:28 AM, Michal Privoznik wrote:
On 20.04.2016 15:57, John Ferlan wrote:
>
>
> On 04/15/2016 09:51 AM, Michal Privoznik wrote:
>> Currently we have two separate functions for handling read from
>> a stream. One is supposed to be low level and reads data in this
>> self allocating chunk of memory. The other read function then
>> copies data over from the chunk into a user buffer. There are two
>> memcpy() involved even though a single would be sufficient.
>> Moreover, since we are copying just data, we can't process
>> alternative stream packets in the latter function, like stream
>> seeks.
>>
>> In my testing, this proved two times faster then implementation
>
> s/then/than the/
>
>> which uses IO vectors.
>
> Can I "assume" this testing covers the reverted patch scenario. IOW: I
> think this needs
> https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
> be reopened...
This proved to be two times faster than even the IO vectors implementation.
>
> Might have been "nice" to indicate/summarize what this algorithm does as
> opposed to the other. I think you started at the end of the first
> paragraph, but I'm not 100% sure - I guess it's easier for me if it's
> explicitly said, such as:
>
> In virNetClientStreamQueuePacket instead of ...
>
> In virNetClientStreamRecvPacket instead of ...
>
> instead of implicitly said if you know the code.
Something like this?
There are two functions on the client that handle incoming stream data.
The first one virNetClientStreamQueuePacket() is a low level function
that just process the incoming stream data from the socket and store it
...just processes ... and stores ...
into an internal structure. This happens in the client event loop
therefore the shorter the callbacks are, the better. The second function
virNetClientStreamRecvPacket() then handles copying data from internal
structure into a client provided buffer. Change introduced in this
New paragraph before "Change"
commit makes just that: new queue for incoming stream packets is
...a new receive (rx) queue...
introduced. Then instead of copying data into intermediate internal
buffer and then copying them into user buffer, incoming stream messages
are enqueued into the queue and data are copied just once - in the upper
... are queued...   ... data is copied...
layer function virNetClientStreamRecvPacket(). In the end,
there's just
one copying of data and therefore shorter event loop callback. This
should boost the performance which has proven to be the case in my testing.
Having said that, I don't think there's any need for reopening the bug
since we are not hurting performance here.
The only reason I suggested it is that I think technically the revert makes
the previous changes essentially NULL and void. Since that commit was
connected with a bug #, I just wanted to be sure that "process wise" we're
covered... It's not that important though.
OK... I see, instead of allocating and copying data from the incoming
stream socket into a buffer only to be copied again into the client
buffer, we'll "steal" the entire buffer destined for the client from the
incoming stream socket and then create a queue for the client to copy
from - seems OK to me...
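
Just to double-check my reading, inside virnetclientstream.c the queueing
side now boils down to roughly this (a condensed restatement of the hunk
further down, reusing the internal virNetMessage helpers the patch already
calls; the helper name is made up):

/* Sketch only: copy the header, steal the payload buffer, queue the message. */
static int
queueIncomingSketch(virNetClientStreamPtr st, virNetMessagePtr msg)
{
    virNetMessagePtr tmp;

    if (!(tmp = virNetMessageNew(false)))
        return -1;

    /* Only the (small) header is copied... */
    memcpy(&tmp->header, &msg->header, sizeof(msg->header));

    /* ...while the payload buffer just changes owner, so the event loop
     * callback never memcpy()s the stream data itself. */
    tmp->buffer = msg->buffer;
    tmp->bufferLength = msg->bufferLength;
    tmp->bufferOffset = msg->bufferOffset;
    msg->buffer = NULL;
    msg->bufferLength = msg->bufferOffset = 0;

    virObjectLock(st);
    virNetMessageQueuePush(&st->rx, tmp);
    virNetClientStreamEventTimerUpdate(st);
    virObjectUnlock(st);
    return 0;
}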
ACK for 7-8
John
BTW:
I know it's existing code, but virNetMessageQueueServe caused me to go look
and see that it's really a virNetMessageQueuePop to complement
virNetMessageQueuePush (sigh)
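
For anyone else who trips over the naming: the rx queue is just a singly
linked FIFO, so push appends at the tail and "serve" detaches the head. A
tiny standalone demo of that pairing (type and function names made up, not
libvirt code):

#include <stdio.h>
#include <stdlib.h>

struct toyMsg {
    int id;
    struct toyMsg *next;
};

/* Append at the tail (the Push side). */
static void
toyQueuePush(struct toyMsg **queue, struct toyMsg *msg)
{
    struct toyMsg **tmp = queue;

    while (*tmp)
        tmp = &(*tmp)->next;
    msg->next = NULL;
    *tmp = msg;
}

/* Detach and return the head, i.e. a pop (the Serve side). */
static struct toyMsg *
toyQueueServe(struct toyMsg **queue)
{
    struct toyMsg *head = *queue;

    if (head) {
        *queue = head->next;
        head->next = NULL;
    }
    return head;
}

int
main(void)
{
    struct toyMsg *rx = NULL;
    struct toyMsg *msg;
    int i;

    for (i = 0; i < 3; i++) {
        if (!(msg = calloc(1, sizeof(*msg))))
            return EXIT_FAILURE;
        msg->id = i;
        toyQueuePush(&rx, msg);
    }

    while ((msg = toyQueueServe(&rx))) {
        printf("serving message %d\n", msg->id);  /* FIFO order: 0, 1, 2 */
        free(msg);
    }
    return EXIT_SUCCESS;
}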
>
> The functions are just tough to read without (more) knowledge (than I
> have about them) of how they are designed to function. Since he had a
> hand in the above bug, hopefully Martin can take a look at this patch.
>
>>
>> Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
>> ---
>> src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
>> 1 file changed, 54 insertions(+), 52 deletions(-)
>>
>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>> index b428f4b..34989a9 100644
>> --- a/src/rpc/virnetclientstream.c
>> +++ b/src/rpc/virnetclientstream.c
>> @@ -49,9 +49,7 @@ struct _virNetClientStream {
>> * time by stopping consuming any incoming data
>> * off the socket....
>> */
>> - char *incoming;
>> - size_t incomingOffset;
>> - size_t incomingLength;
>> + virNetMessagePtr rx;
>> bool incomingEOF;
>>
>> virNetClientStreamEventCallback cb;
>> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>> if (!st->cb)
>> return;
>>
>> - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
>> + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
>>
>> - if (((st->incomingOffset || st->incomingEOF) &&
>> + if (((st->rx || st->incomingEOF) &&
>> (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>> VIR_DEBUG("Enabling event timer");
>> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>>
>> if (st->cb &&
>> (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
>> - (st->incomingOffset || st->incomingEOF))
>> + (st->rx || st->incomingEOF))
>> events |= VIR_STREAM_EVENT_READABLE;
>> if (st->cb &&
>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>> events |= VIR_STREAM_EVENT_WRITABLE;
>>
>> - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
>> + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
>> if (events) {
>> virNetClientStreamEventCallback cb = st->cb;
>> void *cbOpaque = st->cbOpaque;
>> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
>> virNetClientStreamPtr st = obj;
>>
>> virResetError(&st->err);
>> - VIR_FREE(st->incoming);
>> + while (st->rx) {
>> + virNetMessagePtr msg = st->rx;
>> + virNetMessageQueueServe(&st->rx);
>> + virNetMessageFree(msg);
>> + }
>> virObjectUnref(st->prog);
>> }
>>
>> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
>> int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>> virNetMessagePtr msg)
>> {
>> - int ret = -1;
>> - size_t need;
>> + virNetMessagePtr tmp_msg;
>> +
>> + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
>> +
>> + /* Unfortunately, we must allocate new message as the one we
>> + * get in @msg is going to be cleared later in the process. */
>> +
>> + if (!(tmp_msg = virNetMessageNew(false)))
>> + return -1;
>> +
>> + /* Copy header */
>> + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
>> +
>> + /* Steal message buffer */
>> + tmp_msg->buffer = msg->buffer;
>> + tmp_msg->bufferLength = msg->bufferLength;
>> + tmp_msg->bufferOffset = msg->bufferOffset;
>> + msg->buffer = NULL;
>> + msg->bufferLength = msg->bufferOffset = 0;
>>
>> virObjectLock(st);
>> - need = msg->bufferLength - msg->bufferOffset;
>> - if (need) {
>> - size_t avail = st->incomingLength - st->incomingOffset;
>> - if (need > avail) {
>> - size_t extra = need - avail;
>> - if (VIR_REALLOC_N(st->incoming,
>> - st->incomingLength + extra) < 0) {
>> - VIR_DEBUG("Out of memory handling stream data");
>> - goto cleanup;
>> - }
>> - st->incomingLength += extra;
>> - }
>>
>> - memcpy(st->incoming + st->incomingOffset,
>> - msg->buffer + msg->bufferOffset,
>> - msg->bufferLength - msg->bufferOffset);
>> - st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
>> - } else {
>> - st->incomingEOF = true;
>> - }
>> + virNetMessageQueuePush(&st->rx, tmp_msg);
>>
>> - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
>> - st->incomingOffset, st->incomingLength,
>> - st->incomingEOF);
>> virNetClientStreamEventTimerUpdate(st);
>>
>> - ret = 0;
>> -
>> - cleanup:
>> virObjectUnlock(st);
>> - return ret;
>> + return 0;
>> }
>>
>>
>> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>> bool nonblock)
>> {
>> int rv = -1;
>> + size_t want;
>> +
>> VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>> st, client, data, nbytes, nonblock);
>> virObjectLock(st);
>> - if (!st->incomingOffset && !st->incomingEOF) {
>> + if (!st->rx && !st->incomingEOF) {
>> virNetMessagePtr msg;
>> int ret;
>>
>> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>> goto cleanup;
>> }
>>
>> - VIR_DEBUG("After IO %zu", st->incomingOffset);
>> - if (st->incomingOffset) {
>> - int want = st->incomingOffset;
>> - if (want > nbytes)
>> - want = nbytes;
>> - memcpy(data, st->incoming, want);
>> - if (want < st->incomingOffset) {
>> - memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
>> - st->incomingOffset -= want;
>> - } else {
>> - VIR_FREE(st->incoming);
>> - st->incomingOffset = st->incomingLength = 0;
>> + VIR_DEBUG("After IO rx=%p", st->rx);
>> + want = nbytes;
>> + while (want && st->rx) {
>
> So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that
> is 'expected'...
Yes. Calling virStreamRecv() on the client side will basically boil down
to calling this function, and the return value of this function becomes
the return value of the wrapper. As described in the docs, virStreamRecv()
and thus virNetClientStreamRecvPacket() return the number of bytes read
from the stream. In case there's no incoming data, there's nothing we can
read from and therefore we should return 0.
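
To make that contract concrete from the caller's side, a minimal read loop
against the public API (made-up helper, assuming a blocking stream, error
handling trimmed):

#include <stdio.h>
#include <libvirt/libvirt.h>

/* virStreamRecv() maps onto virNetClientStreamRecvPacket():
 *   > 0  that many bytes were copied into buf
 *  == 0  nothing (more) to read from the stream
 *   < 0  error */
static int
drainStream(virStreamPtr st)
{
    char buf[1024];
    int got;

    while ((got = virStreamRecv(st, buf, sizeof(buf))) > 0)
        fwrite(buf, 1, got, stdout);   /* consume 'got' bytes */

    if (got < 0) {
        virStreamAbort(st);
        return -1;
    }

    return virStreamFinish(st);        /* got == 0: done reading */
}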
>
>> + virNetMessagePtr msg = st->rx;
>> + size_t len = want;
>> +
>> + if (len > msg->bufferLength - msg->bufferOffset)
>> + len = msg->bufferLength - msg->bufferOffset;
>> +
>> + if (!len)
>> + break;
>> +
>> + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
>> + want -= len;
>> + msg->bufferOffset += len;
>> +
>> + if (msg->bufferOffset == msg->bufferLength) {
>> + virNetMessageQueueServe(&st->rx);
>> + virNetMessageFree(msg);
>
> Nothing needs to be done with want here? I guess this shows my lack of
> depth of understanding of these algorithms... Big black box that I hope
> works without me needing to intervene!
No. This does nothing more than: if the head of the linked list of incoming
stream messages has been fully read (*), pop the message at the head and
move on to the next message in the queue (list). In that case no data has
been copied to the user, therefore @want must not change.
(*) - It may happen that users read fewer bytes than there are in the
incoming message. For instance, an incoming stream packet (message) can be
1024 bytes in size, but the user may read 1 byte at a time from the stream.
Hope my explanation makes it clear(-er) to you.
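
If it helps, in caller terms (again a made-up helper, assuming a blocking
stream): even if the daemon sent a single 1024-byte stream packet, the
client is free to drain it one byte per call, and the queued message is
only popped and freed once its buffer has been fully consumed:

#include <libvirt/libvirt.h>

/* Reads one byte per call; a single queued 1024-byte message satisfies
 * 1024 of these calls before the receive path pops and frees it and
 * moves on to the next message in the rx queue. */
static long
readByteByByte(virStreamPtr st)
{
    char c;
    long total = 0;
    int got;

    while ((got = virStreamRecv(st, &c, 1)) > 0)
        total += got;                  /* one byte per successful call */

    return got < 0 ? -1 : total;
}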
Michal