* src/remote_internal.c: Add helper APIs for processing data streams
---
src/remote_internal.c | 530 ++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 524 insertions(+), 6 deletions(-)
diff --git a/src/remote_internal.c b/src/remote_internal.c
index de3c288..12e3bb9 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -1,4 +1,3 @@
-
/*
* remote_internal.c: driver to provide access to libvirtd running
* on a remote machine
@@ -111,7 +110,8 @@ enum {
struct remote_thread_call {
int mode;
- /* 4 byte length, followed by RPC message header+body */
+ /* Buffer for outgoing data packet
+ * 4 byte length, followed by RPC message header+body */
char buffer[4 + REMOTE_MESSAGE_MAX];
unsigned int bufferLength;
unsigned int bufferOffset;
@@ -121,6 +121,7 @@ struct remote_thread_call {
virCond cond;
+ int want_reply;
xdrproc_t ret_filter;
char *ret;
@@ -129,6 +130,26 @@ struct remote_thread_call {
struct remote_thread_call *next;
};
+struct private_stream_data {
+ unsigned int has_error : 1;
+ remote_error err;
+
+ unsigned int serial;
+ unsigned int proc_nr;
+
+ /* XXX this is potentially unbounded if the client
+ * app has domain events registered, since packets
+ * may be read off wire, while app isn't ready to
+ * recv them. Figure out how to address this some
+ * time....
+ */
+ char *incoming;
+ unsigned int incomingOffset;
+ unsigned int incomingLength;
+
+ struct private_stream_data *next;
+};
+
struct private_data {
virMutex lock;
@@ -155,7 +176,8 @@ struct private_data {
unsigned int saslEncodedOffset;
#endif
- /* 4 byte length, followed by RPC message header+body */
+ /* Buffer for incoming data packets
+ * 4 byte length, followed by RPC message header+body */
char buffer[4 + REMOTE_MESSAGE_MAX];
unsigned int bufferLength;
unsigned int bufferOffset;
@@ -176,6 +198,8 @@ struct private_data {
/* List of threads currently waiting for dispatch */
struct remote_thread_call *waitDispatch;
+
+ struct private_stream_data *streams;
};
enum {
@@ -194,6 +218,10 @@ static void remoteDriverUnlock(struct private_data *driver)
virMutexUnlock(&driver->lock);
}
+static int remoteIO(virConnectPtr conn,
+ struct private_data *priv,
+ int flags,
+ struct remote_thread_call *thiscall);
static int call (virConnectPtr conn, struct private_data *priv,
int flags, int proc_nr,
xdrproc_t args_filter, char *args,
@@ -6319,6 +6347,361 @@ done:
return rv;
}
+
+#if 0
+static struct private_stream_data *
+remoteStreamOpen(virStreamPtr st,
+ int output ATTRIBUTE_UNUSED,
+ unsigned int proc_nr,
+ unsigned int serial)
+{
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *stpriv;
+
+ if (VIR_ALLOC(stpriv) < 0)
+ return NULL;
+
+ /* Initialize call object used to receive replies */
+ stpriv->proc_nr = proc_nr;
+ stpriv->serial = serial;
+
+ stpriv->next = priv->streams;
+ priv->streams = stpriv;
+
+ return stpriv;
+}
+
+
+static int
+remoteStreamPacket(virStreamPtr st,
+ int status,
+ const char *data,
+ size_t nbytes)
+{
+ DEBUG("st=%p status=%d data=%p nbytes=%d", st, status, data, nbytes);
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+ XDR xdr;
+ struct remote_thread_call *thiscall;
+ remote_message_header hdr;
+
+ memset(&hdr, 0, sizeof hdr);
+
+ if (VIR_ALLOC(thiscall) < 0) {
+ virReportOOMError(st->conn);
+ return -1;
+ }
+
+ thiscall->mode = REMOTE_MODE_WAIT_TX;
+ thiscall->serial = privst->serial;
+ thiscall->proc_nr = privst->proc_nr;
+ if (status == REMOTE_OK ||
+ status == REMOTE_ERROR)
+ thiscall->want_reply = 1;
+
+ if (virCondInit(&thiscall->cond) < 0) {
+ VIR_FREE(thiscall);
+ error (st->conn, VIR_ERR_INTERNAL_ERROR,
+ _("cannot initialize mutex"));
+ return -1;
+ }
+
+ /* Don't fill in any other fields in 'thiscall' since
+ * we're not expecting a reply for this */
+
+ hdr.prog = REMOTE_PROGRAM;
+ hdr.vers = REMOTE_PROTOCOL_VERSION;
+ hdr.proc = privst->proc_nr;
+ hdr.type = REMOTE_STREAM;
+ hdr.serial = privst->serial;
+ hdr.status = status;
+
+
+ /* Length must include the length word itself (always encoded in
+ * 4 bytes as per RFC 4506), so offset start length. We write this
+ * later.
+ */
+ thiscall->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+ /* Serialise header followed by args. */
+ xdrmem_create (&xdr, thiscall->buffer + thiscall->bufferLength,
+ REMOTE_MESSAGE_MAX, XDR_ENCODE);
+ if (!xdr_remote_message_header (&xdr, &hdr)) {
+ error (st->conn,
+ VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+ goto error;
+ }
+
+ thiscall->bufferLength += xdr_getpos (&xdr);
+ xdr_destroy (&xdr);
+
+ if (status == REMOTE_CONTINUE) {
+ if (((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength) < nbytes) {
+ errorf(st->conn,
+ VIR_ERR_RPC, _("data size %d too large for payload %d"),
+ nbytes, ((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength));
+ goto error;
+ }
+
+ memcpy(thiscall->buffer + thiscall->bufferLength, data, nbytes);
+ thiscall->bufferLength += nbytes;
+ }
+
+ /* Go back to packet start and encode the length word. */
+ xdrmem_create (&xdr, thiscall->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN,
XDR_ENCODE);
+ if (!xdr_u_int (&xdr, &thiscall->bufferLength)) {
+ error(st->conn, VIR_ERR_RPC,
+ _("xdr_u_int (length word)"));
+ goto error;
+ }
+ xdr_destroy (&xdr);
+
+ /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */
+ if (remoteIO(st->conn, priv, 0, thiscall) < 0)
+ return -1;
+
+ return nbytes;
+
+error:
+ xdr_destroy (&xdr);
+ VIR_FREE(thiscall);
+ return -1;
+}
+
+static int
+remoteStreamHasError(virStreamPtr st) {
+ struct private_stream_data *privst = st->privateData;
+ if (!privst->has_error) {
+ return 0;
+ }
+
+ VIR_WARN0("Raising async error");
+ virRaiseErrorFull(st->conn,
+ __FILE__, __FUNCTION__, __LINE__,
+ privst->err.domain,
+ privst->err.code,
+ privst->err.level,
+ privst->err.str1 ? *privst->err.str1 : NULL,
+ privst->err.str2 ? *privst->err.str2 : NULL,
+ privst->err.str3 ? *privst->err.str3 : NULL,
+ privst->err.int1,
+ privst->err.int2,
+ "%s", privst->err.message ? *privst->err.message :
NULL);
+
+ return 1;
+}
+
+static void
+remoteStreamRelease(virStreamPtr st)
+{
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+
+ if (priv->streams == privst)
+ priv->streams = privst->next;
+ else {
+ struct private_stream_data *tmp = priv->streams;
+ while (tmp && tmp->next) {
+ if (tmp->next == privst) {
+ tmp->next = privst->next;
+ break;
+ }
+ }
+ }
+
+ if (privst->has_error)
+ xdr_free((xdrproc_t)xdr_remote_error, (char *)&privst->err);
+
+ VIR_FREE(privst);
+
+ st->driver = NULL;
+ st->privateData = NULL;
+}
+
+
+static int
+remoteStreamSend(virStreamPtr st,
+ const char *data,
+ size_t nbytes)
+{
+ DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes);
+ struct private_data *priv = st->conn->privateData;
+ int rv = -1;
+
+ remoteDriverLock(priv);
+
+ if (remoteStreamHasError(st))
+ goto cleanup;
+
+ rv = remoteStreamPacket(st,
+ REMOTE_CONTINUE,
+ data,
+ nbytes);
+
+cleanup:
+ if (rv == -1)
+ remoteStreamRelease(st);
+
+ remoteDriverUnlock(priv);
+
+ return rv;
+}
+
+
+static int
+remoteStreamRecv(virStreamPtr st,
+ char *data,
+ size_t nbytes)
+{
+ DEBUG("st=%p data=%p nbytes=%d", st, data, nbytes);
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+ int rv = -1;
+
+ remoteDriverLock(priv);
+
+ if (remoteStreamHasError(st))
+ goto cleanup;
+
+ if (!privst->incomingOffset) {
+ struct remote_thread_call *thiscall;
+
+ if (VIR_ALLOC(thiscall) < 0) {
+ virReportOOMError(st->conn);
+ goto cleanup;
+ }
+
+ /* We're not really doing an RPC calls, so we're
+ * skipping straight to RX part */
+ thiscall->mode = REMOTE_MODE_WAIT_RX;
+ thiscall->serial = privst->serial;
+ thiscall->proc_nr = privst->proc_nr;
+ thiscall->want_reply = 1;
+
+ if (virCondInit(&thiscall->cond) < 0) {
+ VIR_FREE(thiscall);
+ error (st->conn, VIR_ERR_INTERNAL_ERROR,
+ _("cannot initialize mutex"));
+ goto cleanup;
+ }
+
+ /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics)
*/
+ if (remoteIO(st->conn, priv, 0, thiscall) < 0)
+ goto cleanup;
+ }
+
+ DEBUG("After IO %d", privst->incomingOffset);
+ if (privst->incomingOffset) {
+ int want = privst->incomingOffset;
+ if (want > nbytes)
+ want = nbytes;
+ memcpy(data, privst->incoming, want);
+ if (want < privst->incomingOffset) {
+ memmove(privst->incoming, privst->incoming + want,
privst->incomingOffset - want);
+ privst->incomingOffset -= want;
+ } else {
+ VIR_FREE(privst->incoming);
+ privst->incomingOffset = privst->incomingLength = 0;
+ }
+ rv = want;
+ } else {
+ rv = 0;
+ }
+
+ DEBUG("Done %d", rv);
+
+cleanup:
+ if (rv == -1)
+ remoteStreamRelease(st);
+ remoteDriverUnlock(priv);
+
+ return rv;
+}
+
+static int
+remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
+ int events ATTRIBUTE_UNUSED,
+ virStreamEventCallback cb ATTRIBUTE_UNUSED,
+ void *opaque ATTRIBUTE_UNUSED,
+ virFreeCallback ff ATTRIBUTE_UNUSED)
+{
+ return -1;
+}
+
+static int
+remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
+ int events ATTRIBUTE_UNUSED)
+{
+ return -1;
+}
+
+
+static int
+remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
+{
+ return -1;
+}
+
+static int
+remoteStreamFinish(virStreamPtr st)
+{
+ struct private_data *priv = st->conn->privateData;
+ int ret = -1;
+
+ remoteDriverLock(priv);
+
+ if (remoteStreamHasError(st))
+ goto cleanup;
+
+ ret = remoteStreamPacket(st,
+ REMOTE_OK,
+ NULL,
+ 0);
+
+cleanup:
+ remoteStreamRelease(st);
+
+ remoteDriverUnlock(priv);
+ return ret;
+}
+
+static int
+remoteStreamAbort(virStreamPtr st)
+{
+ struct private_data *priv = st->conn->privateData;
+ int ret = -1;
+
+ remoteDriverLock(priv);
+
+ if (remoteStreamHasError(st))
+ goto cleanup;
+
+ ret = remoteStreamPacket(st,
+ REMOTE_ERROR,
+ NULL,
+ 0);
+
+cleanup:
+ remoteStreamRelease(st);
+
+ remoteDriverUnlock(priv);
+ return ret;
+}
+
+
+
+static virStreamDriver remoteStreamDrv = {
+ .streamRecv = remoteStreamRecv,
+ .streamSend = remoteStreamSend,
+ .streamFinish = remoteStreamFinish,
+ .streamAbort = remoteStreamAbort,
+ .streamAddCallback = remoteStreamEventAddCallback,
+ .streamUpdateCallback = remoteStreamEventUpdateCallback,
+ .streamRemoveCallback = remoteStreamEventRemoveCallback,
+};
+#endif
+
+
/*----------------------------------------------------------------------*/
@@ -6350,6 +6733,7 @@ prepareCall(virConnectPtr conn,
rv->proc_nr = proc_nr;
rv->ret_filter = ret_filter;
rv->ret = ret;
+ rv->want_reply = 1;
hdr.prog = REMOTE_PROGRAM;
hdr.vers = REMOTE_PROTOCOL_VERSION;
@@ -6535,7 +6919,10 @@ remoteIOWriteMessage(virConnectPtr conn,
if (priv->saslEncodedOffset == priv->saslEncodedLength) {
priv->saslEncoded = NULL;
priv->saslEncodedOffset = priv->saslEncodedLength = 0;
- thecall->mode = REMOTE_MODE_WAIT_RX;
+ if (thecall->want_reply)
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ else
+ thecall->mode = REMOTE_MODE_COMPLETE;
}
} else {
#endif
@@ -6549,7 +6936,10 @@ remoteIOWriteMessage(virConnectPtr conn,
if (thecall->bufferOffset == thecall->bufferLength) {
thecall->bufferOffset = thecall->bufferLength = 0;
- thecall->mode = REMOTE_MODE_WAIT_RX;
+ if (thecall->want_reply)
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ else
+ thecall->mode = REMOTE_MODE_COMPLETE;
}
#if HAVE_SASL
}
@@ -6703,6 +7093,12 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data
*priv,
remote_message_header *hdr,
XDR *xdr);
+static int
+processCallDispatchStream(virConnectPtr conn, struct private_data *priv,
+ int in_open,
+ remote_message_header *hdr,
+ XDR *xdr);
+
static int
processCallDispatch(virConnectPtr conn, struct private_data *priv,
@@ -6712,14 +7108,19 @@ processCallDispatch(virConnectPtr conn, struct private_data
*priv,
int len = priv->bufferLength - 4;
int rv = -1;
+ /* Length word has already been read */
+ priv->bufferOffset = 4;
+
/* Deserialise reply header. */
- xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+ xdrmem_create (&xdr, priv->buffer + priv->bufferOffset, len, XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &hdr)) {
error (in_open ? NULL : conn,
VIR_ERR_RPC, _("invalid header in reply"));
return -1;
}
+ priv->bufferOffset += xdr_getpos(&xdr);
+
/* Check program, version, etc. are what we expect. */
if (hdr.prog != REMOTE_PROGRAM) {
virRaiseError (in_open ? NULL : conn,
@@ -6738,6 +7139,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
return -1;
}
+
switch (hdr.type) {
case REMOTE_REPLY: /* Normal RPC replies */
rv = processCallDispatchReply(conn, priv, in_open,
@@ -6749,6 +7151,11 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
&hdr, &xdr);
break;
+ case REMOTE_STREAM: /* Stream protocol */
+ rv = processCallDispatchStream(conn, priv, in_open,
+ &hdr, &xdr);
+ break;
+
default:
virRaiseError (in_open ? NULL : conn,
NULL, NULL, VIR_FROM_REMOTE,
@@ -6811,6 +7218,7 @@ processCallDispatchReply(virConnectPtr conn, struct private_data
*priv,
return 0;
case REMOTE_ERROR:
+ VIR_WARN0("Method call error");
memset (&thecall->err, 0, sizeof thecall->err);
if (!xdr_remote_error (xdr, &thecall->err)) {
error (in_open ? NULL : conn,
@@ -6854,6 +7262,113 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data
*priv,
return 0;
}
+static int
+processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED,
+ struct private_data *priv,
+ int in_open ATTRIBUTE_UNUSED,
+ remote_message_header *hdr,
+ XDR *xdr) {
+ struct private_stream_data *privst;
+ struct remote_thread_call *thecall;
+
+ /* Try and find a matching stream */
+ privst = priv->streams;
+ while (privst &&
+ privst->serial != hdr->serial &&
+ privst->proc_nr != hdr->proc)
+ privst = privst->next;
+
+ if (!privst) {
+ VIR_WARN("No registered stream matching serial=%d, proc=%d",
+ hdr->serial, hdr->proc);
+ return -1;
+ }
+
+ /* See if there's also a (optional) call waiting for this reply */
+ thecall = priv->waitDispatch;
+ while (thecall &&
+ thecall->serial != hdr->serial)
+ thecall = thecall->next;
+
+
+ /* Status is either REMOTE_OK (meaning that what follows is a ret
+ * structure), or REMOTE_ERROR (and what follows is a remote_error
+ * structure).
+ */
+ switch (hdr->status) {
+ case REMOTE_CONTINUE: {
+ int avail = privst->incomingLength - privst->incomingOffset;
+ int need = priv->bufferLength - priv->bufferOffset;
+ VIR_WARN0("Got a stream data packet");
+
+ /* XXX flag stream as complete somwhere if need==0 */
+
+ if (need > avail) {
+ int extra = need - avail;
+ if (VIR_REALLOC_N(privst->incoming,
+ privst->incomingLength + extra) < 0) {
+ VIR_WARN0("Out of memory");
+ return -1;
+ }
+ privst->incomingLength += extra;
+ }
+
+ memcpy(privst->incoming + privst->incomingOffset,
+ priv->buffer + priv->bufferOffset,
+ priv->bufferLength - priv->bufferOffset);
+ privst->incomingOffset += (priv->bufferLength - priv->bufferOffset);
+
+ if (thecall && thecall->want_reply) {
+ VIR_WARN("Got sync data packet offset=%d",
privst->incomingOffset);
+ thecall->mode = REMOTE_MODE_COMPLETE;
+ } else {
+ VIR_WARN("Got aysnc data packet offset=%d",
privst->incomingOffset);
+ }
+ return 0;
+ }
+
+ case REMOTE_OK:
+ VIR_WARN0("Got a synchronous confirm");
+ if (!thecall) {
+ VIR_WARN0("Got unexpected stream finish confirmation");
+ return -1;
+ }
+ thecall->mode = REMOTE_MODE_COMPLETE;
+ return 0;
+
+ case REMOTE_ERROR:
+ if (thecall && thecall->want_reply) {
+ VIR_WARN0("Got a synchronous error");
+ /* Give the error straight to this call */
+ memset (&thecall->err, 0, sizeof thecall->err);
+ if (!xdr_remote_error (xdr, &thecall->err)) {
+ error (in_open ? NULL : conn,
+ VIR_ERR_RPC, _("unmarshalling remote_error"));
+ return -1;
+ }
+ thecall->mode = REMOTE_MODE_ERROR;
+ } else {
+ VIR_WARN0("Got a asynchronous error");
+ /* No call, so queue the error against the stream */
+ if (privst->has_error) {
+ VIR_WARN0("Got unexpected duplicate stream error");
+ return -1;
+ }
+ privst->has_error = 1;
+ memset (&privst->err, 0, sizeof privst->err);
+ if (!xdr_remote_error (xdr, &privst->err)) {
+ VIR_WARN0("Failed to unmarshall error");
+ return -1;
+ }
+ }
+ return 0;
+
+ default:
+ VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
+ hdr->serial, hdr->proc, hdr->status);
+ return -1;
+ }
+}
static int
remoteIOHandleInput(virConnectPtr conn, struct private_data *priv,
@@ -6934,6 +7449,9 @@ remoteIOEventLoop(virConnectPtr conn,
tmp = tmp->next;
}
+ if (priv->streams)
+ fds[0].events |= POLLIN;
+
/* Release lock while poll'ing so other threads
* can stuff themselves on the queue */
remoteDriverUnlock(priv);
--
1.6.2.5