On 04/20/2017 06:01 AM, Michal Privoznik wrote:
Whenever the server sends the client a stream packet (either a regular
one with actual data or a stream skip), it is queued on @st->rx, so the
list is a mixture of both types of stream packets. Now that we
have all the helpers needed, we can wire their processing up. But
since virNetClientStreamRecvPacket doesn't support the
VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn every received
skip into a run of zeroes as long as the skip.
Up to this point - I thought I had a good idea of what was going on, but
this patch loses me.
I thought there was an "ordered" message queue to be processed...
ordered insofar as we're "reading" a file and sending either a 'data'
segment w/ data or a 'skip' segment with some number of bytes to skip.
Where it could be:
start...skip...data...[skip...data...]end
start...data...skip...[data...skip...]end
start...data...end
start...skip...end
So why does the code process and sum up the skips?
Is this because of some implementation detail (that I already forgot) of
the message queue where signals are done after each "data..." or
"skip...", so it won't matter?
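For reference, a minimal sketch of how the receive path in the hunk below
appears to treat the queue. It assumes virNetClientStreamHandleSkip() pops
one skip packet and adds its length to st->skipLength (the helper's body is
not part of this patch). The types pkt and rx_queue and the function
rx_read() are hypothetical stand-ins, not libvirt API. Under that
assumption only the skips sitting at the head of the queue get summed; the
data loop stops at the next skip, so ordering would still be preserved.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified packet model; these are not the libvirt types. */
enum pkt_type { PKT_DATA, PKT_SKIP };

struct pkt {
    enum pkt_type type;
    const char *data;   /* payload for PKT_DATA, unused for PKT_SKIP */
    size_t len;         /* payload length, or hole size for PKT_SKIP */
    size_t off;         /* payload bytes already consumed */
};

struct rx_queue {
    struct pkt *pkts;   /* packets in arrival order */
    size_t npkts;
    size_t head;        /* index of the first unconsumed packet */
    size_t skip_len;    /* pending hole bytes, modelled on st->skipLength */
};

/* Read up to nbytes into buf; returns the number of bytes produced. */
size_t rx_read(struct rx_queue *q, char *buf, size_t nbytes)
{
    size_t want = nbytes;

    assert(q && buf);

    /* Fold only the skips at the head of the queue into skip_len. */
    while (q->head < q->npkts && q->pkts[q->head].type == PKT_SKIP) {
        q->skip_len += q->pkts[q->head].len;
        q->head++;
    }

    /* Hand the pending hole back to the caller as zeroes. */
    if (q->skip_len) {
        size_t len = want < q->skip_len ? want : q->skip_len;

        memset(buf, 0, len);
        q->skip_len -= len;
        want -= len;
    }

    /* Copy data until the buffer is full or the next skip is reached. */
    while (want && q->head < q->npkts && q->pkts[q->head].type == PKT_DATA) {
        struct pkt *p = &q->pkts[q->head];
        size_t len = p->len - p->off;

        if (len > want)
            len = want;
        memcpy(buf + (nbytes - want), p->data + p->off, len);
        p->off += len;
        want -= len;
        if (p->off == p->len)
            q->head++;
    }

    return nbytes - want;
}
```

With a queue of data("ab"), skip(3), data("cd"), the first call stops short
at the skip, and the second call yields three zero bytes followed by "cd".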
Why not have everything you need in place before you wire this up?
Patches 25 and 30 seem like they could go after "most of" patch 31.
I'm going to stop here.
John
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 42 insertions(+), 2 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index c773524..ff35137 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virObjectLock(st);
+ /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
+ * here just yet. We want in-order processing! */
virNetMessageQueuePush(&st->rx, tmp_msg);
virNetClientStreamEventTimerUpdate(st);
@@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
}
-static int ATTRIBUTE_UNUSED
+static int
virNetClientStreamHandleSkip(virNetClientPtr client,
virNetClientStreamPtr st)
{
@@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
virCheckFlags(0, -1);
virObjectLock(st);
+
+ reread:
if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
}
VIR_DEBUG("After IO rx=%p", st->rx);
+
+ while (st->rx &&
+ st->rx->header.type == VIR_NET_STREAM_SKIP) {
+ /* Handle skip sent to us by server. */
+
+ if (virNetClientStreamHandleSkip(client, st) < 0)
+ goto cleanup;
+ }
+
+ if (!st->rx && !st->incomingEOF && !st->skipLength) {
+ if (nonblock) {
+ VIR_DEBUG("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
+ /* We have consumed all packets from incoming queue but those
+ * were only skip packets, no data. Read the stream again. */
+ goto reread;
+ }
+
want = nbytes;
- while (want && st->rx) {
+
+ if (st->skipLength) {
+ /* Pretend skipLength zeroes were read from stream. */
+ size_t len = want;
+
+ if (len > st->skipLength)
+ len = st->skipLength;
+
+ memset(data, 0, len);
+ st->skipLength -= len;
+ want -= len;
+ }
+
+ while (want &&
+ st->rx &&
+ st->rx->header.type == VIR_NET_STREAM) {
virNetMessagePtr msg = st->rx;
size_t len = want;