This is a function that handles an incoming STREAM_HOLE packet.
Even though it is not wired up yet, it will be soon. At the
beginning do couple of checks whether server plays nicely and
sent us a STREAM_HOLE packed only after we've enabled sparse
streams. Then decodes the message payload to see how big the hole
is and stores it in passed @length argument.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/rpc/virnetclientstream.c | 96 ++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 9005e6be9..2f4f92a96 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -55,6 +55,7 @@ struct _virNetClientStream {
bool incomingEOF;
bool allowSkip;
+ long long holeLength; /* Size of incoming hole in stream. */
virNetClientStreamEventCallback cb;
void *cbOpaque;
@@ -356,6 +357,101 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
return -1;
}
+
+static int
+virNetClientStreamSetHole(virNetClientStreamPtr st,
+ long long length,
+ unsigned int flags)
+{
+ virCheckFlags(0, -1);
+
+ /* Shouldn't happen, But it's better to safe than sorry. */
+ if (st->holeLength) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("unprocessed hole of size %lld already in the
queue"),
+ st->holeLength);
+ return -1;
+ }
+
+ st->holeLength += length;
+ return 0;
+}
+
+
+/**
+ * virNetClientStreamHandleHole:
+ * @client: client
+ * @st: stream
+ *
+ * Called whenever current message processed in the stream is
+ * VIR_NET_STREAM_HOLE. The stream @st is expected to be locked
+ * already.
+ *
+ * Returns: 0 on success,
+ * -1 otherwise.
+ */
+static int ATTRIBUTE_UNUSED
+virNetClientStreamHandleHole(virNetClientPtr client,
+ virNetClientStreamPtr st)
+{
+ virNetMessagePtr msg;
+ virNetStreamHole data;
+ int ret = -1;
+
+ VIR_DEBUG("client=%p st=%p", client, st);
+
+ msg = st->rx;
+ memset(&data, 0, sizeof(data));
+
+ /* We should not be called unless there's VIR_NET_STREAM_HOLE
+ * message at the head of the list. But doesn't hurt to check */
+ if (!msg) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("No message in the queue"));
+ goto cleanup;
+ }
+
+ if (msg->header.type != VIR_NET_STREAM_HOLE) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Invalid message prog=%d type=%d serial=%u proc=%d"),
+ msg->header.prog,
+ msg->header.type,
+ msg->header.serial,
+ msg->header.proc);
+ goto cleanup;
+ }
+
+ /* Server should not send us VIR_NET_STREAM_HOLE unless we
+ * have requested so. But does not hurt to check ... */
+ if (!st->allowSkip) {
+ virReportError(VIR_ERR_RPC, "%s",
+ _("Unexpected stream hole"));
+ goto cleanup;
+ }
+
+ if (virNetMessageDecodePayload(msg,
+ (xdrproc_t) xdr_virNetStreamHole,
+ &data) < 0) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Malformed stream hole packet"));
+ goto cleanup;
+ }
+
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
+
+ if (virNetClientStreamSetHole(st, data.length, data.flags) < 0)
+ goto cleanup;
+
+ ret = 0;
+ cleanup:
+ if (ret < 0) {
+ /* Abort stream? */
+ }
+ return ret;
+}
+
+
int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
virNetClientPtr client,
char *data,
--
2.13.0