[libvirt] [PATCH 0/6] Handle non-blocking I/O in client send

This is an update to https://www.redhat.com/archives/libvir-list/2011-November/msg00339.html - I pulled out a lot of the generic cleanup to make the last hard bit easier to review.

From: "Daniel P. Berrange" <berrange@redhat.com> Directly messing around with the linked list is potentially dangerous. Introduce some helper APIs to deal with list manipulating the list * src/rpc/virnetclient.c: Create linked list handlers --- src/rpc/virnetclient.c | 207 +++++++++++++++++++++++++++++++++--------------- 1 files changed, 144 insertions(+), 63 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9..463dc5a 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -110,6 +110,97 @@ static void virNetClientIncomingEvent(virNetSocketPtr sock, int events, void *opaque); +/* Append a call to the end of the list */ +static void virNetClientCallQueue(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + while (tmp && tmp->next) { + tmp = tmp->next; + } + if (tmp) + tmp->next = call; + else + *head = call; + call->next = NULL; +} + +#if 0 +/* Obtain a call from the head of the list */ +static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head) +{ + virNetClientCallPtr tmp = *head; + if (tmp) + *head = tmp->next; + else + *head = NULL; + tmp->next = NULL; + return tmp; +} +#endif + +/* Remove a call from anywhere in the list */ +static void virNetClientCallRemove(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { + if (tmp == call) { + if (prev) + prev->next = tmp->next; + else + *head = tmp->next; + tmp->next = NULL; + return; + } + prev = tmp; + tmp = tmp->next; + } +} + +/* Predicate returns true if matches */ +typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque); + +/* Remove a list of calls from the list based on a predicate */ +static void virNetClientCallRemovePredicate(virNetClientCallPtr *head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { 
+ virNetClientCallPtr next = tmp->next; + tmp->next = NULL; /* Temp unlink */ + if (pred(tmp, opaque)) { + if (prev) + prev->next = next; + else + *head = next; + } else { + tmp->next = next; /* Reverse temp unlink */ + prev = tmp; + } + tmp = next; + } +} + +/* Returns true if the predicate matches at least one call in the list */ +static bool virNetClientCallMatchPredicate(virNetClientCallPtr head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = head; + while (tmp) { + if (pred(tmp, opaque)) { + return true; + } + tmp = tmp->next; + } + return false; +} + + static void virNetClientEventFree(void *opaque) { virNetClientPtr client = opaque; @@ -896,6 +987,42 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, + void *opaque) +{ + struct pollfd *fd = opaque; + + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) + fd->events |= POLLIN; + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) + fd->events |= POLLOUT; + + return true; +} + + +static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) + return false; + + /* + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + VIR_DEBUG("Waking up sleeping call %p", call); + virCondSignal(&call->cond); + + return true; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. 
Then quit and pass the buck @@ -911,8 +1038,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].fd = client->wakeupReadFD; for (;;) { - virNetClientCallPtr tmp = client->waitDispatch; - virNetClientCallPtr prev; char ignore; sigset_t oldmask, blockedsigs; int timeout = -1; @@ -928,14 +1053,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0; fds[1].events = POLLIN; - while (tmp) { - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) - fds[0].events |= POLLIN; - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - fds[0].events |= POLLOUT; - tmp = tmp->next; - } + /* Calculate poll events for calls */ + virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopPollEvents, + &fds[0]); /* We have to be prepared to receive stream data * regardless of whether any of the calls waiting @@ -1008,37 +1130,16 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* Iterate through waiting threads and if * any are complete then tell 'em to wakeup */ - tmp = client->waitDispatch; - prev = NULL; - while (tmp) { - if (tmp != thiscall && - tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* Take them out of the list */ - if (prev) - prev->next = tmp->next; - else - client->waitDispatch = tmp->next; - - /* And wake them up.... - * ...they won't actually wakeup until - * we release our mutex a short while - * later... 
- */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); - } else { - prev = tmp; - } - tmp = tmp->next; - } + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveDone, + thiscall); /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* We're at head of the list already, so - * remove us - */ - client->waitDispatch = thiscall->next; + virNetClientCallRemove(&client->waitDispatch, thiscall); + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + /* See if someone else is still waiting * and if so, then pass the buck ! */ if (client->waitDispatch) { @@ -1058,7 +1159,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, error: - client->waitDispatch = thiscall->next; + virNetClientCallRemove(&client->waitDispatch, thiscall); VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); /* See if someone else is still waiting * and if so, then pass the buck ! 
*/ @@ -1119,22 +1220,13 @@ static int virNetClientIO(virNetClientPtr client, /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; + /* Stick ourselves on the end of the wait queue */ + virNetClientCallQueue(&client->waitDispatch, thiscall); /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = NULL; + virNetClientCallRemove(&client->waitDispatch, thiscall); virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; @@ -1143,17 +1235,7 @@ static int virNetClientIO(virNetClientPtr client, VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { - if (client->waitDispatch == thiscall) { - client->waitDispatch = thiscall->next; - } else { - tmp = client->waitDispatch; - while (tmp && tmp->next && - tmp->next != thiscall) { - tmp = tmp->next; - } - if (tmp && tmp->next == thiscall) - tmp->next = thiscall->next; - } + virNetClientCallRemove(&client->waitDispatch, thiscall); virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("failed to wait on condition")); return -1; @@ -1177,10 +1259,9 @@ static int virNetClientIO(virNetClientPtr client, } /* Grr, someone passed the buck onto us ... */ - } else { - /* We're first to catch the buck */ - client->waitDispatch = thiscall; + /* We're the first to arrive */ + virNetClientCallQueue(&client->waitDispatch, thiscall); } VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); -- 1.7.6.4

