
"Daniel P. Berrange" <berrange@redhat.com> wrote:
This patch rewrites the code for dispatching RPC calls in the remote driver to allow use from multiple threads. Only one thread is allowed to send/recv on the socket at a time, though. If another thread comes along, it will put itself on a queue and go to sleep. The first thread may actually get around to transmitting the 2nd thread's request while it is waiting for its own reply. It may even get the 2nd thread's reply, if its own RPC call is being really slow. So when a thread wakes up from sleeping, it has to check whether its own RPC call has already been processed. Likewise, when a thread owning the socket finishes with its own work, it may have to pass the buck to another thread. The upshot of this is that we have multiple RPC calls executing in parallel, and requests+replies are no longer guaranteed to be FIFO on the wire if talking to a new enough server.
This refactoring required use of a self-pipe/poll trick for synchronization between threads, but fortunately gnulib now provides this on Windows too, so there's no compatibility problem there.
Quick summary: dense ;-), though much of it is moved code. I haven't finished, but I did find at least one problem, below.
diff --git a/src/remote_internal.c b/src/remote_internal.c ... @@ -114,6 +164,11 @@ struct private_data { virDomainEventQueuePtr domainEvents; /* Timer for flushing domainEvents queue */ int eventFlushTimer; + + /* List of threads currently doing dispatch */ + int wakeupSend; + int wakeupRead;
How about appending "FD" to indicate these are file descriptors? The names, combined with the comment (which must apply to waitDispatch), made me wonder what they represented. Only when I saw them used in the safewrite/saferead calls did I get it.
+ struct remote_thread_call *waitDispatch; };
enum { @@ -160,7 +215,6 @@ static void make_nonnull_storage_pool (r static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); void remoteDomainEventFired(int watch, int fd, int event, void *data); -static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr); static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); void remoteDomainEventQueueFlush(int timer, void *opaque); /*----------------------------------------------------------------------*/ @@ -274,6 +328,7 @@ doRemoteOpen (virConnectPtr conn, virConnectAuthPtr auth ATTRIBUTE_UNUSED, int flags) { + int wakeup[2];
Add "fd" to this name, too? Not as big a deal, since this is local and the first use makes it obvious.
char *transport_str = NULL;
if (conn->uri) { @@ -696,6 +751,21 @@ doRemoteOpen (virConnectPtr conn,
} /* switch (transport) */
+ if (virSetNonBlock(priv->sock) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make socket non-blocking %s"), + strerror(errno)); + goto failed; + } + + if (pipe(wakeup) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make pipe %s"), + strerror(errno)); + goto failed; + } + priv->wakeupRead = wakeup[0]; + priv->wakeupSend = wakeup[1];
/* Try and authenticate with server */ if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) @@ -768,6 +838,7 @@ doRemoteOpen (virConnectPtr conn, DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " "continuing without events."); virEventRemoveHandle(priv->watch); + priv->watch = -1; } } /* Successful. */ @@ -848,6 +919,7 @@ remoteOpen (virConnectPtr conn, } remoteDriverLock(priv); priv->localUses = 1; + priv->watch = -1;
if (flags & VIR_CONNECT_RO) rflags |= VIR_DRV_OPEN_REMOTE_RO; @@ -1220,6 +1292,7 @@ doRemoteClose (virConnectPtr conn, struc virEventRemoveTimeout(priv->eventFlushTimer); /* Remove handle for remote events */ virEventRemoveHandle(priv->watch); + priv->watch = -1; }
/* Close socket. */ @@ -5542,12 +5615,658 @@ done:
/*----------------------------------------------------------------------*/
-static int really_write (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); -static int really_read (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); - -/* This function performs a remote procedure call to procedure PROC_NR. + +static struct remote_thread_call * +prepareCall(virConnectPtr conn, + struct private_data *priv, + int flags, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) +{ + XDR xdr; + struct remote_message_header hdr; + struct remote_thread_call *rv; + + if (VIR_ALLOC(rv) < 0) + return NULL; + + if (virCondInit(&rv->cond) < 0) { + VIR_FREE(rv); + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_INTERNAL_ERROR, + _("cannot initialize mutex")); + return NULL; + } + + /* Get a unique serial number for this message. */ + rv->serial = priv->counter++; + rv->proc_nr = proc_nr; + rv->ret_filter = ret_filter; + rv->ret = ret; + + hdr.prog = REMOTE_PROGRAM; + hdr.vers = REMOTE_PROTOCOL_VERSION; + hdr.proc = proc_nr; + hdr.direction = REMOTE_CALL; + hdr.serial = rv->serial; + hdr.status = REMOTE_OK; + + /* Serialise header followed by args. */ + xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_RPC, _("xdr_remote_message_header failed")); + goto error; + } + + if (!(*args_filter) (&xdr, args)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, + _("marshalling args")); + goto error; + } + + /* Get the length stored in buffer. */ + rv->bufferLength = xdr_getpos (&xdr); + xdr_destroy (&xdr); + + /* Length must include the length word itself (always encoded in + * 4 bytes as per RFC 4506). + */ + rv->bufferLength += 4; + + /* Encode the length word. */ + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { + error (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, VIR_ERR_RPC, + _("xdr_int (length word)"));
I haven't done enough xdr* work to know, and the man pages didn't provide an immediate answer: is there no need to call xdr_destroy on this error path? I'd expect any allocation to happen in xdrmem_create rather than in xdr_int, so the stream would still need destroying after xdr_int fails. There are many error paths like this.
+ goto error; + } + xdr_destroy (&xdr); + + return rv; + +error: + VIR_FREE(ret); + return NULL;
The above should free rv, not ret: VIR_FREE(rv); as written, the error path frees the caller's ret buffer and leaks rv.