So far, we were dropping non-blocking calls whenever sending them would
block. In case a client is sending lots of stream calls (which are not
supposed to generate any reply), the assumption that having other calls
in a queue is sufficient to get a reply from the server doesn't work. I
tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but
failed and reverted that commit.
With this patch, non-blocking calls are never dropped (unless the
connection is being closed) and will always be sent.
---
src/rpc/virnetclient.c | 164 +++++++++++++++++++++---------------------------
1 file changed, 71 insertions(+), 93 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 3e661d2..614b469 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -58,7 +58,6 @@ struct _virNetClientCall {
bool expectReply;
bool nonBlock;
bool haveThread;
- bool sentSomeData;
virCond cond;
@@ -108,6 +107,10 @@ struct _virNetClient {
};
+static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+ virNetClientCallPtr thiscall);
+
+
static void virNetClientLock(virNetClientPtr client)
{
virMutexLock(&client->lock);
@@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client)
virNetClientLock(client);
- /* If there is a thread polling for data on the socket, set wantClose flag
- * and wake the thread up or just immediately close the socket when no-one
- * is polling on it.
+ client->wantClose = true;
+
+ /* If there is a thread polling for data on the socket, wake the thread up
+ * otherwise try to pass the buck to a possibly waiting thread. If no
+ * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
+ * queue and close the client because we set client->wantClose.
*/
- if (client->waitDispatch) {
+ if (client->haveTheBuck) {
char ignore = 1;
size_t len = sizeof(ignore);
- client->wantClose = true;
if (safewrite(client->wakeupSendFD, &ignore, len) != len)
VIR_ERROR(_("failed to wake up polling thread"));
} else {
- virNetClientCloseLocked(client);
+ virNetClientIOEventLoopPassTheBuck(client, NULL);
}
virNetClientUnlock(client);
@@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client,
ret = virNetSocketWrite(client->sock,
thecall->msg->buffer +
thecall->msg->bufferOffset,
thecall->msg->bufferLength -
thecall->msg->bufferOffset);
- if (ret > 0 || virNetSocketHasPendingData(client->sock))
- thecall->sentSomeData = true;
if (ret <= 0)
return ret;
@@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr
call,
}
-static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
- void *opaque)
+static bool
+virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
+ void *opaque)
{
virNetClientCallPtr thiscall = opaque;
- if (call == thiscall)
- return false;
-
- if (!call->nonBlock)
- return false;
-
- if (call->sentSomeData) {
- /*
- * If some data has been sent we must keep it in the list,
- * but still wakeup any thread
- */
- if (call->haveThread) {
- VIR_DEBUG("Waking up sleep %p", call);
- virCondSignal(&call->cond);
- } else {
- VIR_DEBUG("Keeping unfinished call %p in the list", call);
- }
- return false;
- } else {
- /*
- * If no data has been sent, we can remove it from the list.
- * Wakup any thread, otherwise free the caller ourselves
- */
- if (call->haveThread) {
- VIR_DEBUG("Waking up sleep %p", call);
- virCondSignal(&call->cond);
- } else {
- VIR_DEBUG("Removing call %p", call);
- if (call->expectReply)
- VIR_WARN("Got a call expecting a reply but without a waiting
thread");
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call->msg);
- VIR_FREE(call);
- }
+ if (call != thiscall && call->nonBlock && call->haveThread) {
+ VIR_DEBUG("Waking up sleep %p", call);
+ call->haveThread = false;
+ virCondSignal(&call->cond);
return true;
}
+
+ return false;
}
-static void
-virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
- virNetClientCallPtr thiscall)
+static bool
+virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
+ void *opaque)
{
- if (!client->waitDispatch)
- return;
+ virNetClientCallPtr thiscall = opaque;
- if (client->waitDispatch == thiscall) {
- /* just pretend nothing was sent and the caller will free the call */
- thiscall->sentSomeData = false;
- } else {
- virNetClientCallPtr call = client->waitDispatch;
- virNetClientCallRemove(&client->waitDispatch, call);
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call->msg);
- VIR_FREE(call);
- }
+ if (call == thiscall)
+ return false;
+
+ VIR_DEBUG("Removing call %p", call);
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call->msg);
+ VIR_FREE(call);
+ return true;
}
-static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall)
+static void
+virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+ virNetClientCallPtr thiscall)
{
VIR_DEBUG("Giving up the buck %p", thiscall);
virNetClientCallPtr tmp = client->waitDispatch;
@@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr
client, virNetCli
VIR_DEBUG("No thread to pass the buck to");
if (client->wantClose) {
virNetClientCloseLocked(client);
- virNetClientIOEventLoopRemoveAll(client, thiscall);
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveAll,
+ thiscall);
}
}
-static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque
ATTRIBUTE_UNUSED)
+static bool
+virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
+ void *opaque ATTRIBUTE_UNUSED)
{
- return call->nonBlock;
+ return call->nonBlock && call->haveThread;
}
/*
@@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0;
- /* If there are any non-blocking calls in the queue,
- * then we don't want to sleep in poll()
+ /* If there are any non-blocking calls with an associated thread
+ * in the queue, then we don't want to sleep in poll()
*/
if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock,
@@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
}
/* If we were woken up because a new non-blocking call was queued,
- * we need to re-poll to check if we can send it.
+ * we need to re-poll to check if we can send it. To be precise, we
+ * will re-poll even if a blocking call arrived when unhandled
+ * non-blocking calls are still in the queue. But this can't hurt.
*/
if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock,
NULL)) {
- VIR_DEBUG("New non-blocking call arrived; repolling");
+ VIR_DEBUG("The queue contains new non-blocking call(s);"
+ " repolling");
continue;
}
}
@@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
}
/* Iterate through waiting calls and if any are
- * complete, remove them from the dispatch list..
+ * complete, remove them from the dispatch list.
*/
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone,
thiscall);
- /* Iterate through waiting calls and if any are
- * non-blocking, remove them from the dispatch list...
+ /* Iterate through waiting calls and wake up and detach threads
+ * attached to non-blocking calls.
*/
- virNetClientCallRemovePredicate(&client->waitDispatch,
- virNetClientIOEventLoopRemoveNonBlocking,
- thiscall);
+ virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOEventLoopDetachNonBlocking,
+ thiscall);
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
@@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
return 2;
}
- /* We're not done, but we're non-blocking */
+ /* We're not done, but we're non-blocking; keep the call queued */
if (thiscall->nonBlock) {
+ thiscall->haveThread = false;
virNetClientIOEventLoopPassTheBuck(client, thiscall);
- if (thiscall->sentSomeData) {
- return 1;
- } else {
- virNetClientCallRemove(&client->waitDispatch, thiscall);
- return 0;
- }
+ return 1;
}
if (fds[0].revents & (POLLHUP | POLLERR)) {
@@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
}
}
-
error:
virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall);
@@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client,
goto cleanup;
}
- /* If we're non-blocking, get outta here */
+ /* If we're non-blocking, we were either queued (and detached) or the
+ * call was not sent because of an error.
+ */
if (thiscall->nonBlock) {
- if (thiscall->sentSomeData)
+ if (!thiscall->haveThread)
rv = 1; /* In progress */
else
rv = 0; /* none at all */
@@ -1708,7 +1687,7 @@ done:
/*
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientSendInternal(virNetClientPtr client,
@@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client,
ret = virNetClientIO(client, call);
- /* If partially sent, then the call is still on the dispatch queue */
- if (ret == 1) {
- call->haveThread = false;
- } else {
- ignore_value(virCondDestroy(&call->cond));
- }
+ /* If queued, the call will be finished and freed later by another thread;
+ * we're done. */
+ if (ret == 1)
+ return 1;
+
+ ignore_value(virCondDestroy(&call->cond));
cleanup:
- if (ret != 1)
- VIR_FREE(call);
+ VIR_FREE(call);
return ret;
}
--
1.7.10.2