[PATCH 0/3] split daemon: Support sparse streams

See 3/3 for explanation.

Michal Prívozník (3):
  virStreamInData: Allow callback to not rewind the stream
  rpc: Introduce virNetClientStreamInData()
  remote_driver: Implement virStreamInData() callback

 src/libvirt-stream.c              |  9 ++++-
 src/libvirt_remote.syms           |  1 +
 src/remote/remote_daemon_stream.c |  8 ++--
 src/remote/remote_driver.c        | 26 +++++++++++++
 src/rpc/virnetclientstream.c      | 61 +++++++++++++++++++++++++++++++
 src/rpc/virnetclientstream.h      |  4 ++
 6 files changed, 105 insertions(+), 4 deletions(-)

--
2.32.0

So far, virStreamInData() is effectively a wrapper over virFDStreamInData() which means it deals with files which can be rewind (lseek()-ed) to whatever position we need. And in fact, that's what virFDStreamInData() does - it makes sure that the FD is left unchanged in terms of position in the file. Skipping the hole happens soon after - in daemonStreamHandleRead() when virStreamSendHole() is called.

But this is about to change. Soon we will have another implementation where we won't be dealing with FDs but virNetMessage queue and it will be handy to pop message at the beginning of the queue. Implement and document this new behavior.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/libvirt-stream.c              | 9 ++++++++-
 src/remote/remote_daemon_stream.c | 8 +++++---
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 873d7b1d4e..bacbbfd325 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -505,7 +505,14 @@ virStreamRecvHole(virStreamPtr stream,
  * hole: @data = false, @length > 0
  * EOF: @data = false, @length = 0
  *
- * Returns 0 on success,
+ * The position in the underlying stream should not be changed
+ * upon return from this function, e.g. position in the
+ * underlying file is kept the same. For streams where this
+ * condition is impossible to meet, the function can return 1 to
+ * signal this to a caller.
+ *
+ * Returns 0 on success (stream position unchanged),
+ *         1 on success (stream position changed),
  *         -1 otherwise
  */
 int
diff --git a/src/remote/remote_daemon_stream.c b/src/remote/remote_daemon_stream.c
index 007ad73e27..eb7ed5edf3 100644
--- a/src/remote/remote_daemon_stream.c
+++ b/src/remote/remote_daemon_stream.c
@@ -894,9 +894,11 @@ daemonStreamHandleRead(virNetServerClient *client,
 
             msg = NULL;
 
-            /* We have successfully sent stream skip to the other side.
-             * To keep streams in sync seek locally too. */
-            virStreamSendHole(stream->st, length, 0);
+            /* We have successfully sent stream skip to the other side. To
+             * keep streams in sync seek locally too (rv == 0), unless it's
+             * already done (rv == 1). */
+            if (rv == 0)
+                virStreamSendHole(stream->st, length, 0);
             /* We're done with this call */
             goto done;
         }
--
2.32.0
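The return-value convention documented in this patch can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `toy_stream`, `toy_in_data()`, `toy_skip()` and `toy_handle_hole()` are hypothetical names invented for the illustration and are not libvirt types or functions. The `consumes` flag stands in for a backend (such as a message queue) that cannot leave its position untouched while probing.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a stream; not a libvirt type. */
typedef struct {
    long long pos;       /* current position in the stream */
    long long hole_len;  /* length of the hole starting at pos */
    int consumes;        /* nonzero: probing the hole also moves past it */
} toy_stream;

/* Probe callback following the 0 / 1 / -1 convention from the patch:
 *  0 = success, position unchanged
 *  1 = success, position already advanced past the reported section
 * -1 = error */
static int toy_in_data(toy_stream *s, int *in_data, long long *length)
{
    if (s == NULL || in_data == NULL || length == NULL)
        return -1;

    *in_data = 0;              /* this toy model only ever reports a hole */
    *length = s->hole_len;

    if (s->consumes) {
        s->pos += s->hole_len; /* queue-like backend: cannot undo the pop */
        return 1;
    }
    return 0;                  /* file-like backend: position restored */
}

/* Local skip, playing the role virStreamSendHole() plays in the daemon. */
static void toy_skip(toy_stream *s, long long length)
{
    s->pos += length;
}

/* Caller: skip locally only when the callback left the position alone. */
static int toy_handle_hole(toy_stream *s, long long *skipped)
{
    int in_data = 0;
    long long length = 0;
    int rv = toy_in_data(s, &in_data, &length);

    if (rv < 0)
        return -1;
    if (rv == 0)
        toy_skip(s, length);
    *skipped = length;
    return 0;
}
```

In this model both kinds of backend end up at the same position after `toy_handle_hole()`, which is the property the `rv == 0` check in daemonStreamHandleRead() is meant to preserve.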

On Tue, Dec 07, 2021 at 04:34:40PM +0100, Michal Privoznik wrote:
> So far, virStreamInData() is effectively a wrapper over virFDStreamInData() which means it deals with files which can be rewind

*rewound

> (lseek()-ed) to whatever position we need. And in fact, that's what virFDStreamInData() does - it makes sure that the FD is left unchanged in terms of position in the file. Skipping the hole happens soon after - in daemonStreamHandleRead() when virStreamSendHole() is called.
>
> But this is about to change. Soon we will have another implementation where we won't be dealing with FDs but virNetMessage queue and it will be handy to pop message at the beginning of the queue. Implement and document this new behavior.
>
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>

Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/libvirt_remote.syms      |  1 +
 src/rpc/virnetclientstream.c | 61 ++++++++++++++++++++++++++++++++++++
 src/rpc/virnetclientstream.h |  4 +++
 3 files changed, 66 insertions(+)

diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms
index 942e1013a6..07d22e368b 100644
--- a/src/libvirt_remote.syms
+++ b/src/libvirt_remote.syms
@@ -66,6 +66,7 @@ virNetClientStreamEOF;
 virNetClientStreamEventAddCallback;
 virNetClientStreamEventRemoveCallback;
 virNetClientStreamEventUpdateCallback;
+virNetClientStreamInData;
 virNetClientStreamMatches;
 virNetClientStreamNew;
 virNetClientStreamQueuePacket;
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 1ba6167a1d..ffc702cdc3 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -801,3 +801,64 @@ bool virNetClientStreamEOF(virNetClientStream *st)
 {
     return st->incomingEOF;
 }
+
+
+int virNetClientStreamInData(virNetClientStream *st,
+                             int *inData,
+                             long long *length)
+{
+    int ret = 0;
+    virNetMessage *msg = NULL;
+
+    if (!st->allowSkip) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("Holes are not supported with this stream"));
+        return -1;
+    }
+
+    virObjectLock(st);
+
+    if (virNetClientStreamCheckState(st) < 0)
+        goto cleanup;
+
+    msg = st->rx;
+
+    if (!msg) {
+        /* No incoming message. This means that the stream is at its end. In
+         * this case, virStreamInData() should set both inData and length to
+         * zero and return success. */
+        *inData = 0;
+        *length = 0;
+    } else if (msg->header.type == VIR_NET_STREAM) {
+        *inData = 1;
+        *length = msg->bufferLength - msg->bufferOffset;
+    } else if (msg->header.type == VIR_NET_STREAM_HOLE) {
+        *inData = 0;
+
+        if (st->holeLength == 0 &&
+            virNetClientStreamHandleHole(NULL, st) < 0)
+            goto cleanup;
+
+        *length = st->holeLength;
+        st->holeLength = 0;
+
+        /* virNetClientStreamHandleHole() called above did pop the message from
+         * the queue (and freed it). Instead of trying to push it back let's
+         * just signal to the caller what we did. */
+        ret = 1;
+        goto cleanup;
+    } else {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Invalid message prog=%d type=%d serial=%u proc=%d"),
+                       msg->header.prog,
+                       msg->header.type,
+                       msg->header.serial,
+                       msg->header.proc);
+        goto cleanup;
+    }
+
+    ret = 0;
+ cleanup:
+    virObjectUnlock(st);
+    return ret;
+}
diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h
index e16d6e4a9a..7428843f9b 100644
--- a/src/rpc/virnetclientstream.h
+++ b/src/rpc/virnetclientstream.h
@@ -90,3 +90,7 @@ int virNetClientStreamEventRemoveCallback(virNetClientStream *st);
 
 bool virNetClientStreamEOF(virNetClientStream *st)
     ATTRIBUTE_NONNULL(1);
+
+int virNetClientStreamInData(virNetClientStream *st,
+                             int *inData,
+                             long long *length);
--
2.32.0

On Tue, Dec 07, 2021 at 04:34:41PM +0100, Michal Privoznik wrote:
> The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.
>
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> ---
>  src/libvirt_remote.syms      |  1 +
>  src/rpc/virnetclientstream.c | 61 ++++++++++++++++++++++++++++++++++++
>  src/rpc/virnetclientstream.h |  4 +++
>  3 files changed, 66 insertions(+)
>
> diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms
> index 942e1013a6..07d22e368b 100644
> --- a/src/libvirt_remote.syms
> +++ b/src/libvirt_remote.syms
> @@ -66,6 +66,7 @@ virNetClientStreamEOF;
>  virNetClientStreamEventAddCallback;
>  virNetClientStreamEventRemoveCallback;
>  virNetClientStreamEventUpdateCallback;
> +virNetClientStreamInData;
>  virNetClientStreamMatches;
>  virNetClientStreamNew;
>  virNetClientStreamQueuePacket;
> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> index 1ba6167a1d..ffc702cdc3 100644
> --- a/src/rpc/virnetclientstream.c
> +++ b/src/rpc/virnetclientstream.c
> @@ -801,3 +801,64 @@ bool virNetClientStreamEOF(virNetClientStream *st)
>  {
>      return st->incomingEOF;
>  }
> +
> +
> +int virNetClientStreamInData(virNetClientStream *st,
> +                             int *inData,
> +                             long long *length)
> +{
> +    int ret = 0;
> +    virNetMessage *msg = NULL;
> +
> +    if (!st->allowSkip) {

The object should be already locked here I presume.

> +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
> +                       _("Holes are not supported with this stream"));
> +        return -1;
> +    }
> +
> +    virObjectLock(st);
> +
> +    if (virNetClientStreamCheckState(st) < 0)
> +        goto cleanup;
> +
> +    msg = st->rx;
> +
> +    if (!msg) {
> +        /* No incoming message. This means that the stream is at its end. In
> +         * this case, virStreamInData() should set both inData and length to
> +         * zero and return success. */
> +        *inData = 0;
> +        *length = 0;
> +    } else if (msg->header.type == VIR_NET_STREAM) {
> +        *inData = 1;
> +        *length = msg->bufferLength - msg->bufferOffset;
> +    } else if (msg->header.type == VIR_NET_STREAM_HOLE) {
> +        *inData = 0;
> +
> +        if (st->holeLength == 0 &&
> +            virNetClientStreamHandleHole(NULL, st) < 0)

I was never a fried with our way of decoding some data and I guess that this is one of the occasions and st->holeLength just shows whether the message is already decoded. One of the problems I have here is that most of the functions double check their input states, although this one does not, making it called conditionally, which seems weird ...

> +            goto cleanup;
> +
> +        *length = st->holeLength;
> +        st->holeLength = 0;
> +
> +        /* virNetClientStreamHandleHole() called above did pop the message from
> +         * the queue (and freed it). Instead of trying to push it back let's
> +         * just signal to the caller what we did. */

... especially when this comment makes it seem like it relies on that function being called.

Anyway, it looks fine to me, so with the locking issue above fixed,

Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

On 12/8/21 21:57, Martin Kletzander wrote:
> On Tue, Dec 07, 2021 at 04:34:41PM +0100, Michal Privoznik wrote:
> > The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.
> >
> > Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> > ---
> >  src/libvirt_remote.syms      |  1 +
> >  src/rpc/virnetclientstream.c | 61 ++++++++++++++++++++++++++++++++++++
> >  src/rpc/virnetclientstream.h |  4 +++
> >  3 files changed, 66 insertions(+)
> >
> > diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms
> > index 942e1013a6..07d22e368b 100644
> > --- a/src/libvirt_remote.syms
> > +++ b/src/libvirt_remote.syms
> > @@ -66,6 +66,7 @@ virNetClientStreamEOF;
> >  virNetClientStreamEventAddCallback;
> >  virNetClientStreamEventRemoveCallback;
> >  virNetClientStreamEventUpdateCallback;
> > +virNetClientStreamInData;
> >  virNetClientStreamMatches;
> >  virNetClientStreamNew;
> >  virNetClientStreamQueuePacket;
> > diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> > index 1ba6167a1d..ffc702cdc3 100644
> > --- a/src/rpc/virnetclientstream.c
> > +++ b/src/rpc/virnetclientstream.c
> > @@ -801,3 +801,64 @@ bool virNetClientStreamEOF(virNetClientStream *st)
> >  {
> >      return st->incomingEOF;
> >  }
> > +
> > +
> > +int virNetClientStreamInData(virNetClientStream *st,
> > +                             int *inData,
> > +                             long long *length)
> > +{
> > +    int ret = 0;
> > +    virNetMessage *msg = NULL;
> > +
> > +    if (!st->allowSkip) {
>
> The object should be already locked here I presume.

I could move the lock, sure. But allowSkip is set and can never change throughout stream lifetime (one simply doesn't change classic stream to be sparse or vice versa). Also, this is of "better double check" type - virStreamInData() shouldn't be ever called if stream isn't sparse, i.e. allowSkip == true.

> > +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
> > +                       _("Holes are not supported with this stream"));
> > +        return -1;
> > +    }
> > +
> > +    virObjectLock(st);
> > +
> > +    if (virNetClientStreamCheckState(st) < 0)
> > +        goto cleanup;
> > +
> > +    msg = st->rx;
> > +
> > +    if (!msg) {
> > +        /* No incoming message. This means that the stream is at its end. In
> > +         * this case, virStreamInData() should set both inData and length to
> > +         * zero and return success. */
> > +        *inData = 0;
> > +        *length = 0;
> > +    } else if (msg->header.type == VIR_NET_STREAM) {
> > +        *inData = 1;
> > +        *length = msg->bufferLength - msg->bufferOffset;
> > +    } else if (msg->header.type == VIR_NET_STREAM_HOLE) {
> > +        *inData = 0;
> > +
> > +        if (st->holeLength == 0 &&
> > +            virNetClientStreamHandleHole(NULL, st) < 0)
>
> I was never a fried with our way of decoding some data and I guess that

s/fried/friend/ ;-)

> this is one of the occasions and st->holeLength just shows whether the message is already decoded.
>
> One of the problems I have here is that most of the functions double check their input states, although this one does not, making it called conditionally, which seems weird ...

I'm not sure I follow. In order to call the hole handler there has to be an incoming message of VIR_NET_STREAM_HOLE type and no hole pending. The decode function then checks the same conditions.

> > +            goto cleanup;
> > +
> > +        *length = st->holeLength;
> > +        st->holeLength = 0;
> > +
> > +        /* virNetClientStreamHandleHole() called above did pop the message from
> > +         * the queue (and freed it). Instead of trying to push it back let's
> > +         * just signal to the caller what we did. */
>
> ... especially when this comment makes it seem like it relies on that function being called.

Ah, yes, ret = 1 should be returned only if the function was called. Let me respin this patch.

> Anyway, it looks fine to me, so with the locking issue above fixed,
>
> Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

Michal

On Thu, Dec 09, 2021 at 09:29:07AM +0100, Michal Prívozník wrote:
> On 12/8/21 21:57, Martin Kletzander wrote:
> > On Tue, Dec 07, 2021 at 04:34:41PM +0100, Michal Privoznik wrote:
> > > The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.
> > >
> > > Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> > > ---
> > >  src/libvirt_remote.syms      |  1 +
> > >  src/rpc/virnetclientstream.c | 61 ++++++++++++++++++++++++++++++++++++
> > >  src/rpc/virnetclientstream.h |  4 +++
> > >  3 files changed, 66 insertions(+)
> > >
> > > diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms
> > > index 942e1013a6..07d22e368b 100644
> > > --- a/src/libvirt_remote.syms
> > > +++ b/src/libvirt_remote.syms
> > > @@ -66,6 +66,7 @@ virNetClientStreamEOF;
> > >  virNetClientStreamEventAddCallback;
> > >  virNetClientStreamEventRemoveCallback;
> > >  virNetClientStreamEventUpdateCallback;
> > > +virNetClientStreamInData;
> > >  virNetClientStreamMatches;
> > >  virNetClientStreamNew;
> > >  virNetClientStreamQueuePacket;
> > > diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> > > index 1ba6167a1d..ffc702cdc3 100644
> > > --- a/src/rpc/virnetclientstream.c
> > > +++ b/src/rpc/virnetclientstream.c
> > > @@ -801,3 +801,64 @@ bool virNetClientStreamEOF(virNetClientStream *st)
> > >  {
> > >      return st->incomingEOF;
> > >  }
> > > +
> > > +
> > > +int virNetClientStreamInData(virNetClientStream *st,
> > > +                             int *inData,
> > > +                             long long *length)
> > > +{
> > > +    int ret = 0;
> > > +    virNetMessage *msg = NULL;
> > > +
> > > +    if (!st->allowSkip) {
> >
> > The object should be already locked here I presume.
>
> I could move the lock, sure. But allowSkip is set and can never change throughout stream lifetime (one simply doesn't change classic stream to be sparse or vice versa). Also, this is of "better double check" type - virStreamInData() shouldn't be ever called if stream isn't sparse, i.e. allowSkip == true.

Since we're double-checking something we expect (allowSkip == true) just in case someone calls it without allowSkip, we should also make sure we have the lock just in case. It will never add any time nor complexity, it will be more readable and future-proof. Since we do not have a way to say which parts of a struct is covered by a lock and changeable (apart from a comment in said struct) we should not rely on that in places where it is not necessary (I know we have some places where this is needed). So it makes sense for me to move that even if we never implement changing the value after creation.

> > > +        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
> > > +                       _("Holes are not supported with this stream"));
> > > +        return -1;
> > > +    }
> > > +
> > > +    virObjectLock(st);
> > > +
> > > +    if (virNetClientStreamCheckState(st) < 0)
> > > +        goto cleanup;
> > > +
> > > +    msg = st->rx;
> > > +
> > > +    if (!msg) {
> > > +        /* No incoming message. This means that the stream is at its end. In
> > > +         * this case, virStreamInData() should set both inData and length to
> > > +         * zero and return success. */
> > > +        *inData = 0;
> > > +        *length = 0;
> > > +    } else if (msg->header.type == VIR_NET_STREAM) {
> > > +        *inData = 1;
> > > +        *length = msg->bufferLength - msg->bufferOffset;
> > > +    } else if (msg->header.type == VIR_NET_STREAM_HOLE) {
> > > +        *inData = 0;
> > > +
> > > +        if (st->holeLength == 0 &&
> > > +            virNetClientStreamHandleHole(NULL, st) < 0)
> >
> > I was never a fried with our way of decoding some data and I guess that
>
> s/fried/friend/ ;-)
>
> > this is one of the occasions and st->holeLength just shows whether the message is already decoded.
> >
> > One of the problems I have here is that most of the functions double check their input states, although this one does not, making it called conditionally, which seems weird ...
>
> I'm not sure I follow. In order to call the hole handler there has to be an incoming message of VIR_NET_STREAM_HOLE type and no hole pending. The decode function then checks the same conditions.
>
> > > +            goto cleanup;
> > > +
> > > +        *length = st->holeLength;
> > > +        st->holeLength = 0;
> > > +
> > > +        /* virNetClientStreamHandleHole() called above did pop the message from
> > > +         * the queue (and freed it). Instead of trying to push it back let's
> > > +         * just signal to the caller what we did. */
> >
> > ... especially when this comment makes it seem like it relies on that function being called.
>
> Ah, yes, ret = 1 should be returned only if the function was called. Let me respin this patch.

So that shows even more that I'm not a friend of this part of the codebase and my review for this patch should not be taken into account =D

> > Anyway, it looks fine to me, so with the locking issue above fixed,
> >
> > Reviewed-by: Martin Kletzander <mkletzan@redhat.com>
>
> Michal
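The locking point argued in this exchange (take the lock before reading any field, even one that is expected never to change) can be sketched in isolation. This is a hedged illustration, not libvirt code: `toy_locked_stream` and `toy_take_hole()` are hypothetical names, and a plain pthread mutex stands in for virObjectLock()/virObjectUnlock().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stream object; the mutex stands in for the object lock. */
typedef struct {
    pthread_mutex_t lock;
    bool allow_skip;        /* set at creation, not expected to change */
    long long hole_length;  /* mutable state, protected by the lock */
} toy_locked_stream;

/* Report and reset the pending hole length.
 * The lock is taken before the allow_skip check, so every field access
 * happens under the lock and there is a single unlock path. */
static int toy_take_hole(toy_locked_stream *st, long long *length)
{
    int ret = -1;

    pthread_mutex_lock(&st->lock);

    if (!st->allow_skip)
        goto cleanup;       /* error path still unlocks */

    *length = st->hole_length;
    st->hole_length = 0;
    ret = 0;

 cleanup:
    pthread_mutex_unlock(&st->lock);
    return ret;
}
```

Starting with `ret = -1` and funnelling every exit through the `cleanup` label means an early failure cannot accidentally report success or skip the unlock.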

When using the monolithic daemon the driver for virStream is always virFDStreamDrv and thus calling virStreamInData() results in calling virFDStreamInData().

But things are different with split daemon, especially when a client connects to one of hypervisor daemons (e.g. virtqemud) and then lets the daemon connect to the storage daemon for vol-upload/vol-download. Here, the hypervisor daemon acts like both client and server. This is reflected by stream->driver pointing to remoteStreamDrv, which doesn't have streamInData callback implemented and thus vol-upload/vol-download with sparse flag fails.

Resolves: https://bugzilla.redhat.com/show_bug.cgi?id=2026537
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/remote/remote_driver.c | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 235c406a5a..5b179a927d 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -5599,6 +5599,31 @@ remoteStreamRecvHole(virStreamPtr st,
 }
 
 
+static int
+remoteStreamInData(virStreamPtr st,
+                   int *data,
+                   long long *length)
+{
+    struct private_data *priv = st->conn->privateData;
+    virNetClientStream *privst = st->privateData;
+    int rv;
+
+    VIR_DEBUG("st=%p data=%p length=%p",
+              st, data, length);
+
+    remoteDriverLock(priv);
+    priv->localUses++;
+    remoteDriverUnlock(priv);
+
+    rv = virNetClientStreamInData(privst, data, length);
+
+    remoteDriverLock(priv);
+    priv->localUses--;
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+
 struct remoteStreamCallbackData {
     virStreamPtr st;
     virStreamEventCallback cb;
@@ -5745,6 +5770,7 @@ static virStreamDriver remoteStreamDrv = {
     .streamSend = remoteStreamSend,
     .streamSendHole = remoteStreamSendHole,
     .streamRecvHole = remoteStreamRecvHole,
+    .streamInData = remoteStreamInData,
     .streamFinish = remoteStreamFinish,
     .streamAbort = remoteStreamAbort,
     .streamEventAddCallback = remoteStreamEventAddCallback,
--
2.32.0
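The failure mode described in this commit message (a driver table with no streamInData entry) can be shown with a toy dispatch table. The sketch below is an assumption-laden model, not the libvirt implementation: `toy_driver`, `toy_stream`, `toy_remote_in_data()` and `toy_stream_in_data()` are hypothetical names, and the wrapper simply returns -1 where the real public API would report an "unsupported" error.

```c
#include <assert.h>
#include <stddef.h>

typedef struct toy_stream toy_stream;

/* Hypothetical driver table with a single optional callback. */
typedef struct {
    int (*in_data)(toy_stream *st, int *data, long long *length);
} toy_driver;

struct toy_stream {
    const toy_driver *driver;
    long long pending;   /* bytes of data waiting, 0 means none */
};

/* Hypothetical callback, playing the role of remoteStreamInData(). */
static int toy_remote_in_data(toy_stream *st, int *data, long long *length)
{
    *data = st->pending > 0;
    *length = st->pending;
    return 0;
}

/* Driver as it was before the patch: callback slot left empty. */
static const toy_driver toy_driver_without_cb = { NULL };

/* Driver after the patch: callback slot filled in. */
static const toy_driver toy_driver_with_cb = { toy_remote_in_data };

/* Public entry point: dispatch through the table, fail if the slot is empty. */
static int toy_stream_in_data(toy_stream *st, int *data, long long *length)
{
    if (st->driver->in_data == NULL)
        return -1;       /* operation not supported by this driver */
    return st->driver->in_data(st, data, length);
}
```

With the empty table every sparse transfer that needs to probe the stream fails at the dispatch step, which matches the symptom in the bug report; filling in the slot is the whole fix at this layer.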

On Tue, Dec 07, 2021 at 04:34:42PM +0100, Michal Privoznik wrote:
> When using the monolithic daemon the driver for virStream is always virFDStreamDrv and thus calling virStreamInData() results in calling virFDStreamInData().
>
> But things are different with split daemon, especially when a client connects to one of hypervisor daemons (e.g. virtqemud) and then lets the daemon connect to the storage daemon for vol-upload/vol-download. Here, the hypervisor daemon acts like both client and server. This is reflected by stream->driver pointing to remoteStreamDrv, which doesn't have streamInData callback implemented and thus vol-upload/vol-download with sparse flag fails.
>
> Resolves: https://bugzilla.redhat.com/show_bug.cgi?id=2026537
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>

Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

On Wed, Dec 08, 2021 at 09:57:41PM +0100, Martin Kletzander wrote:
> On Tue, Dec 07, 2021 at 04:34:42PM +0100, Michal Privoznik wrote:
> > When using the monolithic daemon the driver for virStream is always virFDStreamDrv and thus calling virStreamInData() results in calling virFDStreamInData().
> >
> > But things are different with split daemon, especially when a client connects to one of hypervisor daemons (e.g. virtqemud) and then lets the daemon connect to the storage daemon for vol-upload/vol-download. Here, the hypervisor daemon acts like both client and server. This is reflected by stream->driver pointing to remoteStreamDrv, which doesn't have streamInData callback implemented and thus vol-upload/vol-download with sparse flag fails.

Actually vol-upload does not fail for me even without these patches. But with your patches it works both ways, so my R-b stands.

> > Resolves: https://bugzilla.redhat.com/show_bug.cgi?id=2026537
> > Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
>
> Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---

diff to v1:
- initialize ret to -1
- lock stream even for checking allowSkip
- return 1 only if message was really popped from the queue
- If there's no incoming message, return the size of yet unprocessed hole. This size should always be zero though, because at EOF there's a hole of size 0. I have not met this case and I probably never will, but I figured it's better to be safe than sorry.

 src/libvirt_remote.syms      |  1 +
 src/rpc/virnetclientstream.c | 64 ++++++++++++++++++++++++++++++++++++
 src/rpc/virnetclientstream.h |  4 +++
 3 files changed, 69 insertions(+)

diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms
index 942e1013a6..07d22e368b 100644
--- a/src/libvirt_remote.syms
+++ b/src/libvirt_remote.syms
@@ -66,6 +66,7 @@ virNetClientStreamEOF;
 virNetClientStreamEventAddCallback;
 virNetClientStreamEventRemoveCallback;
 virNetClientStreamEventUpdateCallback;
+virNetClientStreamInData;
 virNetClientStreamMatches;
 virNetClientStreamNew;
 virNetClientStreamQueuePacket;
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 1ba6167a1d..eb4dc6854d 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -801,3 +801,67 @@ bool virNetClientStreamEOF(virNetClientStream *st)
 {
     return st->incomingEOF;
 }
+
+
+int virNetClientStreamInData(virNetClientStream *st,
+                             int *inData,
+                             long long *length)
+{
+    int ret = -1;
+    bool msgPopped = false;
+    virNetMessage *msg = NULL;
+
+    virObjectLock(st);
+
+    if (!st->allowSkip) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("Holes are not supported with this stream"));
+        goto cleanup;
+    }
+
+    if (virNetClientStreamCheckState(st) < 0)
+        goto cleanup;
+
+    msg = st->rx;
+
+    if (!msg) {
+        /* No incoming message. This means that the stream is at its end. In
+         * this case, virStreamInData() should set both inData and length to
+         * zero and return success. If there is a trailing hole though (there
+         * shouldn't be), signal that to the caller. */
+        *inData = 0;
+        *length = st->holeLength;
+        st->holeLength = 0;
+    } else if (msg->header.type == VIR_NET_STREAM) {
+        *inData = 1;
+        *length = msg->bufferLength - msg->bufferOffset;
+    } else if (msg->header.type == VIR_NET_STREAM_HOLE) {
+        *inData = 0;
+
+        if (st->holeLength == 0) {
+            if (virNetClientStreamHandleHole(NULL, st) < 0)
+                goto cleanup;
+
+            /* virNetClientStreamHandleHole() called above did pop the message from
+             * the queue (and freed it). Instead of trying to push it back let's
+             * just signal to the caller what we did. */
+            msgPopped = true;
+        }
+
+        *length = st->holeLength;
+        st->holeLength = 0;
+    } else {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Invalid message prog=%d type=%d serial=%u proc=%d"),
+                       msg->header.prog,
+                       msg->header.type,
+                       msg->header.serial,
+                       msg->header.proc);
+        goto cleanup;
+    }
+
+    ret = msgPopped ? 1 : 0;
+ cleanup:
+    virObjectUnlock(st);
+    return ret;
+}
diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h
index e16d6e4a9a..7428843f9b 100644
--- a/src/rpc/virnetclientstream.h
+++ b/src/rpc/virnetclientstream.h
@@ -90,3 +90,7 @@ int virNetClientStreamEventRemoveCallback(virNetClientStream *st);
 
 bool virNetClientStreamEOF(virNetClientStream *st)
     ATTRIBUTE_NONNULL(1);
+
+int virNetClientStreamInData(virNetClientStream *st,
+                             int *inData,
+                             long long *length);
--
2.32.0
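The classification logic of the v2 function (data, hole, or end of stream, with 1 returned only when a message was actually popped) can be exercised on a toy queue. Everything here is a simplified assumption: `toy_msg`, `toy_queue`, `toy_decode_hole()` and `toy_queue_in_data()` are hypothetical names, locking, state checks and the "invalid message type" branch are left out, and the decode helper only approximates what the patch comment says virNetClientStreamHandleHole() does (record the hole size and pop the message).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum toy_type { TOY_DATA, TOY_HOLE };

/* Hypothetical queued message; not virNetMessage. */
typedef struct toy_msg {
    enum toy_type type;
    long long length;        /* payload bytes left, or hole size */
    struct toy_msg *next;
} toy_msg;

typedef struct {
    toy_msg *rx;             /* head of the incoming queue */
    long long hole_length;   /* hole decoded but not yet reported */
} toy_queue;

/* Decode the hole at the head of the queue: record its size, pop it. */
static void toy_decode_hole(toy_queue *q)
{
    q->hole_length += q->rx->length;
    q->rx = q->rx->next;
}

/* Returns 0 if the queue was left untouched, 1 if a message was popped,
 * -1 on invalid arguments. */
static int toy_queue_in_data(toy_queue *q, int *in_data, long long *length)
{
    bool popped = false;
    toy_msg *msg;

    if (q == NULL || in_data == NULL || length == NULL)
        return -1;

    msg = q->rx;

    if (msg == NULL) {
        /* End of stream; report a leftover hole if there is one. */
        *in_data = 0;
        *length = q->hole_length;
        q->hole_length = 0;
    } else if (msg->type == TOY_DATA) {
        *in_data = 1;
        *length = msg->length;
    } else {
        *in_data = 0;
        if (q->hole_length == 0) {
            toy_decode_hole(q);
            popped = true;   /* only now did the queue really change */
        }
        *length = q->hole_length;
        q->hole_length = 0;
    }

    return popped ? 1 : 0;
}
```

Peeking at a data message leaves the queue alone and returns 0, while the first look at a hole message consumes it and returns 1. That is the distinction the v1 review asked for, since v1 returned 1 for any hole even when nothing had been popped.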

On Thu, Dec 09, 2021 at 09:44:22AM +0100, Michal Privoznik wrote:
> The aim of this function is to look at a virNetClientStream and tell whether the incoming packet (if there's one) contains data (type VIR_NET_STREAM) or a hole (type VIR_NET_STREAM_HOLE) and how big the section is. This function will be called from the remote driver in one of future commits.
>
> Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
> ---
>
> diff to v1:
> - initialize ret to -1
> - lock stream even for checking allowSkip
> - return 1 only if message was really popped from the queue
> - If there's no incoming message, return the size of yet unprocessed hole. This size should always be zero though, because at EOF there's a hole of size 0. I have not met this case and I probably never will, but I figured it's better to be safe than sorry.

I tested as much as I could with this and it works both ways the way it should.

Reviewed-by: Martin Kletzander <mkletzan@redhat.com>

or maybe Tested-by: ??? =)