On Fri, Nov 11, 2011 at 16:21:59 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Directly messing around with the linked list is potentially dangerous. Introduce some helper APIs to deal with list manipulating the list
* src/rpc/virnetclient.c: Create linked list handlers --- src/rpc/virnetclient.c | 207 +++++++++++++++++++++++++++++++++--------------- 1 files changed, 144 insertions(+), 63 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9..463dc5a 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -110,6 +110,97 @@ static void virNetClientIncomingEvent(virNetSocketPtr sock, int events, void *opaque);
+/* Append a call to the end of the list */ +static void virNetClientCallQueue(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + while (tmp && tmp->next) { + tmp = tmp->next; + } + if (tmp) + tmp->next = call; + else + *head = call; + call->next = NULL; +} + +#if 0 +/* Obtain a call from the head of the list */ +static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head) +{ + virNetClientCallPtr tmp = *head; + if (tmp) + *head = tmp->next; + else + *head = NULL; + tmp->next = NULL; + return tmp; +} +#endif + +/* Remove a call from anywhere in the list */ +static void virNetClientCallRemove(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { + if (tmp == call) { + if (prev) + prev->next = tmp->next; + else + *head = tmp->next; + tmp->next = NULL; + return; + } + prev = tmp; + tmp = tmp->next; + } +} + +/* Predicate returns true if matches */ +typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque); + +/* Remove a list of calls from the list based on a predicate */ +static void virNetClientCallRemovePredicate(virNetClientCallPtr *head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { + virNetClientCallPtr next = tmp->next; + tmp->next = NULL; /* Temp unlink */ + if (pred(tmp, opaque)) { + if (prev) + prev->next = next; + else + *head = next; + } else { + tmp->next = next; /* Reverse temp unlink */ + prev = tmp; + } + tmp = next; + } +} + +/* Returns true if the predicate matches at least one call in the list */ +static bool virNetClientCallMatchPredicate(virNetClientCallPtr head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = head; + while (tmp) { + if (pred(tmp, opaque)) { + return true; + } + tmp = tmp->next; + } + return false; +} + + static void 
virNetClientEventFree(void *opaque) { virNetClientPtr client = opaque; @@ -896,6 +987,42 @@ virNetClientIOHandleInput(virNetClientPtr client) }
+static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, + void *opaque) +{ + struct pollfd *fd = opaque; + + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) + fd->events |= POLLIN; + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) + fd->events |= POLLOUT; + + return true; +}
This should return false; otherwise virNetClientCallMatchPredicate stops at the first match and we only set fd->events according to the first call in the queue. We need to go through all calls.
+ + +static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) + return false; + + /* + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + VIR_DEBUG("Waking up sleeping call %p", call); + virCondSignal(&call->cond); + + return true; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck @@ -911,8 +1038,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].fd = client->wakeupReadFD;
for (;;) { - virNetClientCallPtr tmp = client->waitDispatch; - virNetClientCallPtr prev; char ignore; sigset_t oldmask, blockedsigs; int timeout = -1; @@ -928,14 +1053,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0;
fds[1].events = POLLIN; - while (tmp) { - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) - fds[0].events |= POLLIN; - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - fds[0].events |= POLLOUT;
- tmp = tmp->next; - } + /* Calculate poll events for calls */ + virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopPollEvents, + &fds[0]);
/* We have to be prepared to receive stream data * regardless of whether any of the calls waiting ...
And we should avoid lines longer than 80 characters, which is mostly not a problem in this patch, but some of the other patches in this series don't follow this rule (mainly in function prototypes). ACK with the small issue fixed. Jirka

From: "Daniel P. Berrange" <berrange@redhat.com> Instead of inferring whether the buck is held from the waitDispatch pointer, use an explicit 'bool haveTheBuck' field * src/rpc/virnetclient.c: Explicitly track the buck --- src/rpc/virnetclient.c | 18 +++++++++++------- 1 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 463dc5a..73dfc6e 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -88,6 +88,8 @@ struct _virNetClient { /* List of threads currently waiting for dispatch */ virNetClientCallPtr waitDispatch; + /* True if a thread holds the buck */ + bool haveTheBuck; size_t nstreams; virNetClientStreamPtr *streams; @@ -1218,11 +1220,12 @@ static int virNetClientIO(virNetClientPtr client, thiscall->msg->bufferLength, client->waitDispatch); + /* Stick ourselves on the end of the wait queue */ + virNetClientCallQueue(&client->waitDispatch, thiscall); + /* Check to see if another thread is dispatching */ - if (client->waitDispatch) { + if (client->haveTheBuck) { char ignore = 1; - /* Stick ourselves on the end of the wait queue */ - virNetClientCallQueue(&client->waitDispatch, thiscall); /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1259,12 +1262,11 @@ static int virNetClientIO(virNetClientPtr client, } /* Grr, someone passed the buck onto us ... */ - } else { - /* We're the first to arrive */ - virNetClientCallQueue(&client->waitDispatch, thiscall); } VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); + client->haveTheBuck = true; + /* * The buck stops here! 
* @@ -1290,6 +1292,8 @@ static int virNetClientIO(virNetClientPtr client, virGetLastError()) rv = -1; + client->haveTheBuck = false; + cleanup: VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv); return rv; @@ -1308,7 +1312,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->haveTheBuck) goto done; VIR_DEBUG("Event fired %p %d", sock, events); -- 1.7.6.4

On Fri, Nov 11, 2011 at 16:22:00 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Instead of inferring whether the buck is held from the waitDispatch pointer, use an explicit 'bool haveTheBuck' field
* src/rpc/virnetclient.c: Explicitly track the buck --- src/rpc/virnetclient.c | 18 +++++++++++------- 1 files changed, 11 insertions(+), 7 deletions(-)
ACK Jirka

From: "Daniel P. Berrange" <berrange@redhat.com> Remove some duplication by pulling the code for passing the buck out into a helper method * src/rpc/virnetclient.c: Introduce virNetClientIOEventLoopPassTheBuck --- src/rpc/virnetclient.c | 35 +++++++++++++++++++---------------- 1 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 73dfc6e..1f46965 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1025,6 +1025,23 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, return true; } + +static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) +{ + VIR_DEBUG("Giving up the buck %p", thiscall); + virNetClientCallPtr tmp = client->waitDispatch; + /* See if someone else is still waiting + * and if so, then pass the buck ! */ + while (tmp) { + if (tmp != thiscall) { + VIR_DEBUG("Passing the buck to %p", tmp); + virCondSignal(&tmp->cond); + break; + } + tmp = tmp->next; + } +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck @@ -1139,15 +1156,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { virNetClientCallRemove(&client->waitDispatch, thiscall); - - VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - - /* See if someone else is still waiting - * and if so, then pass the buck ! 
*/ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } + virNetClientIOEventLoopPassTheBuck(client, thiscall); return 0; } @@ -1162,13 +1171,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, error: virNetClientCallRemove(&client->waitDispatch, thiscall); - VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } + virNetClientIOEventLoopPassTheBuck(client, thiscall); return -1; } -- 1.7.6.4

