Currently if the virNetServer instance is created with max_workers==0 to
request a non-threaded dispatch process, we deadlock during dispatch
#0 0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0
#1 0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0
#2 0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89
#3 0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at
util/virobject.c:435
#4 0x000055a66286fcde in virNetServerClientIsAuthenticated
(client=client@entry=0x55a663a7b960)
at rpc/virnetserverclient.c:1565
#5 0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50,
client=0x55a663a7b960,
server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407
#6 virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020,
server=server@entry=0x55a663a77550,
client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at
rpc/virnetserverprogram.c:307
#7 0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50,
prog=0x55a663a78020, client=0x55a663a7b960,
srv=0x55a663a77550) at rpc/virnetserver.c:148
#8 virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50,
opaque=0x55a663a77550)
at rpc/virnetserver.c:227
#9 0x000055a66286e4c0 in virNetServerClientDispatchRead
(client=client@entry=0x55a663a7b960)
at rpc/virnetserverclient.c:1322
#10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>,
events=1, opaque=0x55a663a7b960)
at rpc/virnetserverclient.c:1507
#11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0,
nfds=<optimized out>)
at util/vireventpoll.c:508
#12 virEventPollRunOnce () at util/vireventpoll.c:657
#13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327
#14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at
rpc/virnetdaemon.c:858
#15 0x000055a662864c1d in main (argc=<optimized out>, argv=0x7ffd105b4838) at
logging/log_daemon.c:1235
Signed-off-by: Daniel P. Berrangé <berrange(a)redhat.com>
---
src/rpc/virnetserverclient.c | 79 ++++++++++++++++++++++++++++++--------------
1 file changed, 55 insertions(+), 24 deletions(-)
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index ea0d5abdee..fb4775235d 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -143,7 +143,7 @@ VIR_ONCE_GLOBAL_INIT(virNetServerClient)
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void
*opaque);
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
-static void virNetServerClientDispatchRead(virNetServerClientPtr client);
+static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client);
static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
virNetMessagePtr msg);
@@ -340,18 +340,41 @@ virNetServerClientCheckAccess(virNetServerClientPtr client)
}
#endif
+static void virNetServerClientDispatchMessage(virNetServerClientPtr client,
+ virNetMessagePtr msg)
+{
+ virObjectLock(client);
+ if (!client->dispatchFunc) {
+ virNetMessageFree(msg);
+ client->wantClose = true;
+ virObjectUnlock(client);
+ } else {
+ virObjectUnlock(client);
+ /* Accessing 'client' is safe, because virNetServerClientSetDispatcher
+ * only permits setting 'dispatchFunc' once, so if non-NULL, it will
+ * never change again
+ */
+ client->dispatchFunc(client, msg, client->dispatchOpaque);
+ }
+}
+
static void virNetServerClientSockTimerFunc(int timer,
void *opaque)
{
virNetServerClientPtr client = opaque;
+ virNetMessagePtr msg = NULL;
virObjectLock(client);
virEventUpdateTimeout(timer, -1);
/* Although client->rx != NULL when this timer is enabled, it might have
* changed since the client was unlocked in the meantime. */
if (client->rx)
- virNetServerClientDispatchRead(client);
+ msg = virNetServerClientDispatchRead(client);
virObjectUnlock(client);
+
+ if (msg) {
+ virNetServerClientDispatchMessage(client, msg);
+ }
}
@@ -950,8 +973,13 @@ void virNetServerClientSetDispatcher(virNetServerClientPtr client,
void *opaque)
{
virObjectLock(client);
- client->dispatchFunc = func;
- client->dispatchOpaque = opaque;
+ /* Only set dispatcher if not already set, to avoid race
+ * with dispatch code that runs without locks held
+ */
+ if (!client->dispatchFunc) {
+ client->dispatchFunc = func;
+ client->dispatchOpaque = opaque;
+ }
virObjectUnlock(client);
}
@@ -1196,26 +1224,32 @@ static ssize_t virNetServerClientRead(virNetServerClientPtr
client)
/*
- * Read data until we get a complete message to process
+ * Read data until we get a complete message to process.
+ * If a complete message is available, it will be returned
+ * from this method, for dispatch by the caller.
+ *
+ * Returns a complete message for dispatch, or NULL if none is
+ * yet available, or an error occurred. On error, the wantClose
+ * flag will be set.
*/
-static void virNetServerClientDispatchRead(virNetServerClientPtr client)
+static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client)
{
readmore:
if (client->rx->nfds == 0) {
if (virNetServerClientRead(client) < 0) {
client->wantClose = true;
- return; /* Error */
+ return NULL; /* Error */
}
}
if (client->rx->bufferOffset < client->rx->bufferLength)
- return; /* Still not read enough */
+ return NULL; /* Still not read enough */
/* Either done with length word header */
if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
if (virNetMessageDecodeLength(client->rx) < 0) {
client->wantClose = true;
- return;
+ return NULL;
}
virNetServerClientUpdateEvent(client);
@@ -1236,7 +1270,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
virNetMessageQueueServe(&client->rx);
virNetMessageFree(msg);
client->wantClose = true;
- return;
+ return NULL;
}
/* Now figure out if we need to read more data to get some
@@ -1246,7 +1280,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
virNetMessageQueueServe(&client->rx);
virNetMessageFree(msg);
client->wantClose = true;
- return; /* Error */
+ return NULL; /* Error */
}
/* Try getting the file descriptors (may fail if blocking) */
@@ -1256,7 +1290,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
virNetMessageQueueServe(&client->rx);
virNetMessageFree(msg);
client->wantClose = true;
- return;
+ return NULL;
}
if (rv == 0) /* Blocking */
break;
@@ -1270,7 +1304,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
* again next time we run this method
*/
client->rx->bufferOffset = client->rx->bufferLength;
- return;
+ return NULL;
}
}
@@ -1313,16 +1347,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
}
}
- /* Send off to for normal dispatch to workers */
- if (msg) {
- if (!client->dispatchFunc) {
- virNetMessageFree(msg);
- client->wantClose = true;
- } else {
- client->dispatchFunc(client, msg, client->dispatchOpaque);
- }
- }
-
/* Possibly need to create another receive buffer */
if (client->nrequests < client->nrequests_max) {
if (!(client->rx = virNetMessageNew(true))) {
@@ -1338,6 +1362,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr
client)
}
}
virNetServerClientUpdateEvent(client);
+
+ return msg;
}
}
@@ -1482,6 +1508,7 @@ static void
virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
{
virNetServerClientPtr client = opaque;
+ virNetMessagePtr msg = NULL;
virObjectLock(client);
@@ -1504,7 +1531,7 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events,
void *opaque)
virNetServerClientDispatchWrite(client);
if (events & VIR_EVENT_HANDLE_READABLE &&
client->rx)
- virNetServerClientDispatchRead(client);
+ msg = virNetServerClientDispatchRead(client);
#if WITH_GNUTLS
}
#endif
@@ -1517,6 +1544,10 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events,
void *opaque)
client->wantClose = true;
virObjectUnlock(client);
+
+ if (msg) {
+ virNetServerClientDispatchMessage(client, msg);
+ }
}
--
2.14.3