When the remote client receives end of file on the stream
it never invokes the stream callback. Applications relying
on async event driven I/O will thus never see the EOF
condition on the stream
* src/rpc/virnetclient.c, src/rpc/virnetclientstream.c:
Ensure EOF is dispatched
---
src/rpc/virnetclient.c | 3 --
src/rpc/virnetclientstream.c | 43 ++++++++++++++++++++++++-----------------
2 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index dc0ce51..39bdf14 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -580,9 +580,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
if (thecall && thecall->expectReply) {
VIR_DEBUG("Got sync data packet completion");
thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
- } else {
- // XXX
- //remoteStreamEventTimerUpdate(privst);
}
return 0;
}
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 9da5aee..d5efab1 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -55,6 +55,7 @@ struct _virNetClientStream {
char *incoming;
size_t incomingOffset;
size_t incomingLength;
+ bool incomingEOF;
virNetClientStreamEventCallback cb;
void *cbOpaque;
@@ -73,7 +74,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset,
st->cbEvents);
- if ((st->incomingOffset &&
+ if (((st->incomingOffset || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
@@ -96,7 +97,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- st->incomingOffset)
+ (st->incomingOffset || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
@@ -284,24 +285,30 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virMutexLock(&st->lock);
need = msg->bufferLength - msg->bufferOffset;
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
+ if (need) {
+ size_t avail = st->incomingLength - st->incomingOffset;
+ if (need > avail) {
+ size_t extra = need - avail;
+ if (VIR_REALLOC_N(st->incoming,
+ st->incomingLength + extra) < 0) {
+ VIR_DEBUG("Out of memory handling stream data");
+ goto cleanup;
+ }
+ st->incomingLength += extra;
}
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+ memcpy(st->incoming + st->incomingOffset,
+ msg->buffer + msg->bufferOffset,
+ msg->bufferLength - msg->bufferOffset);
+ st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+ } else {
+ st->incomingEOF = true;
+ }
- VIR_DEBUG("Stream incoming data offset %zu length %zu",
- st->incomingOffset, st->incomingLength);
+ VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
+ st->incomingOffset, st->incomingLength,
+ st->incomingEOF);
+ virNetClientStreamEventTimerUpdate(st);
ret = 0;
@@ -372,7 +379,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
virMutexLock(&st->lock);
- if (!st->incomingOffset) {
+ if (!st->incomingOffset && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
--
1.7.4.4