Whenever server sends a client stream packet (either regular with
actual data or stream skip one) it is queued on @st->rx. So the
list is a mixture of both types of stream packets. So now that we
have all the helpers needed we can wire their processing up. But
since virNetClientStreamRecvPacket doesn't support
VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
skips into zeroes repeating requested times.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 42 insertions(+), 2 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 37ce257..f63072e 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -298,6 +298,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virObjectLock(st);
+ /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
+ * here just yet. We want in order processing! */
virNetMessageQueuePush(&st->rx, tmp_msg);
virNetClientStreamEventTimerUpdate(st);
@@ -361,7 +363,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
}
-static int ATTRIBUTE_UNUSED
+static int
virNetClientStreamHandleSkip(virNetClientPtr client,
virNetClientStreamPtr st)
{
@@ -437,6 +439,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
virCheckFlags(0, -1);
virObjectLock(st);
+
+ reread:
if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -468,8 +472,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
}
VIR_DEBUG("After IO rx=%p", st->rx);
+
+ while (st->rx &&
+ st->rx->header.type == VIR_NET_STREAM_SKIP) {
+ /* Handle skip sent to us by server. */
+
+ if (virNetClientStreamHandleSkip(client, st) < 0)
+ goto cleanup;
+ }
+
+ if (!st->rx && !st->incomingEOF && !st->skipLength) {
+ if (nonblock) {
+ VIR_DEBUG("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
+ /* We have consumed all packets from incoming queue but those
+ * were only skip packets, no data. Read the stream again. */
+ goto reread;
+ }
+
want = nbytes;
- while (want && st->rx) {
+
+ if (st->skipLength) {
+ /* Pretend skipLength zeroes was read from stream. */
+ size_t len = want;
+
+ if (len > st->skipLength)
+ len = st->skipLength;
+
+ memset(data, 0, len);
+ st->skipLength -= len;
+ want -= len;
+ }
+
+ while (want &&
+ st->rx &&
+ st->rx->header.type == VIR_NET_STREAM) {
virNetMessagePtr msg = st->rx;
size_t len = want;
--
2.8.4