When a libvirt API is called from the main event loop (which seems to be
common in event-based glib apps), the client IO loop properly handles
keepalive requests sent by a server but never actually sends the
responses, because the main event loop is blocked in the API call. This
patch gets rid of the response timer; the thread which processes
keepalive requests is now also responsible for queueing the responses
for delivery.
---
src/rpc/virkeepalive.c | 155 +++++++++++-------------------------------
src/rpc/virkeepalive.h | 3 +-
src/rpc/virnetclient.c | 35 +++++++++-
src/rpc/virnetserverclient.c | 88 ++++++++++++------------
4 files changed, 120 insertions(+), 161 deletions(-)
diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
index 035ac74..7984ada 100644
--- a/src/rpc/virkeepalive.c
+++ b/src/rpc/virkeepalive.c
@@ -48,9 +48,6 @@ struct _virKeepAlive {
time_t intervalStart;
int timer;
- virNetMessagePtr response;
- int responseTimer;
-
virKeepAliveSendFunc sendCB;
virKeepAliveDeadFunc deadCB;
virKeepAliveFreeFunc freeCB;
@@ -72,12 +69,25 @@ virKeepAliveUnlock(virKeepAlivePtr ka)
static virNetMessagePtr
-virKeepAliveMessage(int proc)
+virKeepAliveMessage(virKeepAlivePtr ka, int proc)
{
virNetMessagePtr msg;
+ const char *procstr = NULL;
- if (!(msg = virNetMessageNew(false)))
+ switch (proc) {
+ case KEEPALIVE_PROC_PING:
+ procstr = "request";
+ break;
+ case KEEPALIVE_PROC_PONG:
+ procstr = "response";
+ break;
+ default:
+ VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
return NULL;
+ }
+
+ if (!(msg = virNetMessageNew(false)))
+ goto error;
msg->header.prog = KEEPALIVE_PROGRAM;
msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
@@ -87,69 +97,20 @@ virKeepAliveMessage(int proc)
if (virNetMessageEncodeHeader(msg) < 0 ||
virNetMessageEncodePayloadEmpty(msg) < 0) {
virNetMessageFree(msg);
- return NULL;
- }
-
- return msg;
-}
-
-
-static void
-virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
-{
- const char *proc = NULL;
- void *client = ka->client;
- virKeepAliveSendFunc sendCB = ka->sendCB;
-
- switch (msg->header.proc) {
- case KEEPALIVE_PROC_PING:
- proc = "request";
- break;
- case KEEPALIVE_PROC_PONG:
- proc = "response";
- break;
- }
-
- if (!proc) {
- VIR_WARN("Refusing to send unknown keepalive message: %d",
- msg->header.proc);
- virNetMessageFree(msg);
- return;
+ goto error;
}
- VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+ VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
PROBE(RPC_KEEPALIVE_SEND,
"ka=%p client=%p prog=%d vers=%d proc=%d",
ka, ka->client, msg->header.prog, msg->header.vers,
msg->header.proc);
- ka->refs++;
- virKeepAliveUnlock(ka);
-
- if (sendCB(client, msg) < 0) {
- VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
- virNetMessageFree(msg);
- }
-
- virKeepAliveLock(ka);
- ka->refs--;
-}
-
-
-static void
-virKeepAliveScheduleResponse(virKeepAlivePtr ka)
-{
- if (ka->responseTimer == -1)
- return;
-
- VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
-
- if (!ka->response &&
- !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
- VIR_WARN("Failed to generate keepalive response");
- return;
- }
+ return msg;
- virEventUpdateTimeout(ka->responseTimer, 0);
+error:
+ VIR_WARN("Failed to generate keepalive %s", procstr);
+ VIR_FREE(msg);
+ return NULL;
}
@@ -184,7 +145,7 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka,
} else {
ka->countToDeath--;
ka->intervalStart = now;
- *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING);
+ *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
virEventUpdateTimeout(ka->timer, ka->interval * 1000);
return false;
}
@@ -197,47 +158,30 @@ virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
virKeepAlivePtr ka = opaque;
virNetMessagePtr msg = NULL;
bool dead;
+ void *client;
virKeepAliveLock(ka);
+ client = ka->client;
dead = virKeepAliveTimerInternal(ka, &msg);
- if (dead) {
- virKeepAliveDeadFunc deadCB = ka->deadCB;
- void *client = ka->client;
-
- ka->refs++;
- virKeepAliveUnlock(ka);
- deadCB(client);
- virKeepAliveLock(ka);
- ka->refs--;
- } else if (msg) {
- virKeepAliveSend(ka, msg);
- }
+ if (!dead && !msg)
+ goto cleanup;
+ ka->refs++;
virKeepAliveUnlock(ka);
-}
-
-
-static void
-virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
- virKeepAlivePtr ka = opaque;
- virNetMessagePtr msg;
-
- virKeepAliveLock(ka);
- VIR_DEBUG("ka=%p, client=%p, response=%p",
- ka, ka->client, ka->response);
-
- if (ka->response) {
- msg = ka->response;
- ka->response = NULL;
- virKeepAliveSend(ka, msg);
+ if (dead) {
+ ka->deadCB(client);
+ } else if (ka->sendCB(client, msg) < 0) {
+ VIR_WARN("Failed to send keepalive request to client %p", client);
+ virNetMessageFree(msg);
}
- virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+ virKeepAliveLock(ka);
+ ka->refs--;
+cleanup:
virKeepAliveUnlock(ka);
}
@@ -281,15 +225,6 @@ virKeepAliveNew(int interval,
ka->deadCB = deadCB;
ka->freeCB = freeCB;
- ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
- ka, virKeepAliveTimerFree);
- if (ka->responseTimer < 0) {
- virKeepAliveFree(ka);
- return NULL;
- }
- /* the timer now has a reference to ka */
- ka->refs++;
-
PROBE(RPC_KEEPALIVE_NEW,
"ka=%p client=%p refs=%d",
ka, ka->client, ka->refs);
@@ -394,7 +329,7 @@ cleanup:
static void
-virKeepAliveStopInternal(virKeepAlivePtr ka, bool all)
+virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED)
{
virKeepAliveLock(ka);
@@ -407,16 +342,6 @@ virKeepAliveStopInternal(virKeepAlivePtr ka, bool all)
ka->timer = -1;
}
- if (all) {
- if (ka->responseTimer > 0) {
- virEventRemoveTimeout(ka->responseTimer);
- ka->responseTimer = -1;
- }
-
- virNetMessageFree(ka->response);
- ka->response = NULL;
- }
-
virKeepAliveUnlock(ka);
}
@@ -482,13 +407,15 @@ virKeepAliveTrigger(virKeepAlivePtr ka,
bool
virKeepAliveCheckMessage(virKeepAlivePtr ka,
- virNetMessagePtr msg)
+ virNetMessagePtr msg,
+ virNetMessagePtr *response)
{
bool ret = false;
VIR_DEBUG("ka=%p, client=%p, msg=%p",
ka, ka ? ka->client : "(null)", msg);
+ *response = NULL;
if (!ka)
return false;
@@ -508,7 +435,7 @@ virKeepAliveCheckMessage(virKeepAlivePtr ka,
switch (msg->header.proc) {
case KEEPALIVE_PROC_PING:
VIR_DEBUG("Got keepalive request from client %p", ka->client);
- virKeepAliveScheduleResponse(ka);
+ *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
break;
case KEEPALIVE_PROC_PONG:
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
index 09264a5..62227d0 100644
--- a/src/rpc/virkeepalive.h
+++ b/src/rpc/virkeepalive.h
@@ -55,6 +55,7 @@ int virKeepAliveTimeout(virKeepAlivePtr ka);
bool virKeepAliveTrigger(virKeepAlivePtr ka,
virNetMessagePtr *msg);
bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
- virNetMessagePtr msg);
+ virNetMessagePtr msg,
+ virNetMessagePtr *response);
#endif /* __VIR_KEEPALIVE_H__ */
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index b956f6e..48c6a5d 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -109,6 +109,8 @@ struct _virNetClient {
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall);
+static int virNetClientQueueNonBlocking(virNetClientPtr client,
+ virNetMessagePtr msg);
static void virNetClientLock(virNetClientPtr client)
@@ -937,14 +939,22 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
static int
virNetClientCallDispatch(virNetClientPtr client)
{
+ virNetMessagePtr response = NULL;
+
PROBE(RPC_CLIENT_MSG_RX,
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u
serial=%u",
client, client->msg.bufferLength,
client->msg.header.prog, client->msg.header.vers,
client->msg.header.proc,
client->msg.header.type, client->msg.header.status,
client->msg.header.serial);
- if (virKeepAliveCheckMessage(client->keepalive, &client->msg))
+ if (virKeepAliveCheckMessage(client->keepalive, &client->msg,
&response)) {
+ if (response &&
+ virNetClientQueueNonBlocking(client, response) < 0) {
+ VIR_WARN("Could not queue keepalive response");
+ virNetMessageFree(response);
+ }
return 0;
+ }
switch (client->msg.header.type) {
case VIR_NET_REPLY: /* Normal RPC replies */
@@ -1637,6 +1647,8 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone,
NULL);
+ virNetClientIOUpdateCallback(client, true);
+
done:
virNetClientUnlock(client);
}
@@ -1696,6 +1708,27 @@ error:
}
+static int
+virNetClientQueueNonBlocking(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ virNetClientCallPtr call;
+
+ PROBE(RPC_CLIENT_MSG_TX_QUEUE,
+ "client=%p len=%zu prog=%u vers=%u proc=%u"
+ " type=%u status=%u serial=%u",
+ client, msg->bufferLength,
+ msg->header.prog, msg->header.vers, msg->header.proc,
+ msg->header.type, msg->header.status, msg->header.serial);
+
+ if (!(call = virNetClientCallNew(msg, false, true)))
+ return -1;
+
+ virNetClientCallQueue(&client->waitDispatch, call);
+ return 0;
+}
+
+
/*
* Returns 1 if the call was queued and will be completed later (only
* for nonBlock==true), 0 if the call was completed and -1 on error.
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index 6ae4e25..f3eb61a 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -103,13 +103,14 @@ struct _virNetServerClient
virNetServerClientCloseFunc privateDataCloseFunc;
virKeepAlivePtr keepalive;
- int keepaliveFilter;
};
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void
*opaque);
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
static void virNetServerClientDispatchRead(virNetServerClientPtr client);
+static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
+ virNetMessagePtr msg);
static void virNetServerClientLock(virNetServerClientPtr client)
{
@@ -364,7 +365,6 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
client->readonly = readonly;
client->tlsCtxt = tls;
client->nrequests_max = nrequests_max;
- client->keepaliveFilter = -1;
client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc,
client, NULL);
@@ -644,9 +644,6 @@ void virNetServerClientClose(virNetServerClientPtr client)
return;
}
- if (client->keepaliveFilter >= 0)
- virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter);
-
if (client->keepalive) {
virKeepAliveStop(client->keepalive);
ka = client->keepalive;
@@ -844,6 +841,7 @@ readmore:
} else {
/* Grab the completed message */
virNetMessagePtr msg = client->rx;
+ virNetMessagePtr response = NULL;
virNetServerClientFilterPtr filter;
size_t i;
@@ -894,23 +892,35 @@ readmore:
msg->header.prog, msg->header.vers, msg->header.proc,
msg->header.type, msg->header.status, msg->header.serial);
+ if (virKeepAliveCheckMessage(client->keepalive, msg, &response)) {
+ virNetMessageFree(msg);
+ client->nrequests--;
+ msg = NULL;
+
+ if (response &&
+ virNetServerClientSendMessageLocked(client, response) < 0)
+ virNetMessageFree(response);
+ }
+
/* Maybe send off for queue against a filter */
- filter = client->filters;
- while (filter) {
- int ret = filter->func(client, msg, filter->opaque);
- if (ret < 0) {
- virNetMessageFree(msg);
- msg = NULL;
- if (ret < 0)
- client->wantClose = true;
- break;
- }
- if (ret > 0) {
- msg = NULL;
- break;
- }
+ if (msg) {
+ filter = client->filters;
+ while (filter) {
+ int ret = filter->func(client, msg, filter->opaque);
+ if (ret < 0) {
+ virNetMessageFree(msg);
+ msg = NULL;
+ if (ret < 0)
+ client->wantClose = true;
+ break;
+ }
+ if (ret > 0) {
+ msg = NULL;
+ break;
+ }
- filter = filter->next;
+ filter = filter->next;
+ }
}
/* Send off to for normal dispatch to workers */
@@ -1117,16 +1127,15 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events,
void *opaque)
}
-int virNetServerClientSendMessage(virNetServerClientPtr client,
- virNetMessagePtr msg)
+static int
+virNetServerClientSendMessageLocked(virNetServerClientPtr client,
+ virNetMessagePtr msg)
{
int ret = -1;
VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
msg, msg->header.proc,
msg->bufferLength, msg->bufferOffset);
- virNetServerClientLock(client);
-
msg->donefds = 0;
if (client->sock && !client->wantClose) {
PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE,
@@ -1140,6 +1149,16 @@ int virNetServerClientSendMessage(virNetServerClientPtr client,
ret = 0;
}
+ return ret;
+}
+
+int virNetServerClientSendMessage(virNetServerClientPtr client,
+ virNetMessagePtr msg)
+{
+ int ret;
+
+ virNetServerClientLock(client);
+ ret = virNetServerClientSendMessageLocked(client, msg);
virNetServerClientUnlock(client);
return ret;
@@ -1176,20 +1195,6 @@ virNetServerClientFreeCB(void *opaque)
virNetServerClientFree(opaque);
}
-static int
-virNetServerClientKeepAliveFilter(virNetServerClientPtr client,
- virNetMessagePtr msg,
- void *opaque ATTRIBUTE_UNUSED)
-{
- if (virKeepAliveCheckMessage(client->keepalive, msg)) {
- virNetMessageFree(msg);
- client->nrequests--;
- return 1;
- }
-
- return 0;
-}
-
int
virNetServerClientInitKeepAlive(virNetServerClientPtr client,
int interval,
@@ -1208,13 +1213,6 @@ virNetServerClientInitKeepAlive(virNetServerClientPtr client,
/* keepalive object has a reference to client */
client->refs++;
- client->keepaliveFilter =
- virNetServerClientAddFilterLocked(client,
- virNetServerClientKeepAliveFilter,
- NULL);
- if (client->keepaliveFilter < 0)
- goto cleanup;
-
client->keepalive = ka;
ka = NULL;
--
1.7.10.2