On 05/05/2017 05:43 PM, John Ferlan wrote:
On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> Whenever server sends a client stream packet (either regular with
> actual data or stream skip one) it is queued on @st->rx. So the
> list is a mixture of both types of stream packets. So now that we
> have all the helpers needed we can wire their processing up. But
> since virNetClientStreamRecvPacket doesn't support
> VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
> skips into zeroes repeating requested times.
>
Up to this point - I thought I had a good idea of what was going on, but
this patch loses me.
I thought there was an "ordered" message queue to be processed...
ordered in so much as we're "reading" a file and sending either a
'data'
segment w/ data or a 'skip' segment with some number of bytes to skip.
Where it could be:
start...skip...data...[skip...data...]end
start...data...skip...[data...skip...]end
start...data...end
start...skip...end
So why does the code process and sum up the skips?
Because our streams are slightly more generic than just a file transfer.
I mean, we use it for just any data transfer. Therefore it could be used
even for this
start -> data -> skip -> skip -> data -> end.
You won't have this with regular files stored on a disk, but then again,
virStream can be used (and is used) for generic data transfer.
vol-download an vol-upload is just a subset of its usage.
Now, with that example, it makes sense to merge those skips into one
big, doesn't it? But I agree, it looks a bit weird. I can drop the
merging - it's not a performance issue. I was just trying to save caller
couple of calls to our APIs.
Is this because of some implementation detail (that I already forgot) of
the message queue where signals are done after each "data..." or
"skip...", so it won't matter?
Why not have everything you need in place before you wire this up -
patch 25 and 30 seem to be able to go after "most of" patch 31.
John
I'm going to stop here.
Fair enough. I'm gonna send v2 with most of your review points worked
in. Except, for now I'm still gonna use Skip() and HoleSize() - changing
that now would be a non-trivial piece of work and thus I'd like to do it
just once, when we have clear agreement on naming. Hope you understand that.
> Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
> ---
> src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
> 1 file changed, 42 insertions(+), 2 deletions(-)
>
> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> index c773524..ff35137 100644
> --- a/src/rpc/virnetclientstream.c
> +++ b/src/rpc/virnetclientstream.c
> @@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>
> virObjectLock(st);
>
> + /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
> + * here just yet. We want in order processing! */
> virNetMessageQueuePush(&st->rx, tmp_msg);
>
> virNetClientStreamEventTimerUpdate(st);
> @@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
> }
>
>
> -static int ATTRIBUTE_UNUSED
> +static int
> virNetClientStreamHandleSkip(virNetClientPtr client,
> virNetClientStreamPtr st)
> {
> @@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
> virCheckFlags(0, -1);
>
> virObjectLock(st);
> +
> + reread:
> if (!st->rx && !st->incomingEOF) {
> virNetMessagePtr msg;
> int ret;
> @@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
> }
>
> VIR_DEBUG("After IO rx=%p", st->rx);
> +
> + while (st->rx &&
> + st->rx->header.type == VIR_NET_STREAM_SKIP) {
> + /* Handle skip sent to us by server. */
> +
> + if (virNetClientStreamHandleSkip(client, st) < 0)
> + goto cleanup;
You can see my reasoning from above in effect here.
> + }
> +
> + if (!st->rx && !st->incomingEOF && !st->skipLength) {
> + if (nonblock) {
> + VIR_DEBUG("Non-blocking mode and no data available");
> + rv = -2;
> + goto cleanup;
> + }
> +
> + /* We have consumed all packets from incoming queue but those
> + * were only skip packets, no data. Read the stream again. */
> + goto reread;
> + }
> +
> want = nbytes;
> - while (want && st->rx) {
> +
> + if (st->skipLength) {
> + /* Pretend skipLength zeroes was read from stream. */
> + size_t len = want;
> +
> + if (len > st->skipLength)
> + len = st->skipLength;
> +
> + memset(data, 0, len);
> + st->skipLength -= len;
> + want -= len;
> + }
> +
> + while (want &&
> + st->rx &&
> + st->rx->header.type == VIR_NET_STREAM) {
> virNetMessagePtr msg = st->rx;
> size_t len = want;
>
>
Michal