On Fri, Nov 11, 2011 at 16:22:01 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Remove some duplication by pulling the code for passing the buck out into a helper method
* src/rpc/virnetclient.c: Introduce virNetClientIOEventLoopPassTheBuck --- src/rpc/virnetclient.c | 35 +++++++++++++++++++---------------- 1 files changed, 19 insertions(+), 16 deletions(-)
ACK Jirka

From: "Daniel P. Berrange" <berrange@redhat.com> Stop multiplexing virNetClientSend for two different purposes, instead add virNetClientSendWithReply and virNetClientSendNoReply * src/rpc/virnetclient.c, src/rpc/virnetclient.h: Replace virNetClientSend with virNetClientSendWithReply and virNetClientSendNoReply * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for new API names --- src/rpc/virnetclient.c | 45 ++++++++++++++++++++++++++++++++++++++-- src/rpc/virnetclient.h | 8 ++++-- src/rpc/virnetclientprogram.c | 2 +- src/rpc/virnetclientstream.c | 13 ++++++----- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 1f46965..c4136b4 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1337,9 +1337,9 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) { virNetClientCallPtr call; int ret = -1; @@ -1387,3 +1387,42 @@ cleanup: virNetClientUnlock(client); return ret; } + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, and wait for the reply synchronously + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSendWithReply(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, true); + if (ret < 0) + return -1; + return 0; +} + + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, without any reply + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, false); + if (ret < 0) + return -1; + 
return 0; +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index fb679e8..eef3eb3 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -67,9 +67,11 @@ int virNetClientAddStream(virNetClientPtr client, void virNetClientRemoveStream(virNetClientPtr client, virNetClientStreamPtr st); -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply); +int virNetClientSendWithReply(virNetClientPtr client, + virNetMessagePtr msg); + +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg); # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c index 36e2384..e1e8846 100644 --- a/src/rpc/virnetclientprogram.c +++ b/src/rpc/virnetclientprogram.c @@ -327,7 +327,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog, if (virNetMessageEncodePayload(msg, args_filter, args) < 0) goto error; - if (virNetClientSend(client, msg, true) < 0) + if (virNetClientSendWithReply(client, msg) < 0) goto error; /* None of these 3 should ever happen here, because diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 7e2d9ae..a4292e7 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -328,7 +328,6 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, size_t nbytes) { virNetMessagePtr msg; - bool wantReply; VIR_DEBUG("st=%p status=%d data=%p nbytes=%zu", st, status, data, nbytes); if (!(msg = virNetMessageNew(false))) @@ -354,15 +353,17 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, if (status == VIR_NET_CONTINUE) { if (virNetMessageEncodePayloadRaw(msg, data, nbytes) < 0) goto error; - wantReply = false; + + if (virNetClientSendNoReply(client, msg) < 0) + goto error; } else { if (virNetMessageEncodePayloadRaw(msg, NULL, 0) < 0) goto error; - wantReply = true; + + if (virNetClientSendWithReply(client, msg) < 0) + goto error; } - if 
(virNetClientSend(client, msg, wantReply) < 0) - goto error; virNetMessageFree(msg); @@ -407,7 +408,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virMutexUnlock(&st->lock); - ret = virNetClientSend(client, msg, true); + ret = virNetClientSendWithReply(client, msg); virMutexLock(&st->lock); virNetMessageFree(msg); -- 1.7.6.4

