[libvirt] [PATCH] Allow non-blocking message sending on virNetClient

From: "Daniel P. Berrange" <berrange@redhat.com> Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter. Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O. TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present. * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API --- src/rpc/virnetclient.c | 249 +++++++++++++++++++++++++++++++++------- src/rpc/virnetclient.h | 12 ++- src/rpc/virnetclientprogram.c | 2 +- src/rpc/virnetclientstream.c | 11 ++- 4 files changed, 224 insertions(+), 50 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9..b0ed507 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,8 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; virCond cond; @@ -86,8 +88,15 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; - /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. + */ virNetClientCallPtr waitDispatch; + /* Whether a thread is dispatching */ + bool haveTheBuck; size_t nstreams; virNetClientStreamPtr *streams; @@ -555,7 +564,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall; /* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -896,10 +905,31 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static void virNetClientPassTheBuck(virNetClientPtr client) +{ + virNetClientCallPtr tmp = client->waitDispatch; + + /* See if someone else is still waiting + * and if so, then pass the buck ! */ + while (tmp) { + if (tmp->haveThread) { + VIR_DEBUG("Passing the buck to %p", tmp); + virCondSignal(&tmp->cond); + return; + } + tmp = tmp->next; + } + VIR_DEBUG("No thread to pass the buck to"); +} + + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -924,6 +954,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0; + /* If we're a non-blocking call, then we don't + * want to wait for I/O readyness + */ + if (thiscall->nonBlock) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0; @@ -975,8 +1011,34 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } else if (ret == 0 && thiscall->nonBlock) { + if (thiscall->msg->bufferOffset == 0) { + /* No data sent at all, remove ourselves from the list */ + tmp = client->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp == thiscall) { + if (prev) { + prev->next = thiscall->next; + } else { + client->waitDispatch = thiscall->next; + } + break; + } + prev = tmp; + tmp = tmp->next; + } + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 0; + } else { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 1; /* partial send */ + } + } if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -988,6 +1050,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } if (ret < 0) { + /* XXX what's this dubious errno check doing ? */ if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -1005,8 +1068,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ tmp = client->waitDispatch; prev = NULL; @@ -1019,13 +1082,25 @@ static int virNetClientIOEventLoop(virNetClientPtr client, else client->waitDispatch = tmp->next; - /* And wake them up.... - * ...they won't actually wakeup until + /* + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); + if (tmp->haveThread) { + VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); + virCondSignal(&tmp->cond); + } else { + if (tmp->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&tmp->cond)); + VIR_FREE(tmp); + } } else { prev = tmp; } @@ -1039,13 +1114,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, */ client->waitDispatch = thiscall->next; VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } - return 0; + virNetClientPassTheBuck(client); + return 2; } @@ -1060,16 +1130,21 @@ static int virNetClientIOEventLoop(virNetClientPtr client, error: client->waitDispatch = thiscall->next; VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } + virNetClientPassTheBuck(client); return -1; } +static void +virNetClientUpdateIOCallback(virNetClientPtr client, bool enabled) +{ + int events = 0; + if (enabled) { + events |= VIR_EVENT_HANDLE_READABLE; + } + virNetSocketUpdateIOCallback(client->sock, events); +} + /* * This function sends a message to remote server and awaits a reply * @@ -1102,11 +1177,15 @@ error: * nation are blamed on another, providing an opportunity for war." * * NB(5) Don't Panic! + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIO(virNetClientPtr client, virNetClientCallPtr thiscall) { int rv = -1; + virNetClientCallPtr tmp; VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", thiscall->msg->header.prog, @@ -1117,20 +1196,27 @@ static int virNetClientIO(virNetClientPtr client, thiscall->msg->bufferLength, client->waitDispatch); + /* Trivially detect blocking if someone else has the buck already */ + if (client->haveTheBuck && + thiscall->nonBlock) + return 0; + + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + client->waitDispatch = thiscall; + /* Check to see if another thread is dispatching */ - if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + if (client->haveTheBuck) { char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { + /* Something went wrong, so we need to remove that call we just added */ if (tmp) tmp->next = NULL; else @@ -1143,6 +1229,7 @@ static int virNetClientIO(virNetClientPtr client, VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { + /* Something went wrong, so we need to remove that call we previously added */ if (client->waitDispatch == thiscall) { client->waitDispatch = thiscall->next; } else { @@ -1167,7 +1254,7 @@ static int virNetClientIO(virNetClientPtr client, * our reply */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - rv = 0; + rv = 2; /* * We avoided catching the buck and our reply is ready ! * We've already had 'thiscall' removed from the list @@ -1177,12 +1264,10 @@ static int virNetClientIO(virNetClientPtr client, } /* Grr, someone passed the buck onto us ... */ - - } else { - /* We're first to catch the buck */ - client->waitDispatch = thiscall; } + client->haveTheBuck = true; + VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); /* * The buck stops here! @@ -1198,17 +1283,19 @@ static int virNetClientIO(virNetClientPtr client, * cause the event loop thread to be blocked on the * mutex for the duration of the call */ - virNetSocketUpdateIOCallback(client->sock, 0); + virNetClientUpdateIOCallback(client, false); virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + virNetClientUpdateIOCallback(client, true); if (rv == 0 && virGetLastError()) rv = -1; + client->haveTheBuck = false; + cleanup: VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv); return rv; @@ -1227,7 +1314,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->haveTheBuck) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1249,9 +1336,14 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +/* + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error + */ +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool nonBlock) { virNetClientCallPtr call; int ret = -1; @@ -1270,6 +1362,12 @@ int virNetClientSend(virNetClientPtr client, return -1; } + if (expectReply && nonBlock) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Attempt to send an non-blocking message with a synchronous reply")); + return -1; + } + if (VIR_ALLOC(call) < 0) { virReportOOMError(); return -1; @@ -1290,12 +1388,75 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->nonBlock = nonBlock; + call->haveThread = true; ret = virNetClientIO(client, call); cleanup: - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + /* If partially sent, then the call is still on the dispatch queue */ + if (ret == 1) { + call->haveThread = false; + } else { + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, and wait for the reply synchronously + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, true, false); + if (ret < 0) + return -1; + return 0; +} + + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, without any reply + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, false, false); + if (ret < 0) + return -1; + return 0; +} + +/* + * @msg: a message allocated on the heap. + * + * Send a message asynchronously, without any reply + * + * The caller is responsible for free'ing @msg, *except* if + * this method returns -1. + * + * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error + */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(client, msg, false, true); +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index fb679e8..d3c112a 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -67,9 +67,17 @@ int virNetClientAddStream(virNetClientPtr client, void virNetClientRemoveStream(virNetClientPtr client, virNetClientStreamPtr st); +/* Send a message and wait for reply */ int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply); + virNetMessagePtr msg); + +/* Send a message without needing a reply */ +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg); + +/* Send a message without needing a reply, and don't block on I/O */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg); # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c index 36e2384..cb02a25 100644 --- a/src/rpc/virnetclientprogram.c +++ b/src/rpc/virnetclientprogram.c @@ -327,7 +327,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog, if (virNetMessageEncodePayload(msg, args_filter, args) < 0) goto error; - if (virNetClientSend(client, msg, true) < 0) + if (virNetClientSend(client, msg) < 0) goto error; /* None of these 3 should ever happen here, because diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 7e2d9ae..309d48d 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -361,8 +361,13 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, wantReply = true; } - if (virNetClientSend(client, msg, wantReply) < 0) - goto error; + if (wantReply) { + if (virNetClientSend(client, msg) < 0) + goto error; + } else { + if (virNetClientSendNoReply(client, msg) < 0) + goto error; + } virNetMessageFree(msg); @@ -407,7 +412,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virMutexUnlock(&st->lock); - ret = virNetClientSend(client, msg, true); + ret = virNetClientSend(client, msg); virMutexLock(&st->lock); virNetMessageFree(msg); -- 1.7.6.4

On Tue, Nov 08, 2011 at 18:20:02 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter.
Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O.
Do I understand it correctly that the main difference between this patch and my patch is that my patch stores the unfinished call separated from the other call while you store it at the beginning of the waitDispatch list? Your solution looks cleaner but I suspect the other minor differences from my patch are actually bugs (more on them later in this email) :-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9..b0ed507 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c ... @@ -924,6 +954,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0;
+ /* If we're a non-blocking call, then we don't + * want to wait for I/O readyness + */ + if (thiscall->nonBlock) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0;
@@ -975,8 +1011,34 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } else if (ret == 0 && thiscall->nonBlock) { + if (thiscall->msg->bufferOffset == 0) {
Unfortunately, this condition won't work with SASL since it encodes some data first time we call virNetClientIOWriteMessage but returns 0 and requires us to call it repeatedly until it returns bufferLength, which means all data were sent. Thus, thiscall->msg->bufferOffset == 0 doesn't mean we did not send any data.
+ /* No data sent at all, remove ourselves from the list */ + tmp = client->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp == thiscall) { + if (prev) { + prev->next = thiscall->next; + } else { + client->waitDispatch = thiscall->next; + } + break; + } + prev = tmp; + tmp = tmp->next; + } + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 0; + } else { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 1; /* partial send */ + } + }
if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); ... @@ -1117,20 +1196,27 @@ static int virNetClientIO(virNetClientPtr client, thiscall->msg->bufferLength, client->waitDispatch);
+ /* Trivially detect blocking if someone else has the buck already */ + if (client->haveTheBuck && + thiscall->nonBlock) + return 0;
Hmm, I don't think this is correct. We actually want nonBlock call to be send while another thread has the buck and is waiting for its call to finish. The call may take a long time to complete and we need to send keepalive requests/responses during that time.
+ + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + client->waitDispatch = thiscall; + /* Check to see if another thread is dispatching */ - if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + if (client->haveTheBuck) { char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall;
/* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { + /* Something went wrong, so we need to remove that call we just added */ if (tmp) tmp->next = NULL; else ...
Jirka
participants (2)
-
Daniel P. Berrange
-
Jiri Denemark