* qemud/stream.c: Handle incoming stream data packets, queuing until
stream becomes writable. Handle stream completion handshake
* po/POTFILES.in: Add qemud/stream.c
---
po/POTFILES.in | 1 +
qemud/stream.c | 305 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 303 insertions(+), 3 deletions(-)
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 66d3ebd..d144689 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,6 +1,7 @@
qemud/dispatch.c
qemud/qemud.c
qemud/remote.c
+qemud/stream.c
src/bridge.c
src/conf.c
src/console.c
diff --git a/qemud/stream.c b/qemud/stream.c
index 1644a1b..1fe0e58 100644
--- a/qemud/stream.c
+++ b/qemud/stream.c
@@ -28,6 +28,93 @@
#include "dispatch.h"
#include "logging.h"
+/* Forward declarations: remoteStreamHandleWrite() is called from
+ * remoteStreamEvent(), and the finish/abort handlers from
+ * remoteStreamFilter(), before their definitions further down. */
+static int
+remoteStreamHandleWrite(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+static int
+remoteStreamHandleFinish(struct qemud_client *client,
+ struct qemud_client_stream *stream,
+ struct qemud_client_message *msg);
+static int
+remoteStreamHandleAbort(struct qemud_client *client,
+ struct qemud_client_stream *stream,
+ struct qemud_client_message *msg);
+
+
+
+/*
+ * Refresh the set of events being watched on the stream: writability
+ * is only of interest while incoming messages are queued on stream->rx.
+ */
+static void
+remoteStreamUpdateEvents(struct qemud_client_stream *stream)
+{
+    int wanted = stream->rx ? VIR_STREAM_EVENT_WRITABLE : 0;
+
+    virStreamEventUpdateCallback(stream->st, wanted);
+}
+
+
+/*
+ * Callback that gets invoked when a stream becomes writable/readable
+ *
+ * @st: the stream the event was raised on
+ * @events: bitmask of VIR_STREAM_EVENT_* flags that fired
+ * @opaque: the struct qemud_client the callback was registered with
+ *
+ * Holds the client lock for the duration of the callback. When the
+ * stream is writable the queued incoming messages are flushed; an
+ * error or hangup on a stream that is not yet closed aborts it and
+ * queues a stream error for the client. A closed stream is removed
+ * from the client, otherwise its watched events are refreshed.
+ */
+static void
+remoteStreamEvent(virStreamPtr st, int events, void *opaque)
+{
+    struct qemud_client *client = opaque;
+    struct qemud_client_stream *stream;
+
+    /* XXX sub-optimal - we really should be taking the server lock
+     * first, but we have no handle to the server object
+     * We're lucky to get away with it for now, due to this callback
+     * executing in the main thread, but this should really be fixed
+     */
+    virMutexLock(&client->lock);
+
+    stream = remoteFindClientStream(client, st);
+
+    if (!stream) {
+        /* No state left for this stream, so stop watching it */
+        VIR_WARN("event for client=%p stream st=%p, but missing stream state",
+                 client, st);
+        virStreamEventRemoveCallback(st);
+        goto cleanup;
+    }
+
+    DEBUG("st=%p events=%d", st, events);
+
+    if (events & VIR_STREAM_EVENT_WRITABLE) {
+        if (remoteStreamHandleWrite(client, stream) < 0) {
+            remoteRemoveClientStream(client, stream);
+            qemudDispatchClientFailure(client);
+            goto cleanup;
+        }
+    }
+
+    /* The write handler above may already have closed the stream,
+     * in which case there is nothing left to abort */
+    if (!stream->closed &&
+        (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
+        int ret;
+        remote_error rerr;
+        memset(&rerr, 0, sizeof rerr);
+        stream->closed = 1;
+        virStreamAbort(stream->st);
+        if (events & VIR_STREAM_EVENT_HANGUP)
+            remoteDispatchFormatError(&rerr, "%s",
+                                      _("stream had unexpected termination"));
+        else
+            remoteDispatchFormatError(&rerr, "%s",
+                                      _("stream had I/O failure"));
+        ret = remoteSerializeStreamError(client, &rerr, stream->procedure,
+                                         stream->serial);
+        remoteRemoveClientStream(client, stream);
+        if (ret < 0)
+            qemudDispatchClientFailure(client);
+        goto cleanup;
+    }
+
+    if (stream->closed) {
+        remoteRemoveClientStream(client, stream);
+    } else {
+        remoteStreamUpdateEvents(stream);
+    }
+
+cleanup:
+    virMutexUnlock(&client->lock);
+}
+
/*
* @client: a locked client object
@@ -38,10 +125,54 @@
* -1 on fatal client error
*/
static int
-remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED,
- struct qemud_client_message *msg ATTRIBUTE_UNUSED,
- void *opaque ATTRIBUTE_UNUSED)
+remoteStreamFilter(struct qemud_client *client,
+                   struct qemud_client_message *msg, void *opaque)
{
+    struct qemud_client_stream *stream = opaque;
+
+    /* Only REMOTE_STREAM messages carrying this stream's serial and
+     * procedure are claimed here; anything else falls through to
+     * the final return 0 */
+    if (msg->hdr.serial == stream->serial &&
+        msg->hdr.proc == stream->procedure &&
+        msg->hdr.type == REMOTE_STREAM) {
+        /* Arguments follow the order of the labels in the format */
+        DEBUG("Incoming rx=%p serial=%d proc=%d status=%d",
+              stream->rx, msg->hdr.serial, msg->hdr.proc, msg->hdr.status);
+
+        /* If there are queued packets, we need to queue all further
+         * messages, since they must be processed strictly in order.
+         * If there are no queued packets, then OK/ERROR messages
+         * should be processed immediately. Data packets are still
+         * queued to only be processed when the stream is marked as
+         * writable.
+         */
+        if (stream->rx) {
+            qemudClientMessageQueuePush(&stream->rx, msg);
+            remoteStreamUpdateEvents(stream);
+        } else {
+            int ret = 0;
+            switch (msg->hdr.status) {
+            case REMOTE_OK:
+                ret = remoteStreamHandleFinish(client, stream, msg);
+                if (ret == 0)
+                    qemudClientMessageRelease(client, msg);
+                break;
+
+            case REMOTE_CONTINUE:
+                qemudClientMessageQueuePush(&stream->rx, msg);
+                remoteStreamUpdateEvents(stream);
+                break;
+
+            case REMOTE_ERROR:
+            default:
+                ret = remoteStreamHandleAbort(client, stream, msg);
+                if (ret == 0)
+                    qemudClientMessageRelease(client, msg);
+                break;
+            }
+
+            if (ret < 0)
+                return -1;
+        }
+        /* The message was queued or handled by this stream */
+        return 1;
+    }
return 0;
}
@@ -119,6 +250,10 @@ int remoteAddClientStream(struct qemud_client *client,
DEBUG("client=%p proc=%d serial=%d", client, stream->procedure,
stream->serial);
+ if (virStreamEventAddCallback(stream->st, 0,
+ remoteStreamEvent, client, NULL) < 0)
+ return -1;
+
if (tmp) {
while (tmp->next)
tmp = tmp->next;
@@ -132,6 +267,8 @@ int remoteAddClientStream(struct qemud_client *client,
stream->tx = 1;
+ remoteStreamUpdateEvents(stream);
+
return 0;
}
@@ -208,3 +345,165 @@ remoteRemoveClientStream(struct qemud_client *client,
}
return -1;
}
+
+
+/*
+ * Push the not-yet-written remainder of a data message into the
+ * stream, starting at msg->bufferOffset.
+ *
+ * If the send fails, the stream is marked closed and an error reply
+ * is serialized for the client; the result of that serialization is
+ * what gets returned.
+ *
+ * Returns:
+ *   -1  if fatal error occurred
+ *    0  if message was fully processed
+ *    1  if message is still being processed
+ */
+static int
+remoteStreamHandleWriteData(struct qemud_client *client,
+                            struct qemud_client_stream *stream,
+                            struct qemud_client_message *msg)
+{
+    remote_error rerr;
+    int ret;
+
+    DEBUG("stream=%p proc=%d serial=%d len=%d offset=%d",
+          stream, msg->hdr.proc, msg->hdr.serial,
+          msg->bufferLength, msg->bufferOffset);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    ret = virStreamSend(stream->st,
+                        msg->buffer + msg->bufferOffset,
+                        msg->bufferLength - msg->bufferOffset);
+
+    if (ret > 0) {
+        msg->bufferOffset += ret;
+
+        /* Partial write, so indicate we have more todo later */
+        if (msg->bufferOffset < msg->bufferLength)
+            return 1;
+    } else if (ret == -2) {
+        /* Blocking, so indicate we have more todo later */
+        return 1;
+    } else {
+        VIR_INFO0("Stream send failed");
+        stream->closed = 1;
+        remoteDispatchConnError(&rerr, client->conn);
+        return remoteSerializeReplyError(client, &rerr, &msg->hdr);
+    }
+
+    return 0;
+}
+
+
+/*
+ * Process a finish handshake from the client.
+ *
+ * The stream is marked closed and finished. A zero-length
+ * confirmation is sent back if that worked, or an error reply
+ * if finishing the stream failed.
+ *
+ * Returns 0 if successfully sent RPC reply, -1 upon fatal error
+ */
+static int
+remoteStreamHandleFinish(struct qemud_client *client,
+                         struct qemud_client_stream *stream,
+                         struct qemud_client_message *msg)
+{
+    remote_error rerr;
+
+    DEBUG("stream=%p proc=%d serial=%d",
+          stream, msg->hdr.proc, msg->hdr.serial);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    stream->closed = 1;
+
+    if (virStreamFinish(stream->st) < 0) {
+        remoteDispatchConnError(&rerr, client->conn);
+        return remoteSerializeReplyError(client, &rerr, &msg->hdr);
+    }
+
+    /* Send zero-length confirm */
+    return remoteSendStreamData(client, stream, NULL, 0) < 0 ? -1 : 0;
+}
+
+
+/*
+ * Process an abort request from the client.
+ *
+ * The stream is marked closed and aborted, then an error reply is
+ * serialized for the client. A status other than REMOTE_ERROR is
+ * logged as unexpected and reported in the error message.
+ *
+ * Returns 0 if successfully aborted, -1 upon error
+ */
+static int
+remoteStreamHandleAbort(struct qemud_client *client,
+                        struct qemud_client_stream *stream,
+                        struct qemud_client_message *msg)
+{
+    remote_error rerr;
+
+    DEBUG("stream=%p proc=%d serial=%d",
+          stream, msg->hdr.proc, msg->hdr.serial);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    stream->closed = 1;
+    virStreamAbort(stream->st);
+
+    if (msg->hdr.status == REMOTE_ERROR)
+        remoteDispatchFormatError(&rerr, "%s",
+                                  _("stream aborted at client request"));
+    else {
+        VIR_WARN("unexpected stream status %d", msg->hdr.status);
+        remoteDispatchFormatError(&rerr,
+                                  _("stream aborted with unexpected status %d"),
+                                  msg->hdr.status);
+    }
+
+    return remoteSerializeReplyError(client, &rerr, &msg->hdr);
+}
+
+
+
+/*
+ * Called when the stream is signalled as being able to accept
+ * data writes. Will process all pending incoming messages
+ * until they're all gone, the stream is closed, or I/O blocks
+ *
+ * Returns 0 on success, or -1 upon fatal error
+ */
+static int
+remoteStreamHandleWrite(struct qemud_client *client,
+                        struct qemud_client_stream *stream)
+{
+    struct qemud_client_message *msg, *next;
+
+    DEBUG("stream=%p", stream);
+
+    msg = stream->rx;
+    while (msg && !stream->closed) {
+        int ret;
+        switch (msg->hdr.status) {
+        case REMOTE_OK:
+            ret = remoteStreamHandleFinish(client, stream, msg);
+            break;
+
+        case REMOTE_CONTINUE:
+            ret = remoteStreamHandleWriteData(client, stream, msg);
+            break;
+
+        case REMOTE_ERROR:
+        default:
+            ret = remoteStreamHandleAbort(client, stream, msg);
+            break;
+        }
+
+        if (ret < 0)
+            return -1;
+        if (ret > 0)
+            break; /* still processing data */
+
+        /* Remember the successor before the message leaves the queue.
+         * NOTE(review): the dequeue helper is not visible here; if it
+         * resets msg->next, reading the link afterwards would end the
+         * loop after a single message. */
+        next = msg->next;
+        qemudClientMessageQueueServe(&stream->rx);
+        qemudClientMessageRelease(client, msg);
+        msg = next;
+    }
+
+    return 0;
+}
--
1.6.2.5