On Fri, Nov 11, 2011 at 16:22:02 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Stop multiplexing virNetClientSend for two different purposes, instead add virNetClientSendWithReply and virNetClientSendNoReply
* src/rpc/virnetclient.c, src/rpc/virnetclient.h: Replace virNetClientSend with virNetClientSendWithReply and virNetClientSendNoReply * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for new API names --- src/rpc/virnetclient.c | 45 ++++++++++++++++++++++++++++++++++++++-- src/rpc/virnetclient.h | 8 ++++-- src/rpc/virnetclientprogram.c | 2 +- src/rpc/virnetclientstream.c | 13 ++++++----- 4 files changed, 55 insertions(+), 13 deletions(-)
ACK Jirka

From: "Daniel P. Berrange" <berrange@redhat.com> * src/rpc/virnetclient.c: Add helper for setting I/O callback events --- src/rpc/virnetclient.c | 15 +++++++++++++-- 1 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index c4136b4..96d1886 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1176,6 +1176,17 @@ error: } +static void virNetClientIOUpdateCallback(virNetClientPtr client, + bool enableCallback) +{ + int events = 0; + if (enableCallback) + events |= VIR_EVENT_HANDLE_READABLE; + + virNetSocketUpdateIOCallback(client->sock, events); +} + + /* * This function sends a message to remote server and awaits a reply * @@ -1284,12 +1295,12 @@ static int virNetClientIO(virNetClientPtr client, * cause the event loop thread to be blocked on the * mutex for the duration of the call */ - virNetSocketUpdateIOCallback(client->sock, 0); + virNetClientIOUpdateCallback(client, false); virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + virNetClientIOUpdateCallback(client, true); if (rv == 0 && virGetLastError()) -- 1.7.6.4

