From: "Daniel P. Berrange" <berrange(a)redhat.com>
Split the existing virNetClientSend into two parts:
virNetClientSend and virNetClientSendNoReply, instead
of having a 'bool expectReply' parameter.
Add a new virNetClientSendNonBlock which returns 2 on
full send, 1 on partial send, 0 on no send, -1 on error.
If a partial send occurs, then a subsequent call to any
of the virNetClientSend* APIs will finish any outstanding
I/O.
TODO: the virNetClientEvent event handler could be used
to speed up completion of partial sends if an event loop
is present.
* src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c:
Update for changed API
---
src/rpc/virnetclient.c | 249 +++++++++++++++++++++++++++++++++-------
src/rpc/virnetclient.h | 12 ++-
src/rpc/virnetclientprogram.c | 2 +-
src/rpc/virnetclientstream.c | 11 ++-
4 files changed, 224 insertions(+), 50 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 4b7d4a9..b0ed507 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -55,6 +55,8 @@ struct _virNetClientCall {
virNetMessagePtr msg;
bool expectReply;
+ bool nonBlock;
+ bool haveThread;
virCond cond;
@@ -86,8 +88,15 @@ struct _virNetClient {
int wakeupSendFD;
int wakeupReadFD;
- /* List of threads currently waiting for dispatch */
+ /*
+ * List of calls currently waiting for dispatch
+ * The calls should all have threads waiting for
+ * them, except possibly the first call in the list
+ * which might be a partially sent non-blocking call.
+ */
virNetClientCallPtr waitDispatch;
+ /* Whether a thread is dispatching */
+ bool haveTheBuck;
size_t nstreams;
virNetClientStreamPtr *streams;
@@ -555,7 +564,7 @@ virNetClientCallDispatchReply(virNetClientPtr client)
virNetClientCallPtr thecall;
/* Ok, definitely got an RPC reply now find
- out who's been waiting for it */
+ out which waiting call is associated with it */
thecall = client->waitDispatch;
while (thecall &&
!(thecall->msg->header.prog == client->msg.header.prog &&
@@ -896,10 +905,31 @@ virNetClientIOHandleInput(virNetClientPtr client)
}
+static void virNetClientPassTheBuck(virNetClientPtr client)
+{
+ virNetClientCallPtr tmp = client->waitDispatch;
+
+ /* Hand dispatching over ("pass the buck"): signal the first queued
+ * call that still has a thread waiting; skip calls without a thread */
+ while (tmp) {
+ if (tmp->haveThread) {
+ VIR_DEBUG("Passing the buck to %p", tmp);
+ virCondSignal(&tmp->cond);
+ return;
+ }
+ tmp = tmp->next;
+ }
+ VIR_DEBUG("No thread to pass the buck to");
+}
+
+
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
* to someone else.
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientIOEventLoop(virNetClientPtr client,
virNetClientCallPtr thiscall)
@@ -924,6 +954,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
if (virNetSocketHasCachedData(client->sock))
timeout = 0;
+ /* If we're a non-blocking call, then we don't
+ * want to wait for I/O readiness
+ */
+ if (thiscall->nonBlock)
+ timeout = 0;
+
fds[0].events = fds[0].revents = 0;
fds[1].events = fds[1].revents = 0;
@@ -975,8 +1011,34 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
/* If we have existing SASL decoded data, pretend
* the socket became readable so we consume it
*/
- if (virNetSocketHasCachedData(client->sock))
+ if (virNetSocketHasCachedData(client->sock)) {
fds[0].revents |= POLLIN;
+ } else if (ret == 0 && thiscall->nonBlock) {
+ if (thiscall->msg->bufferOffset == 0) {
+ /* No data sent at all, remove ourselves from the list */
+ tmp = client->waitDispatch;
+ prev = NULL;
+ while (tmp) {
+ if (tmp == thiscall) {
+ if (prev) {
+ prev->next = thiscall->next;
+ } else {
+ client->waitDispatch = thiscall->next;
+ }
+ break;
+ }
+ prev = tmp;
+ tmp = tmp->next;
+ }
+ VIR_DEBUG("Giving up the buck %p %p", thiscall,
client->waitDispatch);
+ virNetClientPassTheBuck(client);
+ return 0;
+ } else {
+ VIR_DEBUG("Giving up the buck %p %p", thiscall,
client->waitDispatch);
+ virNetClientPassTheBuck(client);
+ return 1; /* partial send */
+ }
+ }
if (fds[1].revents) {
VIR_DEBUG("Woken up from poll by other thread");
@@ -988,6 +1050,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
}
if (ret < 0) {
+ /* XXX what's this dubious errno check doing ? */
if (errno == EWOULDBLOCK)
continue;
virReportSystemError(errno,
@@ -1005,8 +1068,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
goto error;
}
- /* Iterate through waiting threads and if
- * any are complete then tell 'em to wakeup
+ /* Iterate through waiting calls and if any are
+ * complete, remove them from the dispatch list.
*/
tmp = client->waitDispatch;
prev = NULL;
@@ -1019,13 +1082,25 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
else
client->waitDispatch = tmp->next;
- /* And wake them up....
- * ...they won't actually wakeup until
+ /*
+ * ...if the call being removed from the list
+ * still has a thread, then wake that thread up,
+ * otherwise free the call. The latter should
+ * only happen for calls without replies.
+ *
+ * ...the threads won't actually wakeup until
* we release our mutex a short while
* later...
*/
- VIR_DEBUG("Waking up sleep %p %p", tmp,
client->waitDispatch);
- virCondSignal(&tmp->cond);
+ if (tmp->haveThread) {
+ VIR_DEBUG("Waking up sleep %p %p", tmp,
client->waitDispatch);
+ virCondSignal(&tmp->cond);
+ } else {
+ if (tmp->expectReply)
+ VIR_WARN("Got a call expecting a reply but without a waiting
thread");
+ ignore_value(virCondDestroy(&tmp->cond));
+ VIR_FREE(tmp);
+ }
} else {
prev = tmp;
}
@@ -1039,13 +1114,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
*/
client->waitDispatch = thiscall->next;
VIR_DEBUG("Giving up the buck %p %p", thiscall,
client->waitDispatch);
- /* See if someone else is still waiting
- * and if so, then pass the buck ! */
- if (client->waitDispatch) {
- VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
- virCondSignal(&client->waitDispatch->cond);
- }
- return 0;
+ virNetClientPassTheBuck(client);
+ return 2;
}
@@ -1060,16 +1130,21 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
error:
client->waitDispatch = thiscall->next;
VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall,
client->waitDispatch);
- /* See if someone else is still waiting
- * and if so, then pass the buck ! */
- if (client->waitDispatch) {
- VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
- virCondSignal(&client->waitDispatch->cond);
- }
+ virNetClientPassTheBuck(client);
return -1;
}
+static void
+virNetClientUpdateIOCallback(virNetClientPtr client, bool enabled)
+{
+ int events = 0; /* empty event mask when @enabled is false */
+ if (enabled) {
+ events |= VIR_EVENT_HANDLE_READABLE; /* only readability is requested */
+ }
+ virNetSocketUpdateIOCallback(client->sock, events); /* apply mask to the client's socket */
+}
+
/*
* This function sends a message to remote server and awaits a reply
*
@@ -1102,11 +1177,15 @@ error:
* nation are blamed on another, providing an opportunity for war."
*
* NB(5) Don't Panic!
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientIO(virNetClientPtr client,
virNetClientCallPtr thiscall)
{
int rv = -1;
+ virNetClientCallPtr tmp;
VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d
length=%zu dispatch=%p",
thiscall->msg->header.prog,
@@ -1117,20 +1196,27 @@ static int virNetClientIO(virNetClientPtr client,
thiscall->msg->bufferLength,
client->waitDispatch);
+ /* Trivially detect blocking if someone else has the buck already */
+ if (client->haveTheBuck &&
+ thiscall->nonBlock)
+ return 0;
+
+ /* Stick ourselves on the end of the wait queue */
+ tmp = client->waitDispatch;
+ while (tmp && tmp->next)
+ tmp = tmp->next;
+ if (tmp)
+ tmp->next = thiscall;
+ else
+ client->waitDispatch = thiscall;
+
/* Check to see if another thread is dispatching */
- if (client->waitDispatch) {
- /* Stick ourselves on the end of the wait queue */
- virNetClientCallPtr tmp = client->waitDispatch;
+ if (client->haveTheBuck) {
char ignore = 1;
- while (tmp && tmp->next)
- tmp = tmp->next;
- if (tmp)
- tmp->next = thiscall;
- else
- client->waitDispatch = thiscall;
/* Force other thread to wakeup from poll */
if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) !=
sizeof(ignore)) {
+ /* Something went wrong, so we need to remove that call we just added */
if (tmp)
tmp->next = NULL;
else
@@ -1143,6 +1229,7 @@ static int virNetClientIO(virNetClientPtr client,
VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
/* Go to sleep while other thread is working... */
if (virCondWait(&thiscall->cond, &client->lock) < 0) {
+ /* Something went wrong, so we need to remove that call we previously added
*/
if (client->waitDispatch == thiscall) {
client->waitDispatch = thiscall->next;
} else {
@@ -1167,7 +1254,7 @@ static int virNetClientIO(virNetClientPtr client,
* our reply
*/
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- rv = 0;
+ rv = 2;
/*
* We avoided catching the buck and our reply is ready !
* We've already had 'thiscall' removed from the list
@@ -1177,12 +1264,10 @@ static int virNetClientIO(virNetClientPtr client,
}
/* Grr, someone passed the buck onto us ... */
-
- } else {
- /* We're first to catch the buck */
- client->waitDispatch = thiscall;
}
+ client->haveTheBuck = true;
+
VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);
/*
* The buck stops here!
@@ -1198,17 +1283,19 @@ static int virNetClientIO(virNetClientPtr client,
* cause the event loop thread to be blocked on the
* mutex for the duration of the call
*/
- virNetSocketUpdateIOCallback(client->sock, 0);
+ virNetClientUpdateIOCallback(client, false);
virResetLastError();
rv = virNetClientIOEventLoop(client, thiscall);
- virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE);
+ virNetClientUpdateIOCallback(client, true);
if (rv == 0 &&
virGetLastError())
rv = -1;
+ client->haveTheBuck = false;
+
cleanup:
VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch,
thiscall, rv);
return rv;
@@ -1227,7 +1314,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
goto done;
/* This should be impossible, but it doesn't hurt to check */
- if (client->waitDispatch)
+ if (client->haveTheBuck)
goto done;
VIR_DEBUG("Event fired %p %d", sock, events);
@@ -1249,9 +1336,14 @@ done:
}
-int virNetClientSend(virNetClientPtr client,
- virNetMessagePtr msg,
- bool expectReply)
+/*
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ */
+static int virNetClientSendInternal(virNetClientPtr client,
+ virNetMessagePtr msg,
+ bool expectReply,
+ bool nonBlock)
{
virNetClientCallPtr call;
int ret = -1;
@@ -1270,6 +1362,12 @@ int virNetClientSend(virNetClientPtr client,
return -1;
}
+ if (expectReply && nonBlock) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Attempt to send an non-blocking message with a synchronous
reply"));
+ return -1;
+ }
+
if (VIR_ALLOC(call) < 0) {
virReportOOMError();
return -1;
@@ -1290,12 +1388,75 @@ int virNetClientSend(virNetClientPtr client,
call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
call->msg = msg;
call->expectReply = expectReply;
+ call->nonBlock = nonBlock;
+ call->haveThread = true;
ret = virNetClientIO(client, call);
cleanup:
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call);
+ /* If partially sent, then the call is still on the dispatch queue */
+ if (ret == 1) {
+ call->haveThread = false;
+ } else {
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call);
+ }
virNetClientUnlock(client);
return ret;
}
+
+
+/*
+ * @client: the client on which to send
+ * @msg: a message allocated on heap or stack
+ *
+ * Send a message synchronously, and wait for the reply synchronously
+ * (virNetClientSendInternal with expectReply=true, nonBlock=false).
+ * The caller is responsible for freeing @msg if it was heap-allocated.
+ *
+ * Returns 0 on success (any non-negative internal status), -1 on failure
+ */
+int virNetClientSend(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ int ret = virNetClientSendInternal(client, msg, true, false);
+ if (ret < 0)
+ return -1;
+ return 0;
+}
+
+
+/*
+ * @client: the client on which to send
+ * @msg: a message allocated on heap or stack
+ *
+ * Send a message synchronously, without expecting any reply
+ * (virNetClientSendInternal with expectReply=false, nonBlock=false).
+ * The caller is responsible for freeing @msg if it was heap-allocated.
+ *
+ * Returns 0 on success (any non-negative internal status), -1 on failure
+ */
+int virNetClientSendNoReply(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ int ret = virNetClientSendInternal(client, msg, false, false);
+ if (ret < 0)
+ return -1;
+ return 0;
+}
+
+/*
+ * @msg: a message allocated on the heap.
+ *
+ * Send a message without waiting on I/O and without expecting a reply.
+ * The caller is responsible for freeing @msg, *except* if this method
+ * returns -1. NOTE(review): after a partial send (1) the queued call
+ * still points at @msg -- confirm the caller must keep it alive.
+ *
+ * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
+ */
+int virNetClientSendNonBlock(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ return virNetClientSendInternal(client, msg, false, true);
+}
diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h
index fb679e8..d3c112a 100644
--- a/src/rpc/virnetclient.h
+++ b/src/rpc/virnetclient.h
@@ -67,9 +67,17 @@ int virNetClientAddStream(virNetClientPtr client,
void virNetClientRemoveStream(virNetClientPtr client,
virNetClientStreamPtr st);
+/* Send a message and wait for reply */
int virNetClientSend(virNetClientPtr client,
- virNetMessagePtr msg,
- bool expectReply);
+ virNetMessagePtr msg);
+
+/* Send a message without needing a reply */
+int virNetClientSendNoReply(virNetClientPtr client,
+ virNetMessagePtr msg);
+
+/* Send a message without needing a reply, and don't block on I/O */
+int virNetClientSendNonBlock(virNetClientPtr client,
+ virNetMessagePtr msg);
# ifdef HAVE_SASL
void virNetClientSetSASLSession(virNetClientPtr client,
diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c
index 36e2384..cb02a25 100644
--- a/src/rpc/virnetclientprogram.c
+++ b/src/rpc/virnetclientprogram.c
@@ -327,7 +327,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog,
if (virNetMessageEncodePayload(msg, args_filter, args) < 0)
goto error;
- if (virNetClientSend(client, msg, true) < 0)
+ if (virNetClientSend(client, msg) < 0)
goto error;
/* None of these 3 should ever happen here, because
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 7e2d9ae..309d48d 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -361,8 +361,13 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
wantReply = true;
}
- if (virNetClientSend(client, msg, wantReply) < 0)
- goto error;
+ if (wantReply) {
+ if (virNetClientSend(client, msg) < 0)
+ goto error;
+ } else {
+ if (virNetClientSendNoReply(client, msg) < 0)
+ goto error;
+ }
virNetMessageFree(msg);
@@ -407,7 +412,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("Dummy packet to wait for stream data");
virMutexUnlock(&st->lock);
- ret = virNetClientSend(client, msg, true);
+ ret = virNetClientSend(client, msg);
virMutexLock(&st->lock);
virNetMessageFree(msg);
--
1.7.6.4