Whenever server sends a client stream packet (either regular with
actual data or stream skip one) it is queued on @st->rx. So the
list is a mixture of both types of stream packets. So now that we
have all the helpers needed we can wire their processing up.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 37 +++++++++++++++++++++++++++++++++++--
1 file changed, 35 insertions(+), 2 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index e4dee9b..65a18c9 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -297,6 +297,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virObjectLock(st);
+ /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
+ * here just yet. We want in order processing! */
virNetMessageQueuePush(&st->rx, tmp_msg);
virNetClientStreamEventTimerUpdate(st);
@@ -360,7 +362,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
}
-static int ATTRIBUTE_UNUSED
+static int
virNetClientStreamHandleSkip(virNetClientPtr client,
virNetClientStreamPtr st)
{
@@ -433,6 +435,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
virObjectLock(st);
+
+ reread:
if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -464,8 +468,37 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
}
VIR_DEBUG("After IO rx=%p", st->rx);
+
+ while (st->rx &&
+ st->rx->header.type == VIR_NET_STREAM_SKIP) {
+ /* Handle skip sent to us by server. Moreover, if client
+ * lacks event loop, this is only chance for us to
+ * process the skip. Therefore we should:
+ * a) process it,
+ * b) carry on with regular read from stream (if possible
+ * of course).
+ */
+
+ if (virNetClientStreamHandleSkip(client, st) < 0)
+ goto cleanup;
+ }
+
+ if (!st->rx && !st->incomingEOF) {
+ if (nonblock) {
+ VIR_DEBUG("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
+ /* We have consumed all packets from incoming queue but those
+ * were only skip packets, no data. Read the stream again. */
+ goto reread;
+ }
+
want = nbytes;
- while (want && st->rx) {
+ while (want &&
+ st->rx &&
+ st->rx->header.type == VIR_NET_STREAM) {
virNetMessagePtr msg = st->rx;
size_t len = want;
--
2.8.1