When a client wants to send a keepalive message it needs to do so in a
non-blocking way to avoid blocking its event loop. This patch adds
dontBlock flag which says that the call should be processed without
blocking. Such calls do not have a thread waiting for the result
associated with them. This means, that sending such call fails if no
thread is dispatching and writing to the socket would block. In case
there is a thread waiting for its (normal) call to finish, sending
non-blocking call just pushes it into the queue and lets the dispatching
thread send it. The thread which has the buck tries to send all
non-blocking calls in the queue in a best effort way---if sending them
would block or there's an error on the socket, non-blocking calls are
simply removed from the queue and discarded.
---
src/rpc/virnetclient.c | 149 ++++++++++++++++++++++++++++++++++++------------
1 files changed, 113 insertions(+), 36 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 055361d..7ea9a27 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -55,6 +55,7 @@ struct _virNetClientCall {
virNetMessagePtr msg;
bool expectReply;
+ bool dontBlock;
virCond cond;
@@ -94,6 +95,11 @@ struct _virNetClient {
};
+static int virNetClientSendInternal(virNetClientPtr client,
+ virNetMessagePtr msg,
+ bool expectReply,
+ bool dontBlock);
+
static void virNetClientLock(virNetClientPtr client)
{
virMutexLock(&client->lock);
@@ -848,6 +854,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
char ignore;
sigset_t oldmask, blockedsigs;
int timeout = -1;
+ bool discardNonBlocking;
/* If we have existing SASL decoded data we
* don't want to sleep in the poll(), just
@@ -865,6 +872,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[0].events |= POLLIN;
if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
fds[0].events |= POLLOUT;
+ /* We don't want to sleep in poll if any of the calls is
+ * non-blocking
+ */
+ if (tmp->dontBlock)
+ timeout = 0;
tmp = tmp->next;
}
@@ -937,35 +949,63 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
goto error;
}
- /* Iterate through waiting threads and if
- * any are complete then tell 'em to wakeup
+ /* Iterate through waiting calls and
+ * - remove all completed nonblocking calls
+ * - remove all nonblocking calls in case poll() would block
+ * - remove all nonblocking calls if we got error from poll()
+ * - wake up threads waiting for calls that have been completed
*/
+ discardNonBlocking = ret == 0 ||
+ (fds[0].revents & POLLHUP) ||
+ (fds[0].revents & POLLERR);
tmp = client->waitDispatch;
prev = NULL;
while (tmp) {
+ virNetClientCallPtr next = tmp->next;
+
if (tmp != thiscall &&
- tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+ (tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE ||
+ (discardNonBlocking && tmp->dontBlock))) {
/* Take them out of the list */
if (prev)
prev->next = tmp->next;
else
client->waitDispatch = tmp->next;
- /* And wake them up....
- * ...they won't actually wakeup until
- * we release our mutex a short while
- * later...
- */
- VIR_DEBUG("Waking up sleep %p %p", tmp,
client->waitDispatch);
- virCondSignal(&tmp->cond);
+ if (tmp->dontBlock) {
+ /* tmp is a non-blocking call, no-one is waiting for it so
+ * we just free it here
+ */
+ if (tmp->mode != VIR_NET_CLIENT_MODE_COMPLETE) {
+ VIR_DEBUG("Can't finish nonblocking call %p
without"
+ " blocking or error", tmp);
+ }
+ virNetMessageFree(tmp->msg);
+ VIR_FREE(tmp);
+ } else {
+ /* And wake them up....
+ * ...they won't actually wakeup until
+ * we release our mutex a short while
+ * later...
+ */
+ VIR_DEBUG("Waking up sleep %p %p",
+ tmp, client->waitDispatch);
+ virCondSignal(&tmp->cond);
+ }
} else {
prev = tmp;
}
- tmp = tmp->next;
+ tmp = next;
}
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+ /* If next call is non-blocking call, we need to process it
+ * before giving up the buck
+ */
+ if (thiscall->next && thiscall->next->dontBlock)
+ continue;
+
/* We're at head of the list already, so
* remove us
*/
@@ -980,14 +1020,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
return 0;
}
-
if (fds[0].revents & (POLLHUP | POLLERR)) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("received hangup / error event on socket"));
goto error;
}
- }
+ if (thiscall->dontBlock && discardNonBlocking) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Can't finish nonblocking call without
blocking"));
+ goto error;
+ }
+ }
error:
client->waitDispatch = thiscall->next;
@@ -1040,38 +1084,41 @@ static int virNetClientIO(virNetClientPtr client,
{
int rv = -1;
- VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d
length=%zu dispatch=%p",
+ VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d"
+ " length=%zu dontBlock=%d dispatch=%p",
thiscall->msg->header.prog,
thiscall->msg->header.vers,
thiscall->msg->header.serial,
thiscall->msg->header.proc,
thiscall->msg->header.type,
thiscall->msg->bufferLength,
+ thiscall->dontBlock,
client->waitDispatch);
/* Check to see if another thread is dispatching */
if (client->waitDispatch) {
- /* Stick ourselves on the end of the wait queue */
- virNetClientCallPtr tmp = client->waitDispatch;
+ virNetClientCallPtr tmp;
char ignore = 1;
- while (tmp && tmp->next)
- tmp = tmp->next;
- if (tmp)
- tmp->next = thiscall;
- else
- client->waitDispatch = thiscall;
/* Force other thread to wakeup from poll */
if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) !=
sizeof(ignore)) {
- if (tmp)
- tmp->next = NULL;
- else
- client->waitDispatch = NULL;
virReportSystemError(errno, "%s",
_("failed to wake up polling thread"));
return -1;
}
+ /* Stick ourselves on the end of the wait queue */
+ tmp = client->waitDispatch;
+ while (tmp->next)
+ tmp = tmp->next;
+ tmp->next = thiscall;
+
+ if (thiscall->dontBlock) {
+ VIR_DEBUG("Sending non-blocking call while another thread is"
+ " dispatching; it will send the call for us");
+ return 0;
+ }
+
VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
/* Go to sleep while other thread is working... */
if (virCondWait(&thiscall->cond, &client->lock) < 0) {
@@ -1091,7 +1138,7 @@ static int virNetClientIO(virNetClientPtr client,
return -1;
}
- VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch,
thiscall);
+ VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch,
thiscall);
/* Two reasons we can be woken up
* 1. Other thread has got our reply ready for us
* 2. Other thread is all done, and it is our turn to
@@ -1181,15 +1228,17 @@ done:
}
-int virNetClientSend(virNetClientPtr client,
- virNetMessagePtr msg,
- bool expectReply)
+static int
+virNetClientSendInternal(virNetClientPtr client,
+ virNetMessagePtr msg,
+ bool expectReply,
+ bool dontBlock)
{
virNetClientCallPtr call;
int ret = -1;
if (expectReply &&
- (msg->header.status == VIR_NET_CONTINUE)) {
+ (msg->header.status == VIR_NET_CONTINUE || dontBlock)) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Attempt to send an asynchronous message with a synchronous
reply"));
return -1;
@@ -1202,10 +1251,15 @@ int virNetClientSend(virNetClientPtr client,
virNetClientLock(client);
- if (virCondInit(&call->cond) < 0) {
- virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("cannot initialize condition variable"));
- goto cleanup;
+ /* We don't need call->cond for non-blocking calls since there's no
+ * thread to be woken up anyway
+ */
+ if (!dontBlock) {
+ if (virCondInit(&call->cond) < 0) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("cannot initialize condition variable"));
+ goto cleanup;
+ }
}
if (msg->bufferLength)
@@ -1214,12 +1268,35 @@ int virNetClientSend(virNetClientPtr client,
call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
call->msg = msg;
call->expectReply = expectReply;
+ call->dontBlock = dontBlock;
ret = virNetClientIO(client, call);
cleanup:
ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call);
+ if (ret != 0) {
+ VIR_FREE(call);
+ } else if (dontBlock) {
+ /* Only free the call if it was completed since otherwise it was just
+ * queued up and will be processed later.
+ */
+ if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+ /* We need to free the message as well since no-one is waiting for
+ * it.
+ */
+ virNetMessageFree(msg);
+ VIR_FREE(call);
+ }
+ } else {
+ VIR_FREE(call);
+ }
virNetClientUnlock(client);
return ret;
}
+
+int virNetClientSend(virNetClientPtr client,
+ virNetMessagePtr msg,
+ bool expectReply)
+{
+ return virNetClientSendInternal(client, msg, expectReply, false);
+}
--
1.7.6.1