* qemud/dispatch.c: Set streamTX flag on outgoing data packets
* qemud/qemud.h: Add streamTX flag to track outgoing data
* qemud/qemud.c: Re-enable further TX when outgoing data packet
has been fully sent.
* qemud/stream.h, qemud/stream.c: Add method for enabling TX.
Support reading from streams and transmitting data out to client
---
qemud/dispatch.c | 2 +
qemud/qemud.c | 4 ++-
qemud/qemud.h | 1 +
qemud/stream.c | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
qemud/stream.h | 4 ++
5 files changed, 106 insertions(+), 1 deletions(-)
diff --git a/qemud/dispatch.c b/qemud/dispatch.c
index 1934d24..7417001 100644
--- a/qemud/dispatch.c
+++ b/qemud/dispatch.c
@@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client,
DEBUG("Total %d", msg->bufferOffset);
}
+ if (data)
+ msg->streamTX = 1;
/* Reset ready for I/O */
msg->bufferLength = msg->bufferOffset;
diff --git a/qemud/qemud.c b/qemud/qemud.c
index 6c81dec..af71495 100644
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -1893,7 +1893,9 @@ void
qemudClientMessageRelease(struct qemud_client *client,
struct qemud_client_message *msg)
{
- if (!msg->async)
+ if (msg->streamTX) {
+ remoteStreamMessageFinished(client, msg);
+ } else if (!msg->async)
client->nrequests--;
/* See if the recv queue is currently throttled */
diff --git a/qemud/qemud.h b/qemud/qemud.h
index 8ef5871..911cdc3 100644
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -130,6 +130,7 @@ struct qemud_client_message {
unsigned int bufferOffset;
unsigned int async : 1;
+ unsigned int streamTX : 1;
remote_message_header hdr;
diff --git a/qemud/stream.c b/qemud/stream.c
index 1fe0e58..584268d 100644
--- a/qemud/stream.c
+++ b/qemud/stream.c
@@ -32,6 +32,9 @@ static int
remoteStreamHandleWrite(struct qemud_client *client,
struct qemud_client_stream *stream);
static int
+remoteStreamHandleRead(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+static int
remoteStreamHandleFinish(struct qemud_client *client,
struct qemud_client_stream *stream,
struct qemud_client_message *msg);
@@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream)
int newEvents = 0;
if (stream->rx)
newEvents |= VIR_STREAM_EVENT_WRITABLE;
+ if (stream->tx && !stream->recvEOF)
+ newEvents |= VIR_STREAM_EVENT_READABLE;
virStreamEventUpdateCallback(stream->st, newEvents);
}
@@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
}
}
+ if (!stream->recvEOF &&
+ (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
+ events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
+ if (remoteStreamHandleRead(client, stream) < 0) {
+ remoteRemoveClientStream(client, stream);
+ qemudDispatchClientFailure(client);
+ goto cleanup;
+ }
+ }
+
if (!stream->closed &&
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
int ret;
@@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client,
return 0;
}
+
+
+
+/*
+ * Invoked when a stream is signalled as having data
+ * available to read. This reads up to one message
+ * worth of data, and then queues that for transmission
+ * to the client.
+ *
+ * Returns 0 if data was queued for TX, or an error RPC
+ * was sent, or -1 on fatal error, indicating the client
+ * should be killed
+ */
+static int
+remoteStreamHandleRead(struct qemud_client *client,
+ struct qemud_client_stream *stream)
+{
+ char *buffer;
+ size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX;
+ int ret;
+
+ DEBUG("stream=%p", stream);
+
+ /* Shouldn't ever be called unless we're marked able to
+ * transmit, but doesn't hurt to check */
+ if (!stream->tx)
+ return 0;
+
+ if (VIR_ALLOC_N(buffer, bufferLen) < 0)
+ return -1;
+
+ ret = virStreamRecv(stream->st, buffer, bufferLen);
+ if (ret == -2) {
+ /* Should never get this, since we're only called when we know
+ * we're readable, but hey things change... */
+ ret = 0;
+ } else if (ret < 0) {
+ remote_error rerr;
+ memset(&rerr, 0, sizeof rerr);
+ remoteDispatchConnError(&rerr, NULL);
+
+ ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
+ } else {
+ stream->tx = 0;
+ if (ret == 0)
+ stream->recvEOF = 1;
+ ret = remoteSendStreamData(client, stream, buffer, ret);
+ }
+
+ VIR_FREE(buffer);
+ return ret;
+}
+
+
+/*
+ * Invoked when an outgoing data packet message has been fully sent.
+ * This simply re-enables TX of further data.
+ *
+ * The idea is to stop the daemon growing without bound when
+ * the stream is fast but the client is slow
+ */
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+ struct qemud_client_message *msg)
+{
+ struct qemud_client_stream *stream = client->streams;
+
+ while (stream) {
+ if (msg->hdr.proc == stream->procedure &&
+ msg->hdr.serial == stream->serial)
+ break;
+ stream = stream->next;
+ }
+
+ DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
+
+ if (stream) {
+ stream->tx = 1;
+ remoteStreamUpdateEvents(stream);
+ }
+}
diff --git a/qemud/stream.h b/qemud/stream.h
index fe5ce6f..de738ba 100644
--- a/qemud/stream.h
+++ b/qemud/stream.h
@@ -46,4 +46,8 @@ int
remoteRemoveClientStream(struct qemud_client *client,
struct qemud_client_stream *stream);
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+ struct qemud_client_message *msg);
+
#endif /* __LIBVIRTD_STREAM_H__ */
--
1.6.2.5