---
Notes:
Version 5:
- rebased on top of DanB's non-blocking patches; this is the only part that
required non-trivial rebase so I'm posting it for additional review
Version 4:
- no changes
Version 3:
- no changes
Version 2:
- no changes
src/rpc/virnetclient.c | 99 +++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 025d270..b4b2fe7 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -101,9 +101,13 @@ struct _virNetClient {
size_t nstreams;
virNetClientStreamPtr *streams;
+
+ bool wantClose;
};
+void virNetClientRequestClose(virNetClientPtr client);
+
static void virNetClientLock(virNetClientPtr client)
{
virMutexLock(&client->lock);
@@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client)
}
-void virNetClientClose(virNetClientPtr client)
+static void
+virNetClientCloseLocked(virNetClientPtr client)
{
- if (!client)
+ VIR_DEBUG("client=%p, sock=%p", client, client->sock);
+
+ if (!client->sock)
return;
- virNetClientLock(client);
virNetSocketRemoveIOCallback(client->sock);
virNetSocketFree(client->sock);
client->sock = NULL;
@@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client)
virNetSASLSessionFree(client->sasl);
client->sasl = NULL;
#endif
+ client->wantClose = false;
+}
+
+void virNetClientClose(virNetClientPtr client)
+{
+ if (!client)
+ return;
+
+ virNetClientLock(client);
+ virNetClientCloseLocked(client);
+ virNetClientUnlock(client);
+}
+
+void
+virNetClientRequestClose(virNetClientPtr client)
+{
+ VIR_DEBUG("client=%p", client);
+
+ virNetClientLock(client);
+
+ /* If there is a thread polling for data on the socket, set wantClose flag
+ * and wake the thread up or just immediately close the socket when no-one
+ * is polling on it.
+ */
+ if (client->waitDispatch) {
+ char ignore = 1;
+ int len = sizeof(ignore);
+
+ client->wantClose = true;
+ if (safewrite(client->wakeupSendFD, &ignore, len) != len)
+ VIR_ERROR(_("failed to wake up polling thread"));
+ } else {
+ virNetClientCloseLocked(client);
+ }
+
virNetClientUnlock(client);
}
@@ -1096,6 +1137,26 @@ static bool
virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
}
+static void
+virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
+ virNetClientCallPtr thiscall)
+{
+ if (!client->waitDispatch)
+ return;
+
+ if (client->waitDispatch == thiscall) {
+ /* just pretend nothing was sent and the caller will free the call */
+ thiscall->sentSomeData = false;
+ } else {
+ virNetClientCallPtr call = client->waitDispatch;
+ virNetClientCallRemove(&client->waitDispatch, call);
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call->msg);
+ VIR_FREE(call);
+ }
+}
+
+
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall)
{
VIR_DEBUG("Giving up the buck %p", thiscall);
@@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr
client, virNetCli
}
tmp = tmp->next;
}
+
VIR_DEBUG("No thread to pass the buck to");
+ if (client->wantClose) {
+ virNetClientCloseLocked(client);
+ virNetClientIOEventLoopRemoveAll(client, thiscall);
+ }
}
@@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
sigset_t oldmask, blockedsigs;
int timeout = -1;
- /* If we have existing SASL decoded data we
- * don't want to sleep in the poll(), just
- * check if any other FDs are also ready
+ /* If we have existing SASL decoded data we don't want to sleep in
+ * the poll(), just check if any other FDs are also ready.
+ * If the connection is going to be closed, we don't want to sleep in
+ * poll() either.
*/
- if (virNetSocketHasCachedData(client->sock))
+ if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0;
/* If there are any non-blocking calls in the queue,
@@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[0].revents |= POLLIN;
}
+ /* If wantClose flag is set, pretend there was an error on the socket
+ */
+ if (client->wantClose)
+ fds[0].revents = POLLERR;
+
if (fds[1].revents) {
VIR_DEBUG("Woken up from poll by other thread");
if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) !=
sizeof(ignore)) {
@@ -1441,7 +1513,8 @@ static int virNetClientIO(virNetClientPtr client,
virResetLastError();
rv = virNetClientIOEventLoop(client, thiscall);
- virNetClientIOUpdateCallback(client, true);
+ if (client->sock)
+ virNetClientIOUpdateCallback(client, true);
if (rv == 0 &&
virGetLastError())
@@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
goto done;
/* This should be impossible, but it doesn't hurt to check */
- if (client->haveTheBuck)
+ if (client->haveTheBuck || client->wantClose)
goto done;
VIR_DEBUG("Event fired %p %d", sock, events);
@@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client,
virNetClientLock(client);
+ if (!client->sock || client->wantClose) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("client socket is closed"));
+ goto unlock;
+ }
+
if (virCondInit(&call->cond) < 0) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize condition variable"));
@@ -1554,6 +1633,8 @@ cleanup:
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call);
}
+
+unlock:
virNetClientUnlock(client);
return ret;
}
--
1.7.8.rc3