Whenever server sends a client stream packet (either regular with
actual data or stream skip one) it is queued on @st->rx. So the
list is a mixture of both types of stream packets. So now that we
have all the helpers needed we can wire their processing up. But
since virNetClientStreamRecvPacket doesn't support
VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
skips into zeroes repeating requested times.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 45 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 43 insertions(+), 2 deletions(-)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index bf3922cb5..75ec3f57f 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -295,6 +295,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virObjectLock(st);
+ /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
+ * here just yet. We want in order processing! */
virNetMessageQueuePush(&st->rx, tmp_msg);
virNetClientStreamEventTimerUpdate(st);
@@ -390,7 +392,7 @@ virNetClientStreamSetHole(virNetClientStreamPtr st,
* Returns: 0 on success,
* -1 otherwise.
*/
-static int ATTRIBUTE_UNUSED
+static int
virNetClientStreamHandleHole(virNetClientPtr client,
virNetClientStreamPtr st)
{
@@ -468,6 +470,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
virCheckFlags(0, -1);
virObjectLock(st);
+
+ reread:
if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
@@ -499,8 +503,45 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
}
VIR_DEBUG("After IO rx=%p", st->rx);
+
+ if (st->rx &&
+ st->rx->header.type == VIR_NET_STREAM_HOLE &&
+ st->holeLength == 0) {
+ /* Handle skip sent to us by server. */
+
+ if (virNetClientStreamHandleHole(client, st) < 0)
+ goto cleanup;
+ }
+
+ if (!st->rx && !st->incomingEOF && !st->holeLength) {
+ if (nonblock) {
+ VIR_DEBUG("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
+ /* We have consumed all packets from incoming queue but those
+ * were only skip packets, no data. Read the stream again. */
+ goto reread;
+ }
+
want = nbytes;
- while (want && st->rx) {
+
+ if (st->holeLength) {
+ /* Pretend holeLength zeroes was read from stream. */
+ size_t len = want;
+
+ if (len > st->holeLength)
+ len = st->holeLength;
+
+ memset(data, 0, len);
+ st->holeLength -= len;
+ want -= len;
+ }
+
+ while (want &&
+ st->rx &&
+ st->rx->header.type == VIR_NET_STREAM) {
virNetMessagePtr msg = st->rx;
size_t len = want;
--
2.13.0