On Fri, Nov 11, 2011 at 16:22:03 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
* src/rpc/virnetclient.c: Add helper for setting I/O callback events --- src/rpc/virnetclient.c | 15 +++++++++++++-- 1 files changed, 13 insertions(+), 2 deletions(-)
ACK Jirka

From: "Daniel P. Berrange" <berrange@redhat.com> Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter. Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O. TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present. * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API --- src/rpc/virnetclient.c | 197 +++++++++++++++++++++++++++++++++++++++++++----- src/rpc/virnetclient.h | 4 + src/rpc/virnetsocket.c | 13 +++ src/rpc/virnetsocket.h | 1 + 4 files changed, 197 insertions(+), 18 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 96d1886..9891f55 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,9 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; + bool sentSomeData; virCond cond; @@ -86,7 +89,12 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; - /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. 
+ */ virNetClientCallPtr waitDispatch; /* True if a thread holds the buck */ bool haveTheBuck; @@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall; /* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); + if (ret || virNetSocketHasPendingData(client->sock)) + thecall->sentSomeData = true; if (ret <= 0) return ret; @@ -1015,17 +1025,66 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, return false; /* - * ...they won't actually wakeup until + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... */ - VIR_DEBUG("Waking up sleeping call %p", call); - virCondSignal(&call->cond); + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } return true; } +static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (!call->nonBlock) + return false; + + /* + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. 
+ * + * ...the threads won't actually wakeup until + * we release our mutex a short while + * later... + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + return true; + } else if (!call->sentSomeData) { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + return true; + } + + return false; +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1033,19 +1092,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli /* See if someone else is still waiting * and if so, then pass the buck ! */ while (tmp) { - if (tmp != thiscall) { + if (tmp != thiscall && tmp->haveThread) { VIR_DEBUG("Passing the buck to %p", tmp); virCondSignal(&tmp->cond); break; } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); +} + + +static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +{ + return call->nonBlock; } /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. 
+ * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1068,6 +1137,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0; + /* If there are any non-blocking calls in the queue, + * then we don't want to sleep in poll() + */ + if (virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopWantNonBlock, + NULL)) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0; @@ -1116,8 +1193,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -1129,6 +1207,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } if (ret < 0) { + /* XXX what's this dubious errno check doing ? */ if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -1146,20 +1225,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall); + /* Iterate through waiting calls and if any are + * non-blocking, remove them from the dispatch list... 
+ */ + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveNonBlocking, + thiscall); + /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); - return 0; + return 2; } + /* We're not done, but we're non-blocking */ + if (thiscall->nonBlock) { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientIOEventLoopPassTheBuck(client, thiscall); + return thiscall->sentSomeData ? 1 : 0; + } if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -1218,7 +1310,31 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client, * a strategy in power politics when the actions of one country/ * nation are blamed on another, providing an opportunity for war." * - * NB(5) Don't Panic! + * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller + * must *NOT* free it, if this returns '1' (ie partial send). + * + * NB(6) The following input states are valid if *no* threads + * are currently executing this method + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * The following input states are valid, if n threads are currently + * executing + * + * - waitDispatch != NULL + * - 0 or 1 waitDispatch.nonBlock == false, without any threads + * - 0 or more waitDispatch.nonBlock == false, with threads + * + * The following output states are valid when all threads are done + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * NB(7) Don't Panic! 
+ * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIO(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1259,14 +1375,15 @@ static int virNetClientIO(virNetClientPtr client, } VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); - /* Two reasons we can be woken up + /* Three reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to * be the dispatcher to finish waiting for * our reply + * 3. I/O was expected to block */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - rv = 0; + rv = 2; /* * We avoided catching the buck and our reply is ready ! * We've already had 'thiscall' removed from the list @@ -1275,6 +1392,15 @@ static int virNetClientIO(virNetClientPtr client, goto cleanup; } + /* If we're non-blocking, get outta here */ + if (thiscall->nonBlock) { + if (thiscall->sentSomeData) + rv = 1; /* In progress */ + else + rv = 0; /* none at all */ + goto cleanup; + } + /* Grr, someone passed the buck onto us ... 
*/ } @@ -1348,9 +1474,14 @@ done: } +/* + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error + */ static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, - bool expectReply) + bool expectReply, + bool nonBlock) { virNetClientCallPtr call; int ret = -1; @@ -1369,6 +1500,12 @@ static int virNetClientSendInternal(virNetClientPtr client, return -1; } + if (expectReply && nonBlock) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Attempt to send an non-blocking message with a synchronous reply")); + return -1; + } + if (VIR_ALLOC(call) < 0) { virReportOOMError(); return -1; @@ -1389,16 +1526,24 @@ static int virNetClientSendInternal(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->nonBlock = nonBlock; + call->haveThread = true; ret = virNetClientIO(client, call); cleanup: - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + /* If partially sent, then the call is still on the dispatch queue */ + if (ret == 1) { + call->haveThread = false; + } else { + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + /* * @msg: a message allocated on heap or stack * @@ -1412,7 +1557,7 @@ cleanup: int virNetClientSendWithReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, true); + int ret = virNetClientSendInternal(client, msg, true, false); if (ret < 0) return -1; return 0; @@ -1432,8 +1577,24 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, false); + int ret = virNetClientSendInternal(client, msg, false, false); if (ret < 0) return -1; return 0; } + +/* + * @msg: a message allocated on the heap. 
+ * + * Send a message asynchronously, without any reply + * + * The caller is responsible for free'ing @msg, *except* if + * this method returns -1. + * + * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error + */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(client, msg, false, true); +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index eef3eb3..71db543 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -73,6 +73,10 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg); +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg); + + # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, virNetSASLSessionPtr sasl); diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c index 30b8fe6..2449353 100644 --- a/src/rpc/virnetsocket.c +++ b/src/rpc/virnetsocket.c @@ -931,6 +931,19 @@ bool virNetSocketHasCachedData(virNetSocketPtr sock ATTRIBUTE_UNUSED) } +bool virNetSocketHasPendingData(virNetSocketPtr sock ATTRIBUTE_UNUSED) +{ + bool hasPending = false; + virMutexLock(&sock->lock); +#if HAVE_SASL + if (sock->saslEncoded) + hasPending = true; +#endif + virMutexUnlock(&sock->lock); + return hasPending; +} + + static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len) { char *errout = NULL; diff --git a/src/rpc/virnetsocket.h b/src/rpc/virnetsocket.h index e444aef..508dd47 100644 --- a/src/rpc/virnetsocket.h +++ b/src/rpc/virnetsocket.h @@ -106,6 +106,7 @@ void virNetSocketSetSASLSession(virNetSocketPtr sock, virNetSASLSessionPtr sess); # endif bool virNetSocketHasCachedData(virNetSocketPtr sock); +bool virNetSocketHasPendingData(virNetSocketPtr sock); void virNetSocketRef(virNetSocketPtr sock); void virNetSocketFree(virNetSocketPtr sock); -- 1.7.6.4

From: "Daniel P. Berrange" <berrange@redhat.com> Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter. Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O. TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present. In v2: - Fix logic in virNetClientIOEventLoopRemoveNonBlocking * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API --- src/rpc/virnetclient.c | 200 +++++++++++++++++++++++++++++++++++++++++++----- src/rpc/virnetclient.h | 4 + src/rpc/virnetsocket.c | 13 +++ src/rpc/virnetsocket.h | 1 + 4 files changed, 200 insertions(+), 18 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 96d1886..17105fe 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,9 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; + bool sentSomeData; virCond cond; @@ -86,7 +89,12 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; - /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. 
+ */ virNetClientCallPtr waitDispatch; /* True if a thread holds the buck */ bool haveTheBuck; @@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall; /* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); + if (ret || virNetSocketHasPendingData(client->sock)) + thecall->sentSomeData = true; if (ret <= 0) return ret; @@ -1015,17 +1025,69 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, return false; /* - * ...they won't actually wakeup until + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... 
*/ - VIR_DEBUG("Waking up sleeping call %p", call); - virCondSignal(&call->cond); + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } return true; } +static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (!call->nonBlock) + return false; + + if (call->sentSomeData) { + /* + * If some data has been sent we must keep it in the list, + * but still wakeup any thread + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } + return false; + } else { + /* + * If no data has been sent, we can remove it from the list. + * Wakup any thread, otherwise free the caller ourselves + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } + return true; + } +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1033,19 +1095,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli /* See if someone else is still waiting * and if so, then pass the buck ! 
*/ while (tmp) { - if (tmp != thiscall) { + if (tmp != thiscall && tmp->haveThread) { VIR_DEBUG("Passing the buck to %p", tmp); virCondSignal(&tmp->cond); break; } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); +} + + +static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +{ + return call->nonBlock; } /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1068,6 +1140,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0; + /* If there are any non-blocking calls in the queue, + * then we don't want to sleep in poll() + */ + if (virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopWantNonBlock, + NULL)) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0; @@ -1116,8 +1196,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -1129,6 +1210,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } if (ret < 0) { + /* XXX what's this dubious errno check doing ? 
*/ if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -1146,20 +1228,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall); + /* Iterate through waiting calls and if any are + * non-blocking, remove them from the dispatch list... + */ + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveNonBlocking, + thiscall); + /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); - return 0; + return 2; } + /* We're not done, but we're non-blocking */ + if (thiscall->nonBlock) { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientIOEventLoopPassTheBuck(client, thiscall); + return thiscall->sentSomeData ? 1 : 0; + } if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -1218,7 +1313,31 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client, * a strategy in power politics when the actions of one country/ * nation are blamed on another, providing an opportunity for war." * - * NB(5) Don't Panic! + * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller + * must *NOT* free it, if this returns '1' (ie partial send). 
+ * + * NB(6) The following input states are valid if *no* threads + * are currently executing this method + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * The following input states are valid, if n threads are currently + * executing + * + * - waitDispatch != NULL + * - 0 or 1 waitDispatch.nonBlock == false, without any threads + * - 0 or more waitDispatch.nonBlock == false, with threads + * + * The following output states are valid when all threads are done + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * NB(7) Don't Panic! + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIO(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1259,14 +1378,15 @@ static int virNetClientIO(virNetClientPtr client, } VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); - /* Two reasons we can be woken up + /* Three reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to * be the dispatcher to finish waiting for * our reply + * 3. I/O was expected to block */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - rv = 0; + rv = 2; /* * We avoided catching the buck and our reply is ready ! * We've already had 'thiscall' removed from the list @@ -1275,6 +1395,15 @@ static int virNetClientIO(virNetClientPtr client, goto cleanup; } + /* If we're non-blocking, get outta here */ + if (thiscall->nonBlock) { + if (thiscall->sentSomeData) + rv = 1; /* In progress */ + else + rv = 0; /* none at all */ + goto cleanup; + } + /* Grr, someone passed the buck onto us ... 
*/ } @@ -1348,9 +1477,14 @@ done: } +/* + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error + */ static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, - bool expectReply) + bool expectReply, + bool nonBlock) { virNetClientCallPtr call; int ret = -1; @@ -1369,6 +1503,12 @@ static int virNetClientSendInternal(virNetClientPtr client, return -1; } + if (expectReply && nonBlock) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Attempt to send an non-blocking message with a synchronous reply")); + return -1; + } + if (VIR_ALLOC(call) < 0) { virReportOOMError(); return -1; @@ -1389,16 +1529,24 @@ static int virNetClientSendInternal(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->nonBlock = nonBlock; + call->haveThread = true; ret = virNetClientIO(client, call); cleanup: - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + /* If partially sent, then the call is still on the dispatch queue */ + if (ret == 1) { + call->haveThread = false; + } else { + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + /* * @msg: a message allocated on heap or stack * @@ -1412,7 +1560,7 @@ cleanup: int virNetClientSendWithReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, true); + int ret = virNetClientSendInternal(client, msg, true, false); if (ret < 0) return -1; return 0; @@ -1432,8 +1580,24 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, false); + int ret = virNetClientSendInternal(client, msg, false, false); if (ret < 0) return -1; return 0; } + +/* + * @msg: a message allocated on the heap. 
+ * + * Send a message asynchronously, without any reply + * + * The caller is responsible for free'ing @msg, *except* if + * this method returns -1. + * + * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error + */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(client, msg, false, true); +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index eef3eb3..71db543 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -73,6 +73,10 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg); +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg); + + # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, virNetSASLSessionPtr sasl); diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c index 30b8fe6..2449353 100644 --- a/src/rpc/virnetsocket.c +++ b/src/rpc/virnetsocket.c @@ -931,6 +931,19 @@ bool virNetSocketHasCachedData(virNetSocketPtr sock ATTRIBUTE_UNUSED) } +bool virNetSocketHasPendingData(virNetSocketPtr sock ATTRIBUTE_UNUSED) +{ + bool hasPending = false; + virMutexLock(&sock->lock); +#if HAVE_SASL + if (sock->saslEncoded) + hasPending = true; +#endif + virMutexUnlock(&sock->lock); + return hasPending; +} + + static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len) { char *errout = NULL; diff --git a/src/rpc/virnetsocket.h b/src/rpc/virnetsocket.h index e444aef..508dd47 100644 --- a/src/rpc/virnetsocket.h +++ b/src/rpc/virnetsocket.h @@ -106,6 +106,7 @@ void virNetSocketSetSASLSession(virNetSocketPtr sock, virNetSASLSessionPtr sess); # endif bool virNetSocketHasCachedData(virNetSocketPtr sock); +bool virNetSocketHasPendingData(virNetSocketPtr sock); void virNetSocketRef(virNetSocketPtr sock); void virNetSocketFree(virNetSocketPtr sock); -- 1.7.6.4

On Mon, Nov 14, 2011 at 12:03:52 +0000, Daniel P. Berrange wrote:
From: "Daniel P. Berrange" <berrange@redhat.com>
Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter.
This is done by a separate patch, so remove the paragraph.
Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O.
TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present.
In v2: - Fix logic in virNetClientIOEventLoopRemoveNonBlocking
* src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API
Neither of these is really changed by this patch.
--- src/rpc/virnetclient.c | 200 +++++++++++++++++++++++++++++++++++++++++++----- src/rpc/virnetclient.h | 4 + src/rpc/virnetsocket.c | 13 +++ src/rpc/virnetsocket.h | 1 + 4 files changed, 200 insertions(+), 18 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 96d1886..17105fe 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,9 @@ struct _virNetClientCall {
virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; + bool sentSomeData;
virCond cond;
@@ -86,7 +89,12 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD;
- /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. + */ virNetClientCallPtr waitDispatch; /* True if a thread holds the buck */ bool haveTheBuck; @@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall;
/* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); + if (ret || virNetSocketHasPendingData(client->sock)) + thecall->sentSomeData = true;
Should the above condition check for just ret > 0? It won't make any difference in practice, since we are in an error condition anyway, but it seems more logical.
if (ret <= 0) return ret;
@@ -1015,17 +1025,69 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, return false;
/* - * ...they won't actually wakeup until + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... */ - VIR_DEBUG("Waking up sleeping call %p", call); - virCondSignal(&call->cond); + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call);
We should also free the associated message.
+ }
return true; }
+static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (!call->nonBlock) + return false; + + if (call->sentSomeData) { + /* + * If some data has been sent we must keep it in the list, + * but still wakeup any thread + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } + return false; + } else { + /* + * If no data has been sent, we can remove it from the list. + * Wakup any thread, otherwise free the caller ourselves + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call);
We should also free the associated message.
+ } + return true; + } +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1033,19 +1095,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli /* See if someone else is still waiting * and if so, then pass the buck ! */ while (tmp) { - if (tmp != thiscall) { + if (tmp != thiscall && tmp->haveThread) { VIR_DEBUG("Passing the buck to %p", tmp); virCondSignal(&tmp->cond); break; } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); +} + + +static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +{ + return call->nonBlock; }
/* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1068,6 +1140,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0;
+ /* If there are any non-blocking calls in the queue, + * then we don't want to sleep in poll() + */ + if (virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopWantNonBlock, + NULL)) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0;
@@ -1116,8 +1196,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + }
if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -1129,6 +1210,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, }
if (ret < 0) { + /* XXX what's this dubious errno check doing ? */ if (errno == EWOULDBLOCK) continue;
Yeah, I don't understand this either. According to the poll(2) and poll(3p) man pages, poll never seems to set errno to EWOULDBLOCK.
virReportSystemError(errno, @@ -1146,20 +1228,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; }
- /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall);
+ /* Iterate through waiting calls and if any are + * non-blocking, remove them from the dispatch list... + */ + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveNonBlocking, + thiscall); + /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); - return 0; + return 2; }
+ /* We're not done, but we're non-blocking */ + if (thiscall->nonBlock) { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
This debug line is redundant, since virNetClientIOEventLoopPassTheBuck will print a similar message itself.
+ virNetClientIOEventLoopPassTheBuck(client, thiscall); + return thiscall->sentSomeData ? 1 : 0; + }
if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", ...
So to sum this up, when a non-blocking message is being sent while another thread has the buck, we put the message in the queue and wake up the dispatching thread, which will try to send all it can without blocking and then signal us back. We then check whether the message was sent completely, partially, or not at all. In other words, we do the same as if no thread had the buck, but instead of doing it in our thread, we offload it to the dispatching thread. If my analysis is right, ACK with the nits fixed. Jirka
participants (2)
-
Daniel P. Berrange
-
Jiri Denemark