"Daniel P. Berrange" <berrange(a)redhat.com> wrote:
This patch rewrites the code for dispatching RPC calls in the
remote driver to allow use from multiple threads. Only one thread
is allowed to send/recv on the socket at a time, though. If another
thread comes along, it will put itself on a queue and go to sleep.
The first thread may actually get around to transmitting the 2nd
thread's request while it is waiting for its own reply. It may
even get the 2nd thread's reply, if its own RPC call is being really
slow. So when a thread wakes up from sleeping, it has to check
whether its own RPC call has already been processed. Likewise, when
a thread owning the socket finishes with its own work, it may have
to pass the buck to another thread. The upshot of this is that
we have multiple RPC calls executing in parallel, and requests+replies
are no longer guaranteed to be FIFO on the wire if talking to a new
enough server.
This refactoring required use of a self-pipe/poll trick for sync
between threads, but fortunately gnulib now provides this on Windows
too, so there's no compatibility problem there.
Quick summary: dense ;-), though much of it is
moved code.
I haven't finished, but did find at least one problem, below.
diff --git a/src/remote_internal.c b/src/remote_internal.c
...
@@ -114,6 +164,11 @@ struct private_data {
virDomainEventQueuePtr domainEvents;
/* Timer for flushing domainEvents queue */
int eventFlushTimer;
+
+ /* List of threads currently doing dispatch */
+ int wakeupSend;
+ int wakeupRead;
How about appending "FD" to indicate these are file descriptors
(e.g. wakeupSendFD, wakeupReadFD)? The names, combined with the comment
above them (which must really apply to waitDispatch), made me wonder
what they represented. Only when I saw them used in the
safewrite/saferead calls did I get it.
+ struct remote_thread_call *waitDispatch;
};
enum {
@@ -160,7 +215,6 @@ static void make_nonnull_storage_pool (r
static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst,
virStoragePoolPtr vol_src);
static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst,
virStorageVolPtr vol_src);
void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
void remoteDomainEventQueueFlush(int timer, void *opaque);
/*----------------------------------------------------------------------*/
@@ -274,6 +328,7 @@ doRemoteOpen (virConnectPtr conn,
virConnectAuthPtr auth ATTRIBUTE_UNUSED,
int flags)
{
+ int wakeup[2];
Add "fd" to this name, too?
Not as big a deal here, since this variable is local and
its first use (pipe(wakeup)) makes it obvious.
char *transport_str = NULL;
if (conn->uri) {
@@ -696,6 +751,21 @@ doRemoteOpen (virConnectPtr conn,
} /* switch (transport) */
+ if (virSetNonBlock(priv->sock) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make socket non-blocking %s"),
+ strerror(errno));
+ goto failed;
+ }
+
+ if (pipe(wakeup) < 0) {
+ errorf (conn, VIR_ERR_SYSTEM_ERROR,
+ _("unable to make pipe %s"),
+ strerror(errno));
+ goto failed;
+ }
+ priv->wakeupRead = wakeup[0];
+ priv->wakeupSend = wakeup[1];
/* Try and authenticate with server */
if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
@@ -768,6 +838,7 @@ doRemoteOpen (virConnectPtr conn,
DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
"continuing without events.");
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
}
/* Successful. */
@@ -848,6 +919,7 @@ remoteOpen (virConnectPtr conn,
}
remoteDriverLock(priv);
priv->localUses = 1;
+ priv->watch = -1;
if (flags & VIR_CONNECT_RO)
rflags |= VIR_DRV_OPEN_REMOTE_RO;
@@ -1220,6 +1292,7 @@ doRemoteClose (virConnectPtr conn, struc
virEventRemoveTimeout(priv->eventFlushTimer);
/* Remove handle for remote events */
virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
}
/* Close socket. */
@@ -5542,12 +5615,658 @@ done:
/*----------------------------------------------------------------------*/
-static int really_write (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
- int in_open, char *bytes, int len);
-
-/* This function performs a remote procedure call to procedure PROC_NR.
+
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+ struct private_data *priv,
+ int flags,
+ int proc_nr,
+ xdrproc_t args_filter, char *args,
+ xdrproc_t ret_filter, char *ret)
+{
+ XDR xdr;
+ struct remote_message_header hdr;
+ struct remote_thread_call *rv;
+
+ if (VIR_ALLOC(rv) < 0)
+ return NULL;
+
+ if (virCondInit(&rv->cond) < 0) {
+ VIR_FREE(rv);
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_INTERNAL_ERROR,
+ _("cannot initialize mutex"));
+ return NULL;
+ }
+
+ /* Get a unique serial number for this message. */
+ rv->serial = priv->counter++;
+ rv->proc_nr = proc_nr;
+ rv->ret_filter = ret_filter;
+ rv->ret = ret;
+
+ hdr.prog = REMOTE_PROGRAM;
+ hdr.vers = REMOTE_PROTOCOL_VERSION;
+ hdr.proc = proc_nr;
+ hdr.direction = REMOTE_CALL;
+ hdr.serial = rv->serial;
+ hdr.status = REMOTE_OK;
+
+ /* Serialise header followed by args. */
+ xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
+ if (!xdr_remote_message_header (&xdr, &hdr)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+ VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+ goto error;
+ }
+
+ if (!(*args_filter) (&xdr, args)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+ _("marshalling args"));
+ goto error;
+ }
+
+ /* Get the length stored in buffer. */
+ rv->bufferLength = xdr_getpos (&xdr);
+ xdr_destroy (&xdr);
+
+ /* Length must include the length word itself (always encoded in
+ * 4 bytes as per RFC 4506).
+ */
+ rv->bufferLength += 4;
+
+ /* Encode the length word. */
+ xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+ if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+ error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+ _("xdr_int (length word)"));
I haven't done enough xdr* work to know, and the man pages
didn't provide an immediate answer:
is there no need to call xdr_destroy on this error path?
I'd expect xdrmem_create, not xdr_int, to do any allocation.
There are many error paths like this.
+ goto error;
+ }
+ xdr_destroy (&xdr);
+
+ return rv;
+
+error:
+ VIR_FREE(ret);
+ return NULL;
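The above should free rv, not ret. ret is the caller-supplied
reply buffer (only stored in rv->ret here), so freeing it releases
memory this function never allocated, while rv itself leaks:
VIR_FREE(rv);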