On Fri, Jan 16, 2009 at 12:11:16PM +0000, Daniel P. Berrange wrote:
> > @@ -114,6 +164,11 @@ struct private_data {
> > virDomainEventQueuePtr domainEvents;
> > /* Timer for flushing domainEvents queue */
> > int eventFlushTimer;
> > +
> > + /* List of threads currently doing dispatch */
> > + int wakeupSend;
> > + int wakeupRead;
>
> How about appending "FD" to indicate these are file descriptors.
> The names combined with the comment (which must apply to waitDispatch)
> made me wonder what they represented. Only when I saw them used
> in safewrite /saferead calls did I get it.
Yes, good idea - and its not really a list of threads either,
so the comment is a little misleading :-)
> > + /* Encode the length word. */
> > + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
> > + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
> > + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
> > + _("xdr_int (length word)"));
>
> I haven't done enough xdr* work to know, and man pages
> didn't provide an immediate answer:
> Is there no need to call xdr_destroy on this error path?
> I'd expect xdrmem_create to do any allocation, not xdr_int.
> There are many like this.
Yes, the 'error:' codepath should be calling 'xdr_destroy(&xdr)'
to ensure free'ing of memory.
>
> > + goto error;
> > + }
> > + xdr_destroy (&xdr);
> > +
> > + return rv;
> > +
> > +error:
> > + VIR_FREE(ret);
> > + return NULL;
>
> The above should free rv, not ret:
>
> VIR_FREE(rv);
Here is an update with those suggested renames & bug fixes in it.
It also addresses the error reporting issue mentioned in
http://www.redhat.com/archives/libvir-list/2009-January/msg00428.html
That code should not have been using DEBUG() - it now correctly
raises a real error including the error string, not just errno.
There were two other bugs with missing error raising in the
path for sasl_encode/decode.
Everything upto this patch is committed, so this is diffed
against current CVS.
libvirt_private.syms | 1
remote_internal.c | 1539 ++++++++++++++++++++++++++++++++-------------------
util.c | 33 -
util.h | 2
4 files changed, 1002 insertions(+), 573 deletions(-)
Daniel
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -290,6 +290,7 @@ virEnumToString;
virEventAddHandle;
virEventRemoveHandle;
virExec;
+virSetNonBlock;
virFormatMacAddr;
virGetHostname;
virParseMacAddr;
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -68,6 +68,8 @@
#include <netdb.h>
+#include <poll.h>
+
/* AI_ADDRCONFIG is missing on some systems. */
#ifndef AI_ADDRCONFIG
# define AI_ADDRCONFIG 0
@@ -86,8 +88,44 @@
#include "util.h"
#include "event.h"
+#ifdef WIN32
+#define pipe(fds) _pipe(fds,4096, _O_BINARY)
+#endif
+
+
static int inside_daemon = 0;
+struct remote_thread_call;
+
+
+enum {
+ REMOTE_MODE_WAIT_TX,
+ REMOTE_MODE_WAIT_RX,
+ REMOTE_MODE_COMPLETE,
+ REMOTE_MODE_ERROR,
+};
+
+struct remote_thread_call {
+ int mode;
+
+ /* 4 byte length, followed by RPC message header+body */
+ char buffer[4 + REMOTE_MESSAGE_MAX];
+ unsigned int bufferLength;
+ unsigned int bufferOffset;
+
+ unsigned int serial;
+ unsigned int proc_nr;
+
+ virCond cond;
+
+ xdrproc_t ret_filter;
+ char *ret;
+
+ remote_error err;
+
+ struct remote_thread_call *next;
+};
+
struct private_data {
virMutex lock;
@@ -101,12 +139,24 @@ struct private_data {
int localUses; /* Ref count for private data */
char *hostname; /* Original hostname */
FILE *debugLog; /* Debug remote protocol */
+
#if HAVE_SASL
sasl_conn_t *saslconn; /* SASL context */
+
const char *saslDecoded;
unsigned int saslDecodedLength;
unsigned int saslDecodedOffset;
-#endif
+
+ const char *saslEncoded;
+ unsigned int saslEncodedLength;
+ unsigned int saslEncodedOffset;
+#endif
+
+ /* 4 byte length, followed by RPC message header+body */
+ char buffer[4 + REMOTE_MESSAGE_MAX];
+ unsigned int bufferLength;
+ unsigned int bufferOffset;
+
/* The list of domain event callbacks */
virDomainEventCallbackListPtr callbackList;
/* The queue of domain events generated
@@ -114,6 +164,13 @@ struct private_data {
virDomainEventQueuePtr domainEvents;
/* Timer for flushing domainEvents queue */
int eventFlushTimer;
+
+ /* Self-pipe to wakeup threads waiting in poll() */
+ int wakeupSendFD;
+ int wakeupReadFD;
+
+ /* List of threads currently waiting for dispatch */
+ struct remote_thread_call *waitDispatch;
};
enum {
@@ -160,7 +217,6 @@ static void make_nonnull_network (remote
static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst,
virStoragePoolPtr vol_src);
static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst,
virStorageVolPtr vol_src);
void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
void remoteDomainEventQueueFlush(int timer, void *opaque);
/*----------------------------------------------------------------------*/
@@ -274,6 +330,7 @@ doRemoteOpen (virConnectPtr conn,
virConnectAuthPtr auth ATTRIBUTE_UNUSED,
int flags)
{
+ int wakeupFD[2];
char *transport_str = NULL;
if (conn->uri) {
@@ -696,6 +753,21 @@ doRemoteOpen (virConnectPtr conn,
} /* switch (transport) */
+ if (virSetNonBlock(priv->sock) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make socket non-blocking %s"),
+ strerror(errno));
+ goto failed;
+ }
+
+ if (pipe(wakeupFD) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make pipe %s"),
+ strerror(errno));
+ goto failed;
+ }
+ priv->wakeupReadFD = wakeupFD[0];
+ priv->wakeupSendFD = wakeupFD[1];
/* Try and authenticate with server */
if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
@@ -768,6 +840,7 @@ doRemoteOpen (virConnectPtr conn,
DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
"continuing without events.");
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
}
/* Successful. */
@@ -848,6 +921,7 @@ remoteOpen (virConnectPtr conn,
}
remoteDriverLock(priv);
priv->localUses = 1;
+ priv->watch = -1;
if (flags & VIR_CONNECT_RO)
rflags |= VIR_DRV_OPEN_REMOTE_RO;
@@ -1220,6 +1294,7 @@ doRemoteClose (virConnectPtr conn, struc
virEventRemoveTimeout(priv->eventFlushTimer);
/* Remove handle for remote events */
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
/* Close socket. */
@@ -5537,12 +5612,665 @@ done:
/*----------------------------------------------------------------------*/
-static int really_write (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-
-/* This function performs a remote procedure call to procedure PROC_NR.
+
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+ struct private_data *priv,
+ int flags,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
+{
+ XDR xdr;
+ struct remote_message_header hdr;
+ struct remote_thread_call *rv;
+
+ if (VIR_ALLOC(rv) < 0)
+ return NULL;
+
+ if (virCondInit(&rv->cond) < 0) {
+ VIR_FREE(rv);
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_INTERNAL_ERROR,
+ _("cannot initialize mutex"));
+ return NULL;
+ }
+
+ /* Get a unique serial number for this message. */
+ rv->serial = priv->counter++;
+ rv->proc_nr = proc_nr;
+ rv->ret_filter = ret_filter;
+ rv->ret = ret;
+
+ hdr.prog = REMOTE_PROGRAM;
+ hdr.vers = REMOTE_PROTOCOL_VERSION;
+ hdr.proc = proc_nr;
+ hdr.direction = REMOTE_CALL;
+ hdr.serial = rv->serial;
+ hdr.status = REMOTE_OK;
+
+ /* Serialise header followed by args. */
+ xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
+ if (!xdr_remote_message_header (&xdr, &hdr)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+ goto error;
+ }
+
+ if (!(*args_filter) (&xdr, args)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+ _("marshalling args"));
+ goto error;
+ }
+
+ /* Get the length stored in buffer. */
+ rv->bufferLength = xdr_getpos (&xdr);
+ xdr_destroy (&xdr);
+
+ /* Length must include the length word itself (always encoded in
+ * 4 bytes as per RFC 4506).
+ */
+ rv->bufferLength += 4;
+
+ /* Encode the length word. */
+ xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+ if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+ _("xdr_int (length word)"));
+ goto error;
+ }
+ xdr_destroy (&xdr);
+
+ return rv;
+
+error:
+ xdr_destroy (&xdr);
+ VIR_FREE(rv);
+ return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open /* if we are in virConnectOpen */,
+ const char *bytes, int len)
+{
+ int ret;
+
+ if (priv->uses_tls) {
+ tls_resend:
+ ret = gnutls_record_send (priv->session, bytes, len);
+ if (ret < 0) {
+ if (ret == GNUTLS_E_INTERRUPTED)
+ goto tls_resend;
+ if (ret == GNUTLS_E_AGAIN)
+ return 0;
+
+ error (in_open ? NULL : conn,
+ VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+ return -1;
+ }
+ } else {
+ resend:
+ ret = send (priv->sock, bytes, len, 0);
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto resend;
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ error (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR, strerror (errno));
+ return -1;
+
+ }
+ }
+
+ return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open /* if we are in virConnectOpen */,
+ char *bytes, int len)
+{
+ int ret;
+
+ if (priv->uses_tls) {
+ tls_resend:
+ ret = gnutls_record_recv (priv->session, bytes, len);
+ if (ret == GNUTLS_E_INTERRUPTED)
+ goto tls_resend;
+ if (ret == GNUTLS_E_AGAIN)
+ return 0;
+
+ /* Treat 0 == EOF as an error */
+ if (ret <= 0) {
+ if (ret < 0)
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_GNUTLS_ERROR,
+ _("failed to read from TLS socket %s"),
+ gnutls_strerror (ret));
+ else
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ "%s", _("server closed connection"));
+ return -1;
+ }
+ } else {
+ resend:
+ ret = recv (priv->sock, bytes, len, 0);
+ if (ret <= 0) {
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto resend;
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ _("failed to read from socket %s"),
+ strerror (errno));
+ } else {
+ errorf (in_open ? NULL : conn,
+ VIR_ERR_SYSTEM_ERROR,
+ "%s", _("server closed connection"));
+ }
+ return -1;
+ }
+ }
+
+ return ret;
+}
+
+
+static int
+processCallSendOne(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open,
+ struct remote_thread_call *thecall)
+{
+#if HAVE_SASL
+ if (priv->saslconn) {
+ const char *output;
+ unsigned int outputlen;
+ int err, ret;
+
+ if (!priv->saslEncoded) {
+ err = sasl_encode(priv->saslconn,
+ thecall->buffer + thecall->bufferOffset,
+ thecall->bufferLength - thecall->bufferOffset,
+ &output, &outputlen);
+ if (err != SASL_OK) {
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("failed to encode SASL data: %s"),
+ sasl_errstring(err, NULL, NULL));
+ return -1;
+ }
+ priv->saslEncoded = output;
+ priv->saslEncodedLength = outputlen;
+ priv->saslEncodedOffset = 0;
+
+ thecall->bufferOffset = thecall->bufferLength;
+ }
+
+ ret = processCallWrite(conn, priv, in_open,
+ priv->saslEncoded + priv->saslEncodedOffset,
+ priv->saslEncodedLength - priv->saslEncodedOffset);
+ if (ret < 0)
+ return ret;
+ priv->saslEncodedOffset += ret;
+
+ if (priv->saslEncodedOffset == priv->saslEncodedLength) {
+ priv->saslEncoded = NULL;
+ priv->saslEncodedOffset = priv->saslEncodedLength = 0;
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ }
+ } else {
+#endif
+ int ret;
+ ret = processCallWrite(conn, priv, in_open,
+ thecall->buffer + thecall->bufferOffset,
+ thecall->bufferLength - thecall->bufferOffset);
+ if (ret < 0)
+ return ret;
+ thecall->bufferOffset += ret;
+
+ if (thecall->bufferOffset == thecall->bufferLength) {
+ thecall->bufferOffset = thecall->bufferLength = 0;
+ thecall->mode = REMOTE_MODE_WAIT_RX;
+ }
+#if HAVE_SASL
+ }
+#endif
+ return 0;
+}
+
+
+static int
+processCallSend(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ struct remote_thread_call *thecall = priv->waitDispatch;
+
+ while (thecall &&
+ thecall->mode != REMOTE_MODE_WAIT_TX)
+ thecall = thecall->next;
+
+ if (!thecall)
+ return -1; /* Shouldn't happen, but you never know... */
+
+ while (thecall) {
+ int ret = processCallSendOne(conn, priv, in_open, thecall);
+ if (ret < 0)
+ return ret;
+
+ if (thecall->mode == REMOTE_MODE_WAIT_TX)
+ return 0; /* Blocking write, to back to event loop */
+
+ thecall = thecall->next;
+ }
+
+ return 0; /* No more calls to send, all done */
+}
+
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ unsigned int wantData;
+
+ /* Start by reading length word */
+ if (priv->bufferLength == 0)
+ priv->bufferLength = 4;
+
+ wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+ if (priv->saslconn) {
+ if (priv->saslDecoded == NULL) {
+ char encoded[8192];
+ unsigned int encodedLen = sizeof(encoded);
+ int ret, err;
+ ret = processCallRead(conn, priv, in_open,
+ encoded, encodedLen);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0;
+
+ err = sasl_decode(priv->saslconn, encoded, ret,
+ &priv->saslDecoded,
&priv->saslDecodedLength);
+ if (err != SASL_OK) {
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("failed to decode SASL data: %s"),
+ sasl_errstring(err, NULL, NULL));
+ return -1;
+ }
+ priv->saslDecodedOffset = 0;
+ }
+
+ if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+ wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+ memcpy(priv->buffer + priv->bufferOffset,
+ priv->saslDecoded + priv->saslDecodedOffset,
+ wantData);
+ priv->saslDecodedOffset += wantData;
+ priv->bufferOffset += wantData;
+ if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+ priv->saslDecodedLength = priv->saslDecodedLength = 0;
+ priv->saslDecoded = NULL;
+ }
+
+ return wantData;
+ } else {
+#endif
+ int ret;
+
+ ret = processCallRead(conn, priv, in_open,
+ priv->buffer + priv->bufferOffset,
+ wantData);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0;
+
+ priv->bufferOffset += ret;
+
+ return ret;
+#if HAVE_SASL
+ }
+#endif
+}
+
+
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+ int in_open,
+ remote_message_header *hdr,
+ XDR *xdr) {
+ /* An async message has come in while we were waiting for the
+ * response. Process it to pull it off the wire, and try again
+ */
+ DEBUG0("Encountered an event while waiting for a response");
+
+ if (in_open) {
+ DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
+ return;
+ }
+
+ if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
+ remoteDomainQueueEvent(conn, xdr);
+ virEventUpdateTimeout(priv->eventFlushTimer, 0);
+ } else {
+ DEBUG("Unexpected event proc %d", hdr->proc);
+ }
+}
+
+static int
+processCallRecvLen(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ XDR xdr;
+ int len;
+
+ xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
+ if (!xdr_int (&xdr, &len)) {
+ error (in_open ? NULL : conn,
+ VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+ return -1;
+ }
+ xdr_destroy (&xdr);
+
+ /* Length includes length word - adjust to real length to read. */
+ len -= 4;
+
+ if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+ error (in_open ? NULL : conn,
+ VIR_ERR_RPC, _("packet received from server too large"));
+ return -1;
+ }
+
+ /* Extend our declared buffer length and carry
+ on reading the header + payload */
+ priv->bufferLength += len;
+ DEBUG("Got length, now need %d total (%d more)", priv->bufferLength,
len);
+ return 0;
+}
+
+
+static int
+processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ XDR xdr;
+ struct remote_message_header hdr;
+ int len = priv->bufferLength - 4;
+ struct remote_thread_call *thecall;
+
+ /* Deserialise reply header. */
+ xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+ if (!xdr_remote_message_header (&xdr, &hdr)) {
+ error (in_open ? NULL : conn,
+ VIR_ERR_RPC, _("invalid header in reply"));
+ return -1;
+ }
+
+ /* Check program, version, etc. are what we expect. */
+ if (hdr.prog != REMOTE_PROGRAM) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown program (received %x, expected %x)"),
+ hdr.prog, REMOTE_PROGRAM);
+ return -1;
+ }
+ if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown protocol version (received %x, expected
%x)"),
+ hdr.vers, REMOTE_PROTOCOL_VERSION);
+ return -1;
+ }
+
+ /* Async events from server need special handling */
+ if (hdr.direction == REMOTE_MESSAGE) {
+ processCallAsyncEvent(conn, priv, in_open,
+ &hdr, &xdr);
+ xdr_destroy(&xdr);
+ return 0;
+ }
+
+ if (hdr.direction != REMOTE_REPLY) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("got unexpected RPC call %d from server"),
+ hdr.proc);
+ xdr_destroy(&xdr);
+ return -1;
+ }
+
+ /* Ok, definitely got an RPC reply now find
+ out who's been waiting for it */
+
+ thecall = priv->waitDispatch;
+ while (thecall &&
+ thecall->serial != hdr.serial)
+ thecall = thecall->next;
+
+ if (!thecall) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("no call waiting for reply with serial %d"),
+ hdr.serial);
+ xdr_destroy(&xdr);
+ return -1;
+ }
+
+ if (hdr.proc != thecall->proc_nr) {
+ virRaiseError (in_open ? NULL : conn,
+ NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown procedure (received %x, expected %x)"),
+ hdr.proc, thecall->proc_nr);
+ xdr_destroy (&xdr);
+ return -1;
+ }
+
+ /* Status is either REMOTE_OK (meaning that what follows is a ret
+ * structure), or REMOTE_ERROR (and what follows is a remote_error
+ * structure).
+ */
+ switch (hdr.status) {
+ case REMOTE_OK:
+ if (!(*thecall->ret_filter) (&xdr, thecall->ret)) {
+ error (in_open ? NULL : conn, VIR_ERR_RPC,
+ _("unmarshalling ret"));
+ return -1;
+ }
+ thecall->mode = REMOTE_MODE_COMPLETE;
+ xdr_destroy (&xdr);
+ return 0;
+
+ case REMOTE_ERROR:
+ memset (&thecall->err, 0, sizeof thecall->err);
+ if (!xdr_remote_error (&xdr, &thecall->err)) {
+ error (in_open ? NULL : conn,
+ VIR_ERR_RPC, _("unmarshalling remote_error"));
+ return -1;
+ }
+ xdr_destroy (&xdr);
+ thecall->mode = REMOTE_MODE_ERROR;
+ return 0;
+
+ default:
+ virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
+ VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+ _("unknown status (received %x)"),
+ hdr.status);
+ xdr_destroy (&xdr);
+ return -1;
+ }
+}
+
+
+static int
+processCallRecv(virConnectPtr conn, struct private_data *priv,
+ int in_open) {
+ int ret;
+
+ /* Read as much data as is available, until we get
+ * EGAIN
+ */
+ for (;;) {
+ ret = processCallRecvSome(conn, priv, in_open);
+
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ return 0; /* Blocking on read */
+
+ /* Check for completion of our goal */
+ if (priv->bufferOffset == priv->bufferLength) {
+ if (priv->bufferOffset == 4) {
+ ret = processCallRecvLen(conn, priv, in_open);
+ } else {
+ ret = processCallRecvMsg(conn, priv, in_open);
+ priv->bufferOffset = priv->bufferLength = 0;
+ }
+ if (ret < 0)
+ return -1;
+ }
+ }
+}
+
+/*
+ * Process all calls pending dispatch/receive until we
+ * get a reply to our own call. Then quit and pass the buck
+ * to someone else.
+ */
+static int
+processCalls(virConnectPtr conn,
+ struct private_data *priv,
+ int in_open,
+ struct remote_thread_call *thiscall)
+{
+ struct pollfd fds[2];
+ int ret;
+
+ fds[0].fd = priv->sock;
+ fds[1].fd = priv->wakeupReadFD;
+
+ for (;;) {
+ struct remote_thread_call *tmp = priv->waitDispatch;
+ struct remote_thread_call *prev;
+ char ignore;
+
+ fds[0].events = fds[0].revents = 0;
+ fds[1].events = fds[1].revents = 0;
+
+ fds[1].events = POLLIN;
+ while (tmp) {
+ if (tmp->mode == REMOTE_MODE_WAIT_RX)
+ fds[0].events |= POLLIN;
+ if (tmp->mode == REMOTE_MODE_WAIT_TX)
+ fds[0].events |= POLLOUT;
+
+ tmp = tmp->next;
+ }
+
+ /* Release lock while poll'ing so other threads
+ * can stuff themselves on the queue */
+ remoteDriverUnlock(priv);
+
+ repoll:
+ ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+ if (ret < 0 && errno == EINTR)
+ goto repoll;
+ remoteDriverLock(priv);
+
+ if (fds[1].revents) {
+ DEBUG0("Woken up from poll by other thread");
+ saferead(priv->wakeupReadFD, &ignore, sizeof(ignore));
+ }
+
+ if (ret < 0) {
+ if (errno == EWOULDBLOCK)
+ continue;
+ errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ _("poll on socket failed %s"), strerror(errno));
+ return -1;
+ }
+
+ if (fds[0].revents & POLLOUT) {
+ if (processCallSend(conn, priv, in_open) < 0)
+ return -1;
+ }
+
+ if (fds[0].revents & POLLIN) {
+ if (processCallRecv(conn, priv, in_open) < 0)
+ return -1;
+ }
+
+ /* Iterate through waiting threads and if
+ * any are complete then tell 'em to wakeup
+ */
+ tmp = priv->waitDispatch;
+ prev = NULL;
+ while (tmp) {
+ if (tmp != thiscall &&
+ (tmp->mode == REMOTE_MODE_COMPLETE ||
+ tmp->mode == REMOTE_MODE_ERROR)) {
+ /* Take them out of the list */
+ if (prev)
+ prev->next = tmp->next;
+ else
+ priv->waitDispatch = tmp->next;
+
+ /* And wake them up....
+ * ...they won't actually wakeup until
+ * we release our mutex a short while
+ * later...
+ */
+ DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp,
priv->waitDispatch);
+ virCondSignal(&tmp->cond);
+ }
+ prev = tmp;
+ tmp = tmp->next;
+ }
+
+ /* Now see if *we* are done */
+ if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+ thiscall->mode == REMOTE_MODE_ERROR) {
+ /* We're at head of the list already, so
+ * remove us
+ */
+ priv->waitDispatch = thiscall->next;
+ DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr,
thiscall, priv->waitDispatch);
+ /* See if someone else is still waiting
+ * and if so, then pass the buck ! */
+ if (priv->waitDispatch) {
+ DEBUG("Passing the buck to %d %p",
priv->waitDispatch->proc_nr, priv->waitDispatch);
+ virCondSignal(&priv->waitDispatch->cond);
+ }
+ return 0;
+ }
+
+
+ if (fds[0].revents & (POLLHUP | POLLERR)) {
+ errorf(in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR,
+ "%s", _("received hangup / error event on
socket"));
+ return -1;
+ }
+ }
+}
+
+/*
+ * This function performs a remote procedure call to procedure PROC_NR.
*
* NB. This does not free the args structure (not desirable, since you
* often want this allocated on the stack or else it contains strings
@@ -5551,204 +6279,29 @@ static int really_read (virConnectPtr co
*
* NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
* else Bad Things will happen in the XDR code.
- */
-static int
-doCall (virConnectPtr conn, struct private_data *priv,
- int flags /* if we are in virConnectOpen */,
- int proc_nr,
- xdrproc_t args_filter, char *args,
- xdrproc_t ret_filter, char *ret)
-{
- char buffer[REMOTE_MESSAGE_MAX];
- char buffer2[4];
- struct remote_message_header hdr;
- XDR xdr;
- int len;
- struct remote_error rerror;
-
- /* Get a unique serial number for this message. */
- int serial = priv->counter++;
-
- hdr.prog = REMOTE_PROGRAM;
- hdr.vers = REMOTE_PROTOCOL_VERSION;
- hdr.proc = proc_nr;
- hdr.direction = REMOTE_CALL;
- hdr.serial = serial;
- hdr.status = REMOTE_OK;
-
- /* Serialise header followed by args. */
- xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE);
- if (!xdr_remote_message_header (&xdr, &hdr)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- VIR_ERR_RPC, _("xdr_remote_message_header failed"));
- return -1;
- }
-
- if (!(*args_filter) (&xdr, args)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
- _("marshalling args"));
- return -1;
- }
-
- /* Get the length stored in buffer. */
- len = xdr_getpos (&xdr);
- xdr_destroy (&xdr);
-
- /* Length must include the length word itself (always encoded in
- * 4 bytes as per RFC 4506).
- */
- len += 4;
-
- /* Encode the length word. */
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE);
- if (!xdr_int (&xdr, &len)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
- _("xdr_int (length word)"));
- return -1;
- }
- xdr_destroy (&xdr);
-
- /* Send length word followed by header+args. */
- if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof
buffer2) == -1 ||
- really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
- return -1;
-
-retry_read:
- /* Read and deserialise length word. */
- if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof
buffer2) == -1)
- return -1;
-
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
- if (!xdr_int (&xdr, &len)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- VIR_ERR_RPC, _("xdr_int (length word, reply)"));
- return -1;
- }
- xdr_destroy (&xdr);
-
- /* Length includes length word - adjust to real length to read. */
- len -= 4;
-
- if (len < 0 || len > REMOTE_MESSAGE_MAX) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- VIR_ERR_RPC, _("packet received from server too large"));
- return -1;
- }
-
- /* Read reply header and what follows (either a ret or an error). */
- if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
- return -1;
-
- /* Deserialise reply header. */
- xdrmem_create (&xdr, buffer, len, XDR_DECODE);
- if (!xdr_remote_message_header (&xdr, &hdr)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- VIR_ERR_RPC, _("invalid header in reply"));
- return -1;
- }
-
- /* Check program, version, etc. are what we expect. */
- if (hdr.prog != REMOTE_PROGRAM) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown program (received %x, expected %x)"),
- hdr.prog, REMOTE_PROGRAM);
- return -1;
- }
- if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown protocol version (received %x, expected
%x)"),
- hdr.vers, REMOTE_PROTOCOL_VERSION);
- return -1;
- }
-
- if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
- hdr.direction == REMOTE_MESSAGE) {
- /* An async message has come in while we were waiting for the
- * response. Process it to pull it off the wire, and try again
- */
- DEBUG0("Encountered an event while waiting for a response");
-
- remoteDomainQueueEvent(conn, &xdr);
- virEventUpdateTimeout(priv->eventFlushTimer, 0);
-
- DEBUG0("Retrying read");
- xdr_destroy (&xdr);
- goto retry_read;
- }
- if (hdr.proc != proc_nr) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown procedure (received %x, expected %x)"),
- hdr.proc, proc_nr);
- return -1;
- }
- if (hdr.direction != REMOTE_REPLY) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- NULL, NULL, VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown direction (received %x, expected %x)"),
- hdr.direction, REMOTE_REPLY);
- return -1;
- }
- if (hdr.serial != serial) {
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL,
VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown serial (received %x, expected %x)"),
- hdr.serial, serial);
- return -1;
- }
-
- /* Status is either REMOTE_OK (meaning that what follows is a ret
- * structure), or REMOTE_ERROR (and what follows is a remote_error
- * structure).
- */
- switch (hdr.status) {
- case REMOTE_OK:
- if (!(*ret_filter) (&xdr, ret)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
- _("unmarshalling ret"));
- return -1;
- }
- xdr_destroy (&xdr);
- return 0;
-
- case REMOTE_ERROR:
- memset (&rerror, 0, sizeof rerror);
- if (!xdr_remote_error (&xdr, &rerror)) {
- error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
- VIR_ERR_RPC, _("unmarshalling remote_error"));
- return -1;
- }
- xdr_destroy (&xdr);
- /* See if caller asked us to keep quiet about missing RPCs
- * eg for interop with older servers */
- if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
- rerror.domain == VIR_FROM_REMOTE &&
- rerror.code == VIR_ERR_RPC &&
- rerror.level == VIR_ERR_ERROR &&
- STRPREFIX(*rerror.message, "unknown procedure")) {
- return -2;
- }
- server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror);
- xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror);
- return -1;
-
- default:
- virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL,
VIR_FROM_REMOTE,
- VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
- _("unknown status (received %x)"),
- hdr.status);
- xdr_destroy (&xdr);
- return -1;
- }
-}
-
-
+ *
+ * NB(3) You must have the private_data lock before calling this
+ *
+ * NB(4) This is very complicated. Due to connection cloning, multiple
+ * threads can want to use the socket at once. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. THe existing thread may completely
+ * send & receive their RPC call/reply while they're asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work todo.
+ *
+ * We call this dance 'passing the buck'
+ *
+ *
http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ * "Buck passing or passing the buck is the action of transferring
+ * responsibility or blame unto another person. It is also used as
+ * a strategy in power politics when the actions of one country/
+ * nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
static int
call (virConnectPtr conn, struct private_data *priv,
int flags /* if we are in virConnectOpen */,
@@ -5757,6 +6310,87 @@ call (virConnectPtr conn, struct private
xdrproc_t ret_filter, char *ret)
{
int rv;
+ struct remote_thread_call *thiscall;
+
+ DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch);
+ thiscall = prepareCall(conn, priv, flags, proc_nr,
+ args_filter, args,
+ ret_filter, ret);
+
+ if (!thiscall) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_NO_MEMORY, NULL);
+ return -1;
+ }
+
+ /* Check to see if another thread is dispatching */
+ if (priv->waitDispatch) {
+ /* Stick ourselves on the end of the wait queue */
+ struct remote_thread_call *tmp = priv->waitDispatch;
+ char ignore = 1;
+ while (tmp && tmp->next)
+ tmp = tmp->next;
+ if (tmp)
+ tmp->next = thiscall;
+ else
+ priv->waitDispatch = thiscall;
+
+ /* Force other thread to wakup from poll */
+ safewrite(priv->wakeupSendFD, &ignore, sizeof(ignore));
+
+ DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch,
thiscall);
+ /* Go to sleep while other thread is working... */
+ if (virCondWait(&thiscall->cond, &priv->lock) < 0) {
+ if (priv->waitDispatch == thiscall) {
+ priv->waitDispatch = thiscall->next;
+ } else {
+ tmp = priv->waitDispatch;
+ while (tmp && tmp->next &&
+ tmp->next != thiscall) {
+ tmp = tmp->next;
+ }
+ if (tmp && tmp->next == thiscall)
+ tmp->next = thiscall->next;
+ }
+ errorf(flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_INTERNAL_ERROR, "%s",
+ _("failed to wait on condition"));
+ VIR_FREE(thiscall);
+ return -1;
+ }
+
+ DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch,
thiscall);
+ /* Two reasons we can be woken up
+ * 1. Other thread has got our reply ready for us
+ * 2. Other thread is all done, and it is our turn to
+ * be the dispatcher to finish waiting for
+ * our reply
+ */
+ if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+ thiscall->mode == REMOTE_MODE_ERROR) {
+ /*
+ * We avoided catching the buck and our reply is ready !
+ * We've already had 'thiscall' removed from the list
+ * so just need to (maybe) handle errors & free it
+ */
+ goto cleanup;
+ }
+
+ /* Grr, someone passed the buck onto us ... */
+
+ } else {
+ /* We're first to catch the buck */
+ priv->waitDispatch = thiscall;
+ }
+
+ DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch,
thiscall);
+ /*
+ * The buck stops here!
+ *
+ * At this point we're about to own the dispatch
+ * process...
+ */
+
/*
* Avoid needless wake-ups of the event loop in the
* case where this call is being made from a different
@@ -5767,207 +6401,146 @@ call (virConnectPtr conn, struct private
if (priv->watch >= 0)
virEventUpdateHandle(priv->watch, 0);
- rv = doCall(conn, priv,flags, proc_nr,
- args_filter, args,
- ret_filter, ret);
+ rv = processCalls(conn, priv,
+ flags & REMOTE_CALL_IN_OPEN ? 1 : 0,
+ thiscall);
if (priv->watch >= 0)
virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
- return rv;
-}
-
-static int
-really_write_buf (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- const char *bytes, int len)
-{
- const char *p;
- int err;
-
- p = bytes;
- if (priv->uses_tls) {
- do {
- err = gnutls_record_send (priv->session, p, len);
- if (err < 0) {
- if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN)
- continue;
- error (in_open ? NULL : conn,
- VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
- return -1;
- }
- len -= err;
- p += err;
- }
- while (len > 0);
- } else {
- do {
- err = send (priv->sock, p, len, 0);
- if (err == -1) {
- if (errno == EINTR || errno == EAGAIN)
- continue;
- error (in_open ? NULL : conn,
- VIR_ERR_SYSTEM_ERROR, strerror (errno));
- return -1;
- }
- len -= err;
- p += err;
- }
- while (len > 0);
- }
-
- return 0;
-}
-
-static int
-really_write_plain (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- return really_write_buf(conn, priv, in_open, bytes, len);
-}
-
-#if HAVE_SASL
-static int
-really_write_sasl (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- const char *output;
- unsigned int outputlen;
- int err;
-
- err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen);
- if (err != SASL_OK) {
- return -1;
- }
-
- return really_write_buf(conn, priv, in_open, output, outputlen);
-}
-#endif
-
-static int
-really_write (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
-#if HAVE_SASL
- if (priv->saslconn)
- return really_write_sasl(conn, priv, in_open, bytes, len);
- else
-#endif
- return really_write_plain(conn, priv, in_open, bytes, len);
-}
-
-static int
-really_read_buf (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- int err;
-
- if (priv->uses_tls) {
- tlsreread:
- err = gnutls_record_recv (priv->session, bytes, len);
- if (err < 0) {
- if (err == GNUTLS_E_INTERRUPTED)
- goto tlsreread;
- error (in_open ? NULL : conn,
- VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
- return -1;
- }
- if (err == 0) {
- error (in_open ? NULL : conn,
- VIR_ERR_RPC, _("socket closed unexpectedly"));
- return -1;
- }
- } else {
- reread:
- err = recv (priv->sock, bytes, len, 0);
- if (err == -1) {
- if (errno == EINTR)
- goto reread;
- error (in_open ? NULL : conn,
- VIR_ERR_SYSTEM_ERROR, strerror (errno));
- return -1;
- }
- if (err == 0) {
- error (in_open ? NULL : conn,
- VIR_ERR_RPC, _("socket closed unexpectedly"));
- return -1;
- }
- }
-
- return err;
-}
-
-static int
-really_read_plain (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- do {
- int ret = really_read_buf (conn, priv, in_open, bytes, len);
- if (ret < 0)
- return -1;
-
- len -= ret;
- bytes += ret;
- } while (len > 0);
-
- return 0;
-}
-
-#if HAVE_SASL
-static int
-really_read_sasl (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
- do {
- int want, got;
- if (priv->saslDecoded == NULL) {
- char encoded[8192];
- int encodedLen = sizeof(encoded);
- int err, ret;
- ret = really_read_buf (conn, priv, in_open, encoded, encodedLen);
- if (ret < 0)
- return -1;
-
- err = sasl_decode(priv->saslconn, encoded, ret,
- &priv->saslDecoded,
&priv->saslDecodedLength);
- }
-
- got = priv->saslDecodedLength - priv->saslDecodedOffset;
- want = len;
- if (want > got)
- want = got;
-
- memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want);
- priv->saslDecodedOffset += want;
- if (priv->saslDecodedOffset == priv->saslDecodedLength) {
- priv->saslDecoded = NULL;
- priv->saslDecodedOffset = priv->saslDecodedLength = 0;
- }
- bytes += want;
- len -= want;
- } while (len > 0);
-
- return 0;
-}
-#endif
-
-static int
-really_read (virConnectPtr conn, struct private_data *priv,
- int in_open /* if we are in virConnectOpen */,
- char *bytes, int len)
-{
-#if HAVE_SASL
- if (priv->saslconn)
- return really_read_sasl (conn, priv, in_open, bytes, len);
- else
-#endif
- return really_read_plain (conn, priv, in_open, bytes, len);
-}
+
+ if (rv < 0) {
+ VIR_FREE(thiscall);
+ return -1;
+ }
+
+cleanup:
+ DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch,
thiscall);
+ if (thiscall->mode == REMOTE_MODE_ERROR) {
+ /* See if caller asked us to keep quiet about missing RPCs
+ * eg for interop with older servers */
+ if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
+ thiscall->err.domain == VIR_FROM_REMOTE &&
+ thiscall->err.code == VIR_ERR_RPC &&
+ thiscall->err.level == VIR_ERR_ERROR &&
+ STRPREFIX(*thiscall->err.message, "unknown procedure")) {
+ rv = -2;
+ } else {
+ server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ &thiscall->err);
+ rv = -1;
+ }
+ } else {
+ rv = 0;
+ }
+ VIR_FREE(thiscall);
+ return rv;
+}
+
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static virDomainEventPtr
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
+{
+ remote_domain_event_ret ret;
+ virDomainPtr dom;
+ virDomainEventPtr event = NULL;
+ memset (&ret, 0, sizeof ret);
+
+ /* unmarshall parameters, and process it*/
+ if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+ error (conn, VIR_ERR_RPC,
+ _("remoteDomainProcessEvent: unmarshalling ret"));
+ return NULL;
+ }
+
+ dom = get_nonnull_domain(conn,ret.dom);
+ if (!dom)
+ return NULL;
+
+ event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
+
+ virDomainFree(dom);
+ return event;
+}
+
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+ struct private_data *priv = conn->privateData;
+ virDomainEventPtr event;
+
+ event = remoteDomainReadEvent(conn, xdr);
+ if (!event)
+ return;
+
+ if (virDomainEventQueuePush(priv->domainEvents,
+ event) < 0) {
+ DEBUG0("Error adding event to queue");
+ virDomainEventFree(event);
+ }
+}
+
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void
+remoteDomainEventFired(int watch,
+ int fd,
+ int event,
+ void *opaque)
+{
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
+
+ remoteDriverLock(priv);
+
+ /* This should be impossible, but it doesn't hurt to check */
+ if (priv->waitDispatch)
+ goto done;
+
+ DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
+
+ if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+ DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+ "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+ virEventRemoveHandle(watch);
+ priv->watch = -1;
+ goto done;
+ }
+
+ if (fd != priv->sock) {
+ virEventRemoveHandle(watch);
+ priv->watch = -1;
+ goto done;
+ }
+
+ if (processCallRecv(conn, priv, 0) < 0)
+ DEBUG0("Something went wrong during async message processing");
+
+done:
+ remoteDriverUnlock(priv);
+}
+
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
+
+ remoteDriverLock(priv);
+
+ virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
+ virDomainEventDispatchDefaultFunc, NULL);
+ virEventUpdateTimeout(priv->eventFlushTimer, -1);
+
+ remoteDriverUnlock(priv);
+}
+
/* For errors internal to this library. */
static void
@@ -6267,161 +6840,3 @@ remoteRegister (void)
return 0;
}
-/**
- * remoteDomainReadEvent
- *
- * Read the event data off the wire
- */
-static virDomainEventPtr
-remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
-{
- remote_domain_event_ret ret;
- virDomainPtr dom;
- virDomainEventPtr event = NULL;
- memset (&ret, 0, sizeof ret);
-
- /* unmarshall parameters, and process it*/
- if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
- error (conn, VIR_ERR_RPC,
- _("remoteDomainProcessEvent: unmarshalling ret"));
- return NULL;
- }
-
- dom = get_nonnull_domain(conn,ret.dom);
- if (!dom)
- return NULL;
-
- event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
-
- virDomainFree(dom);
- return event;
-}
-
-static void
-remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
-{
- struct private_data *priv = conn->privateData;
- virDomainEventPtr event;
-
- event = remoteDomainReadEvent(conn, xdr);
- if (!event)
- return;
-
- DEBUG0("Calling domain event callbacks (no queue)");
- virDomainEventDispatch(event, priv->callbackList,
- virDomainEventDispatchDefaultFunc, NULL);
- virDomainEventFree(event);
-}
-
-static void
-remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
-{
- struct private_data *priv = conn->privateData;
- virDomainEventPtr event;
-
- event = remoteDomainReadEvent(conn, xdr);
- if (!event)
- return;
-
- if (virDomainEventQueuePush(priv->domainEvents,
- event) < 0) {
- DEBUG0("Error adding event to queue");
- virDomainEventFree(event);
- }
-}
-
-/** remoteDomainEventFired:
- *
- * The callback for monitoring the remote socket
- * for event data
- */
-void
-remoteDomainEventFired(int watch,
- int fd,
- int event,
- void *opaque)
-{
- char buffer[REMOTE_MESSAGE_MAX];
- char buffer2[4];
- struct remote_message_header hdr;
- XDR xdr;
- int len;
-
- virConnectPtr conn = opaque;
- struct private_data *priv = conn->privateData;
-
- remoteDriverLock(priv);
-
- DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
-
- if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
- DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
- "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
- virEventRemoveHandle(watch);
- goto done;
- }
-
- if (fd != priv->sock) {
- virEventRemoveHandle(watch);
- goto done;
- }
-
- /* Read and deserialise length word. */
- if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
- goto done;
-
- xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
- if (!xdr_int (&xdr, &len)) {
- error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
- goto done;
- }
- xdr_destroy (&xdr);
-
- /* Length includes length word - adjust to real length to read. */
- len -= 4;
-
- if (len < 0 || len > REMOTE_MESSAGE_MAX) {
- error (conn, VIR_ERR_RPC, _("packet received from server too large"));
- goto done;
- }
-
- /* Read reply header and what follows (either a ret or an error). */
- if (really_read (conn, priv, 0, buffer, len) == -1) {
- error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
- goto done;
- }
-
- /* Deserialise reply header. */
- xdrmem_create (&xdr, buffer, len, XDR_DECODE);
- if (!xdr_remote_message_header (&xdr, &hdr)) {
- error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
- goto done;
- }
-
- if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
- hdr.direction == REMOTE_MESSAGE) {
- DEBUG0("Encountered an async event");
- remoteDomainProcessEvent(conn, &xdr);
- } else {
- DEBUG0("invalid proc in event firing");
- error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
- }
-
-done:
- remoteDriverUnlock(priv);
-}
-
-void
-remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
- virConnectPtr conn = opaque;
- struct private_data *priv = conn->privateData;
-
- remoteDriverLock(priv);
-
- virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
- virDomainEventDispatchDefaultFunc, NULL);
- virEventUpdateTimeout(priv->eventFlushTimer, -1);
-
- remoteDriverUnlock(priv);
-}
diff --git a/src/util.c b/src/util.c
--- a/src/util.c
+++ b/src/util.c
@@ -34,6 +34,7 @@
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/ioctl.h>
#if HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
@@ -155,8 +156,28 @@ virArgvToString(const char *const *argv)
return ret;
}
+int virSetNonBlock(int fd) {
+#ifndef WIN32
+ int flags;
+ if ((flags = fcntl(fd, F_GETFL)) < 0)
+ return -1;
+ flags |= O_NONBLOCK;
+ if ((fcntl(fd, F_SETFL, flags)) < 0)
+ return -1;
+#else
+ unsigned long flag = 1;
-#ifndef __MINGW32__
+ /* This is actually Gnulib's replacement rpl_ioctl function.
+ * We can't call ioctlsocket directly in any case.
+ */
+ if (ioctl (fd, FIONBIO, (void *) &flag) == -1)
+ return -1;
+#endif
+ return 0;
+}
+
+
+#ifndef WIN32
static int virSetCloseExec(int fd) {
int flags;
@@ -168,16 +189,6 @@ static int virSetCloseExec(int fd) {
return 0;
}
-static int virSetNonBlock(int fd) {
- int flags;
- if ((flags = fcntl(fd, F_GETFL)) < 0)
- return -1;
- flags |= O_NONBLOCK;
- if ((fcntl(fd, F_SETFL, flags)) < 0)
- return -1;
- return 0;
-}
-
static int
__virExec(virConnectPtr conn,
const char *const*argv,
diff --git a/src/util.h b/src/util.h
--- a/src/util.h
+++ b/src/util.h
@@ -38,6 +38,8 @@ enum {
VIR_EXEC_DAEMON = (1 << 1),
};
+int virSetNonBlock(int fd);
+
int virExec(virConnectPtr conn,
const char *const*argv,
const char *const*envp,
--
|: Red Hat, Engineering, London -o-
http://people.redhat.com/berrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org -o-
http://ovirt.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|