[libvirt] [PATCH 00/12] rpc: Fix several issues with keepalive messages

So far, we were dropping non-blocking calls whenever sending them would block. In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit. While working on the proper fix, I discovered several other issues we had in handling keepalive messages in client RPC code. See individual patches for more details. As a nice bonus, the fixed version is shorter by one line than the current broken version :-)

Jiri Denemark (12):
  client rpc: Improve debug messages in virNetClientIO
  client rpc: Use event loop for writing
  client rpc: Don't drop non-blocking calls
  client rpc: Just queue non-blocking call if another thread has the buck
  client rpc: Drop unused return value of virNetClientSendNonBlock
  rpc: Refactor keepalive timer code
  rpc: Add APIs for direct triggering of keepalive timer
  client rpc: Separate call creation from running IO loop
  rpc: Do not use timer for sending keepalive responses
  rpc: Remove unused parameter in virKeepAliveStopInternal
  server rpc: Remove APIs for manipulating filters on locked client
  client rpc: Send keepalive requests from IO event loop

 src/libvirt_probes.d         |    2 +-
 src/rpc/virkeepalive.c       |  233 ++++++++++++--------------
 src/rpc/virkeepalive.h       |    7 +-
 src/rpc/virnetclient.c       |  368 +++++++++++++++++++++++-------------------
 src/rpc/virnetserverclient.c |  127 +++++++--------
 5 files changed, 368 insertions(+), 369 deletions(-)

--
1.7.10.2

When analyzing our debug log, I'm always confused about what each of the pointers means. Let's be explicit.

---
 src/rpc/virnetclient.c |   12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 14f806f..c62e045 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1567,7 +1567,8 @@ static int virNetClientIO(virNetClientPtr client,
             return -1;
         }

-        VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
+        VIR_DEBUG("Going to sleep head=%p call=%p",
+                  client->waitDispatch, thiscall);
         /* Go to sleep while other thread is working... */
         if (virCondWait(&thiscall->cond, &client->lock) < 0) {
             virNetClientCallRemove(&client->waitDispatch, thiscall);
@@ -1576,7 +1577,8 @@ static int virNetClientIO(virNetClientPtr client,
             return -1;
         }

-        VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
+        VIR_DEBUG("Woken up from sleep head=%p call=%p",
+                  client->waitDispatch, thiscall);
         /* Three reasons we can be woken up
          *  1. Other thread has got our reply ready for us
          *  2. Other thread is all done, and it is our turn to
@@ -1608,7 +1610,8 @@ static int virNetClientIO(virNetClientPtr client,
         client->haveTheBuck = true;
     }

-    VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);
+    VIR_DEBUG("We have the buck head=%p call=%p",
+              client->waitDispatch, thiscall);
     /*
     * The buck stops here!
@@ -1637,7 +1640,8 @@ static int virNetClientIO(virNetClientPtr client,
         rv = -1;

 cleanup:
-    VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv);
+    VIR_DEBUG("All done with our call head=%p call=%p rv=%d",
+              client->waitDispatch, thiscall, rv);
     return rv;
 }
--
1.7.10.2

On 06/12/2012 05:29 PM, Jiri Denemark wrote:
When analyzing our debug log, I'm always confused about what each of the pointers means. Let's be explicit.

---
 src/rpc/virnetclient.c |   12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
ACK. No semantic change, and better debug can't hurt.

Eric Blake

On Wed, Jun 13, 2012 at 01:29:19AM +0200, Jiri Denemark wrote:
When analyzing our debug log, I'm always confused about what each of the pointers means. Let's be explicit.

---
 src/rpc/virnetclient.c |   12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
ACK

Daniel

Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.

With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.

---
 src/rpc/virnetclient.c |   35 +++++++++++++++++++++++++++++++----
 1 file changed, 31 insertions(+), 4 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index c62e045..3e661d2 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1470,12 +1470,30 @@ error:
 }


+static bool
+virNetClientIOUpdateEvents(virNetClientCallPtr call,
+                           void *opaque)
+{
+    int *events = opaque;
+
+    if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+        *events |= VIR_EVENT_HANDLE_WRITABLE;
+
+    return false;
+}
+
+
 static void virNetClientIOUpdateCallback(virNetClientPtr client,
                                          bool enableCallback)
 {
     int events = 0;
-    if (enableCallback)
+
+    if (enableCallback) {
         events |= VIR_EVENT_HANDLE_READABLE;
+        virNetClientCallMatchPredicate(client->waitDispatch,
+                                       virNetClientIOUpdateEvents,
+                                       &events);
+    }

     virNetSocketUpdateIOCallback(client->sock, events);
 }
@@ -1670,11 +1688,20 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
         goto done;
     }

-    if (virNetClientIOHandleInput(client) < 0) {
-        VIR_WARN("Something went wrong during async message processing");
-        virNetSocketRemoveIOCallback(sock);
+    if (events & VIR_EVENT_HANDLE_WRITABLE) {
+        if (virNetClientIOHandleOutput(client) < 0)
+            virNetSocketRemoveIOCallback(sock);
+    }
+
+    if (events & VIR_EVENT_HANDLE_READABLE) {
+        if (virNetClientIOHandleInput(client) < 0)
+            virNetSocketRemoveIOCallback(sock);
     }

+    /* Remove completed calls or signal their threads. */
+    virNetClientCallRemovePredicate(&client->waitDispatch,
+                                    virNetClientIOEventLoopRemoveDone,
+                                    NULL);
 done:
     virNetClientUnlock(client);
 }
--
1.7.10.2
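[The idea of deriving the write-readiness interest from the queue state generalizes beyond libvirt. Below is a minimal standalone sketch of just that step, using plain poll(2) flags and illustrative names -- this is not the libvirt API, only the shape of the technique:]

/* Sketch: derive the poll event mask from the queued calls so that any
 * call still waiting to transmit arms write notification.  Illustrative
 * names and plain poll(2) flags; this is not the libvirt API. */
#include <poll.h>
#include <stddef.h>

enum call_mode { MODE_WAIT_TX, MODE_WAIT_RX, MODE_COMPLETE };

struct call {
    enum call_mode mode;
    struct call *next;
};

static short
event_mask_for_queue(const struct call *q)
{
    short events = POLLIN;          /* always watch for incoming replies */

    for (; q; q = q->next) {
        if (q->mode == MODE_WAIT_TX) {
            events |= POLLOUT;      /* something still needs sending */
            break;
        }
    }
    return events;
}

int main(void)
{
    struct call rx = { MODE_WAIT_RX, NULL };
    struct call tx = { MODE_WAIT_TX, &rx };
    struct pollfd pfd = { .fd = 0, .events = event_mask_for_queue(&tx) };

    return poll(&pfd, 1, 0) < 0;    /* the event loop would act on revents */
}

[Recomputing the mask after any queue change means nobody has to remember to enable writes explicitly; the event loop stays in sync by construction.]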

On Wed, Jun 13, 2012 at 01:29:20AM +0200, Jiri Denemark wrote:
Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.
With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.
Is that 'see unpredictable delays' part really correct? AFAIK, there should be a pretty well defined "delay" - it'll be processed on the very next iteration of the event loop - assuming the socket is writable. I don't really think this is a delay at all, in fact.
ACK, I always thought we should be doing this :-)

Daniel

On Wed, Jun 13, 2012 at 09:30:30 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 01:29:20AM +0200, Jiri Denemark wrote:
Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.
With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.
Is that 'see unpredictable delays' part really correct? AFAIK, there should be a pretty well defined "delay" - it'll be processed on the very next iteration of the event loop - assuming the socket is writable. I don't really think this is a delay at all, in fact.
OK, it's unpredictable, but in the case of keepalive calls the delay is at most the keepalive interval. The call may be processed earlier if a libvirt API is called in the meantime. I'll reword it a bit.

Jirka

On Wed, Jun 13, 2012 at 10:40:48AM +0200, Jiri Denemark wrote:
On Wed, Jun 13, 2012 at 09:30:30 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 01:29:20AM +0200, Jiri Denemark wrote:
Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.
With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.
Is that 'see unpredictable delays' part really correct? AFAIK, there should be a pretty well defined "delay" - it'll be processed on the very next iteration of the event loop - assuming the socket is writable. I don't really think this is a delay at all, in fact.
OK, it's unpredictable, but in the case of keepalive calls the delay is at most the keepalive interval. The call may be processed earlier if a libvirt API is called in the meantime. I'll reword it a bit.
Doesn't the fact that we use the event loop for writing mean that we no longer need to wait for a libvirt API to be called?

Daniel

On Wed, Jun 13, 2012 at 09:43:54 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 10:40:48AM +0200, Jiri Denemark wrote:
On Wed, Jun 13, 2012 at 09:30:30 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 01:29:20AM +0200, Jiri Denemark wrote:
Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.
With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.
Is that 'see unpredictable delays' part really correct? AFAIK, there should be a pretty well defined "delay" - it'll be processed on the very next iteration of the event loop - assuming the socket is writable. I don't really think this is a delay at all, in fact.
OK, it's unpredictable, but in the case of keepalive calls the delay is at most the keepalive interval. The call may be processed earlier if a libvirt API is called in the meantime. I'll reword it a bit.
Doesn't the fact that we use the event loop for writing mean that we no longer need to wait for a libvirt API to be called?
Yes, but the last paragraph was specifically talking about the case when the client has no event loop registered, which we still need to support for backward compatibility. However, as already said, we require an event loop for keepalive, and keepalive calls are the only non-blocking calls we have. Thus it was just a note for the future in case someone ever introduces another type of non-blocking call.

Jirka

On Wed, Jun 13, 2012 at 10:50:09AM +0200, Jiri Denemark wrote:
On Wed, Jun 13, 2012 at 09:43:54 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 10:40:48AM +0200, Jiri Denemark wrote:
On Wed, Jun 13, 2012 at 09:30:30 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 01:29:20AM +0200, Jiri Denemark wrote:
Normally, when every call has a thread associated with it, the thread may get the buck and be in charge of sending all calls until its own call is done. When we introduced non-blocking calls, we had to add special handling of new non-blocking calls. This patch uses event loop to send data if there is no thread to get the buck so that any non-blocking calls left in the queue are properly sent without having to handle them specially. It also avoids adding even more cruft to client IO loop in the following patches.
With this change in, non-blocking calls may see unpredictable delays in delivery when the client has no event loop registered. However, the only non-blocking calls we have are keepalives and we already require event loop for them.
Is that 'see unpredictable delays' part really correct? AFAIK, there should be a pretty well defined "delay" - it'll be processed on the very next iteration of the event loop - assuming the socket is writable. I don't really think this is a delay at all, in fact.
OK, it's unpredictable, but in the case of keepalive calls the delay is at most the keepalive interval. The call may be processed earlier if a libvirt API is called in the meantime. I'll reword it a bit.
Doesn't the fact that we use the event loop for writing mean that we no longer need to wait for a libvirt API to be called?
Yes, but the last paragraph was specifically talking about the case when the client has no event loop registered, which we still need to support for backward compatibility. However, as already said, we require an event loop for keepalive, and keepalive calls are the only non-blocking calls we have. Thus it was just a note for the future in case someone ever introduces another type of non-blocking call.
Ahhhh, I see. It wasn't clear that you were referring to the no-event-loop case.

Daniel

So far, we were dropping non-blocking calls whenever sending them would block. In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit.

With this patch, non-blocking calls are never dropped (unless the connection is being closed) and will always be sent.

---
 src/rpc/virnetclient.c |  164 +++++++++++++++++++++--------------------------
 1 file changed, 71 insertions(+), 93 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 3e661d2..614b469 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -58,7 +58,6 @@ struct _virNetClientCall {
     bool expectReply;
     bool nonBlock;
     bool haveThread;
-    bool sentSomeData;

     virCond cond;

@@ -108,6 +107,10 @@ struct _virNetClient {
 };


+static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+                                               virNetClientCallPtr thiscall);
+
+
 static void virNetClientLock(virNetClientPtr client)
 {
     virMutexLock(&client->lock);
@@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client)

     virNetClientLock(client);

-    /* If there is a thread polling for data on the socket, set wantClose flag
-     * and wake the thread up or just immediately close the socket when no-one
-     * is polling on it.
+    client->wantClose = true;
+
+    /* If there is a thread polling for data on the socket, wake the thread up
+     * otherwise try to pass the buck to a possibly waiting thread. If no
+     * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
+     * queue and close the client because we set client->wantClose.
      */
-    if (client->waitDispatch) {
+    if (client->haveTheBuck) {
         char ignore = 1;
         size_t len = sizeof(ignore);

-        client->wantClose = true;
         if (safewrite(client->wakeupSendFD, &ignore, len) != len)
             VIR_ERROR(_("failed to wake up polling thread"));
     } else {
-        virNetClientCloseLocked(client);
+        virNetClientIOEventLoopPassTheBuck(client, NULL);
     }

     virNetClientUnlock(client);
@@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client,
     ret = virNetSocketWrite(client->sock,
                             thecall->msg->buffer + thecall->msg->bufferOffset,
                             thecall->msg->bufferLength - thecall->msg->bufferOffset);
-    if (ret > 0 || virNetSocketHasPendingData(client->sock))
-        thecall->sentSomeData = true;
     if (ret <= 0)
         return ret;

@@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
 }


-static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
-                                                     void *opaque)
+static bool
+virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
+                                         void *opaque)
 {
     virNetClientCallPtr thiscall = opaque;

-    if (call == thiscall)
-        return false;
-
-    if (!call->nonBlock)
-        return false;
-
-    if (call->sentSomeData) {
-        /*
-         * If some data has been sent we must keep it in the list,
-         * but still wakeup any thread
-         */
-        if (call->haveThread) {
-            VIR_DEBUG("Waking up sleep %p", call);
-            virCondSignal(&call->cond);
-        } else {
-            VIR_DEBUG("Keeping unfinished call %p in the list", call);
-        }
-        return false;
-    } else {
-        /*
-         * If no data has been sent, we can remove it from the list.
-         * Wakup any thread, otherwise free the caller ourselves
-         */
-        if (call->haveThread) {
-            VIR_DEBUG("Waking up sleep %p", call);
-            virCondSignal(&call->cond);
-        } else {
-            VIR_DEBUG("Removing call %p", call);
-            if (call->expectReply)
-                VIR_WARN("Got a call expecting a reply but without a waiting thread");
-            ignore_value(virCondDestroy(&call->cond));
-            VIR_FREE(call->msg);
-            VIR_FREE(call);
-        }
+    if (call != thiscall && call->nonBlock && call->haveThread) {
+        VIR_DEBUG("Waking up sleep %p", call);
+        call->haveThread = false;
+        virCondSignal(&call->cond);
         return true;
     }
+
+    return false;
 }


-static void
-virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
-                                 virNetClientCallPtr thiscall)
+static bool
+virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
+                                 void *opaque)
 {
-    if (!client->waitDispatch)
-        return;
+    virNetClientCallPtr thiscall = opaque;

-    if (client->waitDispatch == thiscall) {
-        /* just pretend nothing was sent and the caller will free the call */
-        thiscall->sentSomeData = false;
-    } else {
-        virNetClientCallPtr call = client->waitDispatch;
-        virNetClientCallRemove(&client->waitDispatch, call);
-        ignore_value(virCondDestroy(&call->cond));
-        VIR_FREE(call->msg);
-        VIR_FREE(call);
-    }
+    if (call == thiscall)
+        return false;
+
+    VIR_DEBUG("Removing call %p", call);
+    ignore_value(virCondDestroy(&call->cond));
+    VIR_FREE(call->msg);
+    VIR_FREE(call);
+    return true;
 }


-static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
+static void
+virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+                                   virNetClientCallPtr thiscall)
 {
     VIR_DEBUG("Giving up the buck %p", thiscall);
     virNetClientCallPtr tmp = client->waitDispatch;
@@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
     VIR_DEBUG("No thread to pass the buck to");
     if (client->wantClose) {
         virNetClientCloseLocked(client);
-        virNetClientIOEventLoopRemoveAll(client, thiscall);
+        virNetClientCallRemovePredicate(&client->waitDispatch,
+                                        virNetClientIOEventLoopRemoveAll,
+                                        thiscall);
     }
 }


-static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
+static bool
+virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
+                                    void *opaque ATTRIBUTE_UNUSED)
 {
-    return call->nonBlock;
+    return call->nonBlock && call->haveThread;
 }

 /*
@@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (virNetSocketHasCachedData(client->sock) || client->wantClose)
             timeout = 0;

-        /* If there are any non-blocking calls in the queue,
-         * then we don't want to sleep in poll()
+        /* If there are any non-blocking calls with an associated thread
+         * in the queue, then we don't want to sleep in poll()
          */
         if (virNetClientCallMatchPredicate(client->waitDispatch,
                                            virNetClientIOEventLoopWantNonBlock,
@@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             }

             /* If we were woken up because a new non-blocking call was queued,
-             * we need to re-poll to check if we can send it.
+             * we need to re-poll to check if we can send it. To be precise, we
+             * will re-poll even if a blocking call arrived when unhandled
+             * non-blocking calls are still in the queue. But this can't hurt.
              */
             if (virNetClientCallMatchPredicate(client->waitDispatch,
                                                virNetClientIOEventLoopWantNonBlock,
                                                NULL)) {
-                VIR_DEBUG("New non-blocking call arrived; repolling");
+                VIR_DEBUG("The queue contains new non-blocking call(s);"
+                          " repolling");
                 continue;
             }
         }
@@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }

         /* Iterate through waiting calls and if any are
-         * complete, remove them from the dispatch list..
+         * complete, remove them from the dispatch list.
          */
         virNetClientCallRemovePredicate(&client->waitDispatch,
                                         virNetClientIOEventLoopRemoveDone,
                                         thiscall);

-        /* Iterate through waiting calls and if any are
-         * non-blocking, remove them from the dispatch list...
+        /* Iterate through waiting calls and wake up and detach threads
+         * attached to non-blocking calls.
          */
-        virNetClientCallRemovePredicate(&client->waitDispatch,
-                                        virNetClientIOEventLoopRemoveNonBlocking,
-                                        thiscall);
+        virNetClientCallMatchPredicate(client->waitDispatch,
+                                       virNetClientIOEventLoopDetachNonBlocking,
+                                       thiscall);

         /* Now see if *we* are done */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
@@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             return 2;
         }

-        /* We're not done, but we're non-blocking */
+        /* We're not done, but we're non-blocking; keep the call queued */
         if (thiscall->nonBlock) {
+            thiscall->haveThread = false;
             virNetClientIOEventLoopPassTheBuck(client, thiscall);
-            if (thiscall->sentSomeData) {
-                return 1;
-            } else {
-                virNetClientCallRemove(&client->waitDispatch, thiscall);
-                return 0;
-            }
+            return 1;
         }

         if (fds[0].revents & (POLLHUP | POLLERR)) {
@@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }
     }

-
 error:
     virNetClientCallRemove(&client->waitDispatch, thiscall);
     virNetClientIOEventLoopPassTheBuck(client, thiscall);
@@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client,
             goto cleanup;
         }

-        /* If we're non-blocking, get outta here */
+        /* If we're non-blocking, we were either queued (and detached) or the
+         * call was not sent because of an error.
+         */
         if (thiscall->nonBlock) {
-            if (thiscall->sentSomeData)
+            if (!thiscall->haveThread)
                 rv = 1; /* In progress */
             else
                 rv = 0; /* none at all */
@@ -1708,7 +1687,7 @@ done:


 /*
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
  * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientSendInternal(virNetClientPtr client,
@@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client,

     ret = virNetClientIO(client, call);

-    /* If partially sent, then the call is still on the dispatch queue */
-    if (ret == 1) {
-        call->haveThread = false;
-    } else {
-        ignore_value(virCondDestroy(&call->cond));
-    }
+    /* If queued, the call will be finished and freed later by another thread;
+     * we're done. */
+    if (ret == 1)
+        return 1;
+
+    ignore_value(virCondDestroy(&call->cond));

 cleanup:
-    if (ret != 1)
-        VIR_FREE(call);
+    VIR_FREE(call);
     return ret;
 }
--
1.7.10.2
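[For readers new to this code, the "buck" protocol is easiest to see in isolation. Here is a standalone sketch of the pattern -- whichever thread holds the buck drives I/O for every queued call, and hands the buck to a waiting thread when its own call completes -- using plain pthreads and illustrative names; there are no real sockets here and these are not the libvirt functions themselves:]

/* Standalone sketch of "pass the buck": one thread at a time drives I/O
 * for all queued calls; on completion it dequeues its own call and wakes
 * one waiting thread to take over. */
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>

struct call {
    pthread_cond_t cond;
    bool complete;
    bool have_thread;   /* false for fire-and-forget non-blocking calls */
    struct call *next;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct call *queue;
static bool have_the_buck;

static void remove_call(struct call *c)
{
    struct call **p = &queue;

    while (*p && *p != c)
        p = &(*p)->next;
    if (*p)
        *p = c->next;
}

/* Wake one queued call that has a thread sleeping on it; thread-less
 * (non-blocking) calls simply stay queued for the next buck holder. */
static void pass_the_buck(void)
{
    struct call *c;

    have_the_buck = false;
    for (c = queue; c; c = c->next) {
        if (c->have_thread) {
            pthread_cond_signal(&c->cond);
            return;
        }
    }
}

/* One "poll iteration": real code drops the lock around poll(2); here we
 * just yield, re-take the lock and pretend all queued calls finished. */
static void run_io_once(void)
{
    struct call *c;

    pthread_mutex_unlock(&lock);
    sched_yield();
    pthread_mutex_lock(&lock);
    for (c = queue; c; c = c->next)
        c->complete = true;
}

static void *send_call(void *opaque)
{
    struct call *c = opaque;

    pthread_mutex_lock(&lock);
    c->next = queue;
    queue = c;

    while (!c->complete) {
        if (have_the_buck) {
            pthread_cond_wait(&c->cond, &lock);  /* someone else drives */
            continue;
        }
        have_the_buck = true;                    /* we drive I/O for all */
        run_io_once();
    }

    remove_call(c);     /* dequeue ourselves before handing over */
    pass_the_buck();
    pthread_mutex_unlock(&lock);
    return NULL;
}

int main(void)
{
    struct call a = { PTHREAD_COND_INITIALIZER, false, true, NULL };
    struct call b = { PTHREAD_COND_INITIALIZER, false, true, NULL };
    pthread_t t1, t2;

    pthread_create(&t1, NULL, send_call, &a);
    pthread_create(&t2, NULL, send_call, &b);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("both calls complete\n");
    return 0;
}

[Dequeuing the finished call before passing the buck matters: it guarantees the wakeup signal goes to a call whose thread is actually sleeping, which is the lost-wakeup hazard the real code also has to avoid.]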

On Wed, Jun 13, 2012 at 01:29:21AM +0200, Jiri Denemark wrote:
So far, we were dropping non-blocking calls whenever sending them would block. In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit.
With this patch, non-blocking calls are never dropped (unless the connection is being closed) and will always be sent.

---
 src/rpc/virnetclient.c |  164 +++++++++++++++++++++--------------------------
 1 file changed, 71 insertions(+), 93 deletions(-)
ACK, quite complex to follow but the principle is good - dropping calls always felt slightly wrong.

Daniel

As non-blocking calls are no longer dropped, we don't really need to care that much about their faith and wait for the thread with the buck to process them. If another thread has the buck, we can just push a non-blocking call to the queue and be done with it.

---
 src/rpc/virnetclient.c |   74 ++++++++++--------------------------------------
 1 file changed, 15 insertions(+), 59 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 614b469..b3d8d14 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1200,20 +1200,11 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
 }


-static bool
-virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
-                                         void *opaque)
+static void
+virNetClientIODetachNonBlocking(virNetClientCallPtr call)
 {
-    virNetClientCallPtr thiscall = opaque;
-
-    if (call != thiscall && call->nonBlock && call->haveThread) {
-        VIR_DEBUG("Waking up sleep %p", call);
-        call->haveThread = false;
-        virCondSignal(&call->cond);
-        return true;
-    }
-
-    return false;
+    VIR_DEBUG("Keeping unfinished non-blocking call %p in the queue", call);
+    call->haveThread = false;
 }


@@ -1262,13 +1253,6 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
 }


-static bool
-virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
-                                    void *opaque ATTRIBUTE_UNUSED)
-{
-    return call->nonBlock && call->haveThread;
-}
-
 /*
  * Process all calls pending dispatch/receive until we
  * get a reply to our own call. Then quit and pass the buck
@@ -1299,12 +1283,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (virNetSocketHasCachedData(client->sock) || client->wantClose)
             timeout = 0;

-        /* If there are any non-blocking calls with an associated thread
-         * in the queue, then we don't want to sleep in poll()
-         */
-        if (virNetClientCallMatchPredicate(client->waitDispatch,
-                                           virNetClientIOEventLoopWantNonBlock,
-                                           NULL))
+        /* If we are non-blocking, then we don't want to sleep in poll() */
+        if (thiscall->nonBlock)
             timeout = 0;

         fds[0].events = fds[0].revents = 0;
@@ -1371,19 +1351,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                                _("read on wakeup fd failed"));
                 goto error;
             }
-
-            /* If we were woken up because a new non-blocking call was queued,
-             * we need to re-poll to check if we can send it. To be precise, we
-             * will re-poll even if a blocking call arrived when unhandled
-             * non-blocking calls are still in the queue. But this can't hurt.
-             */
-            if (virNetClientCallMatchPredicate(client->waitDispatch,
-                                               virNetClientIOEventLoopWantNonBlock,
-                                               NULL)) {
-                VIR_DEBUG("The queue contains new non-blocking call(s);"
-                          " repolling");
-                continue;
-            }
         }

         if (ret < 0) {
@@ -1412,13 +1379,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                                         virNetClientIOEventLoopRemoveDone,
                                         thiscall);

-        /* Iterate through waiting calls and wake up and detach threads
-         * attached to non-blocking calls.
-         */
-        virNetClientCallMatchPredicate(client->waitDispatch,
-                                       virNetClientIOEventLoopDetachNonBlocking,
-                                       thiscall);
-
         /* Now see if *we* are done */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
             virNetClientCallRemove(&client->waitDispatch, thiscall);
@@ -1428,7 +1388,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,

         /* We're not done, but we're non-blocking; keep the call queued */
         if (thiscall->nonBlock) {
-            thiscall->haveThread = false;
+            virNetClientIODetachNonBlocking(thiscall);
             virNetClientIOEventLoopPassTheBuck(client, thiscall);
             return 1;
         }
@@ -1562,6 +1522,14 @@ static int virNetClientIO(virNetClientPtr client,
             return -1;
         }

+        /* If we are non-blocking, detach the thread and keep the call in the
+         * queue. */
+        if (thiscall->nonBlock) {
+            virNetClientIODetachNonBlocking(thiscall);
+            rv = 1;
+            goto cleanup;
+        }
+
         VIR_DEBUG("Going to sleep head=%p call=%p",
                   client->waitDispatch, thiscall);
         /* Go to sleep while other thread is working... */
         if (virCondWait(&thiscall->cond, &client->lock) < 0) {
             virNetClientCallRemove(&client->waitDispatch, thiscall);
@@ -1579,7 +1547,6 @@ static int virNetClientIO(virNetClientPtr client,
          *  2. Other thread is all done, and it is our turn to
          *     be the dispatcher to finish waiting for
          *     our reply
-         *  3. I/O was expected to block
          */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
             rv = 2;
@@ -1591,17 +1558,6 @@ static int virNetClientIO(virNetClientPtr client,
             goto cleanup;
         }

-        /* If we're non-blocking, we were either queued (and detached) or the
-         * call was not sent because of an error.
-         */
-        if (thiscall->nonBlock) {
-            if (!thiscall->haveThread)
-                rv = 1; /* In progress */
-            else
-                rv = 0; /* none at all */
-            goto cleanup;
-        }
-
         /* Grr, someone passed the buck onto us ... */
     } else {
         client->haveTheBuck = true;
--
1.7.10.2
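[With the detach logic in place, the fast path for a non-blocking call reduces to "queue it and poke whoever holds the buck". A self-contained sketch of just that step, with illustrative names -- the pipe plays the role of the real code's wakeup FD:]

/* Sketch: queue a non-blocking call and interrupt the buck holder's
 * poll() through a wakeup pipe; the caller never sleeps on the call. */
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

struct call {
    bool non_block;
    bool have_thread;
    struct call *next;
};

static struct call *queue;
static bool have_the_buck;
static int wakeup_fd[2];

static int
queue_nonblocking_call(struct call *c)
{
    char ignore = 1;

    c->non_block = true;
    c->have_thread = false;     /* no thread will ever sleep on this call */
    c->next = queue;
    queue = c;

    /* if another thread is in poll(), poke it so it notices the new
     * call and re-polls with write notification enabled */
    if (have_the_buck && write(wakeup_fd[1], &ignore, 1) != 1)
        return -1;
    return 0;
}

int main(void)
{
    struct call c = { false, false, NULL };

    if (pipe(wakeup_fd) < 0)
        return 1;
    have_the_buck = true;       /* pretend another thread holds the buck */
    return queue_nonblocking_call(&c) < 0;
}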

On Wed, Jun 13, 2012 at 01:29:22AM +0200, Jiri Denemark wrote:
As non-blocking calls are no longer dropped, we don't really need to care that much about their faith and wait for the thread with the buck
ITYM to s/faith/fate/ ?
to process them. If another thread has the buck, we can just push a non-blocking call to the queue and be done with it.

---
 src/rpc/virnetclient.c |   74 ++++++++++--------------------------------------
 1 file changed, 15 insertions(+), 59 deletions(-)
ACK

Daniel

As we never drop non-blocking calls, the return value that used to indicate a call was dropped is no longer needed.

---
 src/rpc/virnetclient.c |   19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index b3d8d14..e9898be 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1258,8 +1258,8 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
  * get a reply to our own call. Then quit and pass the buck
  * to someone else.
  *
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
- * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ * Returns 1 if the call was queued and will be completed later (only
+ * for nonBlock==true), 0 if the call was completed and -1 on error.
  */
 static int virNetClientIOEventLoop(virNetClientPtr client,
                                    virNetClientCallPtr thiscall)
@@ -1383,7 +1383,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
             virNetClientCallRemove(&client->waitDispatch, thiscall);
             virNetClientIOEventLoopPassTheBuck(client, thiscall);
-            return 2;
+            return 0;
         }

         /* We're not done, but we're non-blocking; keep the call queued */
@@ -1490,8 +1490,8 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client,
  *
  * NB(7) Don't Panic!
  *
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
- * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ * Returns 1 if the call was queued and will be completed later (only
+ * for nonBlock==true), 0 if the call was completed and -1 on error.
  */
 static int virNetClientIO(virNetClientPtr client,
                           virNetClientCallPtr thiscall)
@@ -1549,7 +1549,7 @@ static int virNetClientIO(virNetClientPtr client,
          *    our reply
          */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
-            rv = 2;
+            rv = 0;
             /*
              * We avoided catching the buck and our reply is ready !
              * We've already had 'thiscall' removed from the list
@@ -1643,8 +1643,8 @@ done:


 /*
- * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
- * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ * Returns 1 if the call was queued and will be completed later (only
+ * for nonBlock==true), 0 if the call was completed and -1 on error.
  */
 static int virNetClientSendInternal(virNetClientPtr client,
                                     virNetMessagePtr msg,
@@ -1769,7 +1769,8 @@ int virNetClientSendNoReply(virNetClientPtr client,
  * The caller is responsible for free'ing @msg, *except* if
  * this method returns 1.
  *
- * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
+ * Returns 1 if the message was queued and will be completed later (only
+ * for nonBlock==true), 0 if the message was completed and -1 on error.
  */
 int virNetClientSendNonBlock(virNetClientPtr client,
                              virNetMessagePtr msg)
--
1.7.10.2
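[For callers, the simplified contract looks like this. This is only a hypothetical caller fragment -- virNetClientSendNonBlock() and virNetMessageFree() are the real internal APIs, the surrounding handling is illustrative -- following the ownership rule documented above (the caller frees @msg except when 1 is returned):]

/* Hypothetical caller of the simplified contract:
 * 1 = queued, completed later; 0 = completed now; -1 = error. */
switch (virNetClientSendNonBlock(client, msg)) {
case 1:                       /* queued; the IO loop sends and frees it */
    break;
case 0:                       /* completed now; we still own the message */
    virNetMessageFree(msg);
    break;
default:                      /* error; we still own the message */
    virNetMessageFree(msg);
    return -1;
}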

On Wed, Jun 13, 2012 at 01:29:23AM +0200, Jiri Denemark wrote:
As we never drop non-blocking calls, the return value that used to indicate a call was dropped is no longer needed.

---
 src/rpc/virnetclient.c |   19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
ACK

Daniel

The code that needs to be run every keepalive interval of inactivity was only called from a timer and thus from the main event loop. We will need to call the code directly from another place.

---
 src/rpc/virkeepalive.c |   57 +++++++++++++++++++++++++++++-------------------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
index a5c2b1a..b2e260e 100644
--- a/src/rpc/virkeepalive.c
+++ b/src/rpc/virkeepalive.c
@@ -152,51 +152,64 @@ virKeepAliveScheduleResponse(virKeepAlivePtr ka)
 }


-static void
-virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+static bool
+virKeepAliveTimerInternal(virKeepAlivePtr ka,
+                          virNetMessagePtr *msg)
 {
-    virKeepAlivePtr ka = opaque;
     time_t now = time(NULL);

-    virKeepAliveLock(ka);
+    if (now - ka->lastPacketReceived < ka->interval - 1) {
+        int timeout = ka->interval - (now - ka->lastPacketReceived);
+        virEventUpdateTimeout(ka->timer, timeout * 1000);
+        return false;
+    }

     PROBE(RPC_KEEPALIVE_TIMEOUT,
           "ka=%p client=%p countToDeath=%d idle=%d",
           ka, ka->client, ka->countToDeath,
           (int) (now - ka->lastPacketReceived));

-    if (now - ka->lastPacketReceived < ka->interval - 1) {
-        int timeout = ka->interval - (now - ka->lastPacketReceived);
-        virEventUpdateTimeout(ka->timer, timeout * 1000);
-        goto cleanup;
-    }
-
     if (ka->countToDeath == 0) {
-        virKeepAliveDeadFunc deadCB = ka->deadCB;
-        void *client = ka->client;
-
         VIR_WARN("No response from client %p after %d keepalive messages in"
                  " %d seconds",
                  ka->client, ka->count,
                  (int) (now - ka->lastPacketReceived));
+        return true;
+    } else {
+        ka->countToDeath--;
+        *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING);
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+        return false;
+    }
+}
+
+
+static void
+virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    virNetMessagePtr msg = NULL;
+    bool dead;
+
+    virKeepAliveLock(ka);
+
+    dead = virKeepAliveTimerInternal(ka, &msg);
+
+    if (dead) {
+        virKeepAliveDeadFunc deadCB = ka->deadCB;
+        void *client = ka->client;

         ka->refs++;
         virKeepAliveUnlock(ka);
         deadCB(client);
         virKeepAliveLock(ka);
         ka->refs--;
-    } else {
-        virNetMessagePtr msg;
-
-        ka->countToDeath--;
-        if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
-            VIR_WARN("Failed to generate keepalive request");
-        else
-            virKeepAliveSend(ka, msg);
-        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+    } else if (msg) {
+        virKeepAliveSend(ka, msg);
     }

-cleanup:
     virKeepAliveUnlock(ka);
 }
--
1.7.10.2
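[The shape of the refactoring is: make the interval bookkeeping a function of timestamps that merely reports what the caller should do, so it can be driven by a timer callback or called directly. A standalone sketch of that idea with illustrative names -- the real code uses a slightly different boundary (interval - 1) and also builds the PING message itself:]

/* Sketch: the keepalive decision as a pure-ish function of timestamps. */
#include <stdio.h>
#include <time.h>

enum ka_action { KA_WAIT, KA_SEND_PING, KA_DEAD };

struct ka_state {
    time_t last_received;   /* when we last heard from the peer */
    int interval;           /* seconds of silence between pings */
    int count_to_death;     /* unanswered pings left before giving up */
};

static enum ka_action
ka_check(struct ka_state *ka, time_t now, int *rearm_in)
{
    if (now - ka->last_received < ka->interval) {
        *rearm_in = ka->interval - (int)(now - ka->last_received);
        return KA_WAIT;             /* fired too early; just re-arm */
    }
    if (ka->count_to_death == 0)
        return KA_DEAD;             /* caller runs its dead-connection CB */
    ka->count_to_death--;
    *rearm_in = ka->interval;
    return KA_SEND_PING;            /* caller sends a PING and re-arms */
}

int main(void)
{
    struct ka_state ka = { time(NULL) - 6, 5, 3 };
    int rearm = 0;

    switch (ka_check(&ka, time(NULL), &rearm)) {
    case KA_SEND_PING: printf("send ping, re-arm in %ds\n", rearm); break;
    case KA_WAIT:      printf("too early, re-arm in %ds\n", rearm); break;
    case KA_DEAD:      printf("peer is dead\n"); break;
    }
    return 0;
}

[Keeping the side effects (sending, dead-callback, reference counting) in the caller is what makes the function reusable from a second call site, which is exactly what the following patches need.]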

On Wed, Jun 13, 2012 at 01:29:24AM +0200, Jiri Denemark wrote:
The code that needs to be run every keepalive interval of inactivity was only called from a timer and thus from the main event loop. We will need to call the code directly from another place.

---
 src/rpc/virkeepalive.c |   57 +++++++++++++++++++++++++++++-------------------
 1 file changed, 35 insertions(+), 22 deletions(-)
ACK

Daniel

Add virKeepAliveTimeout and virKeepAliveTrigger APIs that can be used to set poll timeouts and trigger keepalive timer. virKeepAliveTrigger checks if it is called too early and does nothing in that case.

---
 src/rpc/virkeepalive.c |   61 ++++++++++++++++++++++++++++++++++++++++++----
 src/rpc/virkeepalive.h |    3 +++
 2 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
index b2e260e..035ac74 100644
--- a/src/rpc/virkeepalive.c
+++ b/src/rpc/virkeepalive.c
@@ -45,6 +45,7 @@ struct _virKeepAlive {
     unsigned int count;
     unsigned int countToDeath;
     time_t lastPacketReceived;
+    time_t intervalStart;
     int timer;

     virNetMessagePtr response;
@@ -158,8 +159,11 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka,
 {
     time_t now = time(NULL);

-    if (now - ka->lastPacketReceived < ka->interval - 1) {
-        int timeout = ka->interval - (now - ka->lastPacketReceived);
+    if (ka->interval <= 0 || ka->intervalStart == 0)
+        return false;
+
+    if (now - ka->intervalStart < ka->interval) {
+        int timeout = ka->interval - (now - ka->intervalStart);
         virEventUpdateTimeout(ka->timer, timeout * 1000);
         return false;
     }
@@ -179,6 +183,7 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka,
         return true;
     } else {
         ka->countToDeath--;
+        ka->intervalStart = now;
         *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING);
         virEventUpdateTimeout(ka->timer, ka->interval * 1000);
         return false;
@@ -335,6 +340,7 @@ virKeepAliveStart(virKeepAlivePtr ka,
     int ret = -1;
     time_t delay;
     int timeout;
+    time_t now;

     virKeepAliveLock(ka);

@@ -365,11 +371,13 @@ virKeepAliveStart(virKeepAlivePtr ka,
           "ka=%p client=%p interval=%d count=%u",
           ka, ka->client, interval, count);

-    delay = time(NULL) - ka->lastPacketReceived;
+    now = time(NULL);
+    delay = now - ka->lastPacketReceived;
     if (delay > ka->interval)
         timeout = 0;
     else
         timeout = ka->interval - delay;
+    ka->intervalStart = now - (ka->interval - timeout);
     ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
                                    ka, virKeepAliveTimerFree);
     if (ka->timer < 0)
@@ -427,6 +435,51 @@ virKeepAliveStopSending(virKeepAlivePtr ka)
 }


+int
+virKeepAliveTimeout(virKeepAlivePtr ka)
+{
+    int timeout;
+
+    if (!ka)
+        return -1;
+
+    virKeepAliveLock(ka);
+
+    if (ka->interval <= 0 || ka->intervalStart == 0) {
+        timeout = -1;
+    } else {
+        timeout = ka->interval - (time(NULL) - ka->intervalStart);
+        if (timeout < 0)
+            timeout = 0;
+    }
+
+    virKeepAliveUnlock(ka);
+
+    if (timeout < 0)
+        return -1;
+    else
+        return timeout * 1000;
+}
+
+
+bool
+virKeepAliveTrigger(virKeepAlivePtr ka,
+                    virNetMessagePtr *msg)
+{
+    bool dead;
+
+    *msg = NULL;
+    if (!ka)
+        return false;
+
+    virKeepAliveLock(ka);
+    dead = virKeepAliveTimerInternal(ka, msg);
+    virKeepAliveUnlock(ka);
+
+    return dead;
+}
+
+
 bool
 virKeepAliveCheckMessage(virKeepAlivePtr ka,
                          virNetMessagePtr msg)
@@ -442,7 +495,7 @@ virKeepAliveCheckMessage(virKeepAlivePtr ka,
     virKeepAliveLock(ka);

     ka->countToDeath = ka->count;
-    ka->lastPacketReceived = time(NULL);
+    ka->lastPacketReceived = ka->intervalStart = time(NULL);

     if (msg->header.prog == KEEPALIVE_PROGRAM &&
         msg->header.vers == KEEPALIVE_PROTOCOL_VERSION &&
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
index af9e722..09264a5 100644
--- a/src/rpc/virkeepalive.h
+++ b/src/rpc/virkeepalive.h
@@ -51,6 +51,9 @@ int virKeepAliveStart(virKeepAlivePtr ka,
 void virKeepAliveStop(virKeepAlivePtr ka);
 void virKeepAliveStopSending(virKeepAlivePtr ka);

+int virKeepAliveTimeout(virKeepAlivePtr ka);
+bool virKeepAliveTrigger(virKeepAlivePtr ka,
+                         virNetMessagePtr *msg);
 bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
                               virNetMessagePtr msg);
--
1.7.10.2
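[Together these two APIs let an I/O loop honour the keepalive deadline without owning a timer. A generic poll(2) sketch of the intended usage -- the two stand-in functions model virKeepAliveTimeout() (milliseconds until the next check, or -1 for none) and virKeepAliveTrigger() (a no-op when called too early); everything else is illustrative:]

/* Sketch: cap the loop's own timeout with the keepalive deadline and run
 * the keepalive check when poll() returns. */
#include <poll.h>
#include <stdio.h>

static int keepalive_timeout_ms(void) { return 3000; }  /* -1 = none */
static void keepalive_trigger(void) { printf("keepalive check ran\n"); }

static int
io_loop_once(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int ka_ms = keepalive_timeout_ms();
    int ret;

    /* whichever deadline comes first wins */
    if (ka_ms >= 0 && (timeout_ms < 0 || ka_ms < timeout_ms))
        timeout_ms = ka_ms;

    ret = poll(&pfd, 1, timeout_ms);
    if (ret >= 0)
        keepalive_trigger();   /* safe: it does nothing if called early */
    return ret;
}

int main(void)
{
    return io_loop_once(0, -1) < 0;
}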

On Wed, Jun 13, 2012 at 01:29:25AM +0200, Jiri Denemark wrote:
Add virKeepAliveTimeout and virKeepAliveTrigger APIs that can be used to set poll timeouts and trigger keepalive timer. virKeepAliveTrigger checks if it is called too early and does nothing in that case.

---
 src/rpc/virkeepalive.c |   61 ++++++++++++++++++++++++++++++++++++++++++----
 src/rpc/virkeepalive.h |    3 +++
 2 files changed, 60 insertions(+), 4 deletions(-)
ACK

Daniel

This makes it possible to create and queue new calls while we are running the IO loop.

---
 src/rpc/virnetclient.c |   85 ++++++++++++++++++++++++++++++------------------
 1 file changed, 54 insertions(+), 31 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index e9898be..b956f6e 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1642,53 +1642,38 @@ done:
 }


-/*
- * Returns 1 if the call was queued and will be completed later (only
- * for nonBlock==true), 0 if the call was completed and -1 on error.
- */
-static int virNetClientSendInternal(virNetClientPtr client,
-                                    virNetMessagePtr msg,
-                                    bool expectReply,
-                                    bool nonBlock)
+static virNetClientCallPtr
+virNetClientCallNew(virNetMessagePtr msg,
+                    bool expectReply,
+                    bool nonBlock)
 {
-    virNetClientCallPtr call;
-    int ret = -1;
-
-    PROBE(RPC_CLIENT_MSG_TX_QUEUE,
-          "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
-          client, msg->bufferLength,
-          msg->header.prog, msg->header.vers, msg->header.proc,
-          msg->header.type, msg->header.status, msg->header.serial);
+    virNetClientCallPtr call = NULL;

     if (expectReply &&
         (msg->bufferLength != 0) &&
         (msg->header.status == VIR_NET_CONTINUE)) {
         virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
-                    _("Attempt to send an asynchronous message with a synchronous reply"));
-        return -1;
+                    _("Attempt to send an asynchronous message with"
+                      " a synchronous reply"));
+        goto error;
     }

     if (expectReply && nonBlock) {
         virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
-                    _("Attempt to send a non-blocking message with a synchronous reply"));
-        return -1;
+                    _("Attempt to send a non-blocking message with"
+                      " a synchronous reply"));
+        goto error;
     }

     if (VIR_ALLOC(call) < 0) {
         virReportOOMError();
-        return -1;
-    }
-
-    if (!client->sock || client->wantClose) {
-        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
-                    _("client socket is closed"));
-        goto cleanup;
+        goto error;
     }

     if (virCondInit(&call->cond) < 0) {
         virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
                     _("cannot initialize condition variable"));
-        goto cleanup;
+        goto error;
     }

     msg->donefds = 0;
@@ -1699,8 +1684,48 @@ static int virNetClientSendInternal(virNetClientPtr client,
     call->msg = msg;
     call->expectReply = expectReply;
     call->nonBlock = nonBlock;
-    call->haveThread = true;

+    VIR_DEBUG("New call %p: msg=%p, expectReply=%d, nonBlock=%d",
+              call, msg, expectReply, nonBlock);
+
+    return call;
+
+error:
+    VIR_FREE(call);
+    return NULL;
+}
+
+
+/*
+ * Returns 1 if the call was queued and will be completed later (only
+ * for nonBlock==true), 0 if the call was completed and -1 on error.
+ */
+static int virNetClientSendInternal(virNetClientPtr client,
+                                    virNetMessagePtr msg,
+                                    bool expectReply,
+                                    bool nonBlock)
+{
+    virNetClientCallPtr call;
+    int ret = -1;
+
+    PROBE(RPC_CLIENT_MSG_TX_QUEUE,
+          "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
+          client, msg->bufferLength,
+          msg->header.prog, msg->header.vers, msg->header.proc,
+          msg->header.type, msg->header.status, msg->header.serial);
+
+    if (!client->sock || client->wantClose) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("client socket is closed"));
+        return -1;
+    }
+
+    if (!(call = virNetClientCallNew(msg, expectReply, nonBlock))) {
+        virReportOOMError();
+        return -1;
+    }
+
+    call->haveThread = true;
     ret = virNetClientIO(client, call);

     /* If queued, the call will be finished and freed later by another thread;
      * we're done. */
@@ -1709,8 +1734,6 @@ static int virNetClientSendInternal(virNetClientPtr client,
         return 1;

     ignore_value(virCondDestroy(&call->cond));
-
-cleanup:
     VIR_FREE(call);
     return ret;
 }
--
1.7.10.2

On Wed, Jun 13, 2012 at 01:29:26AM +0200, Jiri Denemark wrote:
This makes it possible to create and queue new calls while we are running the IO loop.

---
 src/rpc/virnetclient.c |   85 ++++++++++++++++++++++++++++++------------------
 1 file changed, 54 insertions(+), 31 deletions(-)
ACK

Daniel

When a libvirt API is called from the main event loop (which seems to be common in event-based glib apps), the client IO loop would properly handle keepalive requests sent by a server but will not actually send them because the main event loop is blocked with the API. This patch gets rid of response timer and the thread which is processing keepalive requests is also responsible for queueing responses for delivery. --- src/rpc/virkeepalive.c | 155 +++++++++++------------------------------- src/rpc/virkeepalive.h | 3 +- src/rpc/virnetclient.c | 35 +++++++++- src/rpc/virnetserverclient.c | 88 ++++++++++++------------ 4 files changed, 120 insertions(+), 161 deletions(-) diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c index 035ac74..7984ada 100644 --- a/src/rpc/virkeepalive.c +++ b/src/rpc/virkeepalive.c @@ -48,9 +48,6 @@ struct _virKeepAlive { time_t intervalStart; int timer; - virNetMessagePtr response; - int responseTimer; - virKeepAliveSendFunc sendCB; virKeepAliveDeadFunc deadCB; virKeepAliveFreeFunc freeCB; @@ -72,12 +69,25 @@ virKeepAliveUnlock(virKeepAlivePtr ka) static virNetMessagePtr -virKeepAliveMessage(int proc) +virKeepAliveMessage(virKeepAlivePtr ka, int proc) { virNetMessagePtr msg; + const char *procstr = NULL; - if (!(msg = virNetMessageNew(false))) + switch (proc) { + case KEEPALIVE_PROC_PING: + procstr = "request"; + break; + case KEEPALIVE_PROC_PONG: + procstr = "response"; + break; + default: + VIR_WARN("Refusing to send unknown keepalive message: %d", proc); return NULL; + } + + if (!(msg = virNetMessageNew(false))) + goto error; msg->header.prog = KEEPALIVE_PROGRAM; msg->header.vers = KEEPALIVE_PROTOCOL_VERSION; @@ -87,69 +97,20 @@ virKeepAliveMessage(int proc) if (virNetMessageEncodeHeader(msg) < 0 || virNetMessageEncodePayloadEmpty(msg) < 0) { virNetMessageFree(msg); - return NULL; - } - - return msg; -} - - -static void -virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg) -{ - const char *proc = NULL; - void *client = ka->client; - virKeepAliveSendFunc sendCB = ka->sendCB; - - switch (msg->header.proc) { - case KEEPALIVE_PROC_PING: - proc = "request"; - break; - case KEEPALIVE_PROC_PONG: - proc = "response"; - break; - } - - if (!proc) { - VIR_WARN("Refusing to send unknown keepalive message: %d", - msg->header.proc); - virNetMessageFree(msg); - return; + goto error; } - VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); + VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client); PROBE(RPC_KEEPALIVE_SEND, "ka=%p client=%p prog=%d vers=%d proc=%d", ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc); - ka->refs++; - virKeepAliveUnlock(ka); - - if (sendCB(client, msg) < 0) { - VIR_WARN("Failed to send keepalive %s to client %p", proc, client); - virNetMessageFree(msg); - } - - virKeepAliveLock(ka); - ka->refs--; -} - - -static void -virKeepAliveScheduleResponse(virKeepAlivePtr ka) -{ - if (ka->responseTimer == -1) - return; - - VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); - - if (!ka->response && - !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { - VIR_WARN("Failed to generate keepalive response"); - return; - } + return msg; - virEventUpdateTimeout(ka->responseTimer, 0); +error: + VIR_WARN("Failed to generate keepalive %s", procstr); + VIR_FREE(msg); + return NULL; } @@ -184,7 +145,7 @@ virKeepAliveTimerInternal(virKeepAlivePtr ka, } else { ka->countToDeath--; ka->intervalStart = now; - *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING); + *msg = virKeepAliveMessage(ka, 
KEEPALIVE_PROC_PING); virEventUpdateTimeout(ka->timer, ka->interval * 1000); return false; } @@ -197,47 +158,30 @@ virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virKeepAlivePtr ka = opaque; virNetMessagePtr msg = NULL; bool dead; + void *client; virKeepAliveLock(ka); + client = ka->client; dead = virKeepAliveTimerInternal(ka, &msg); - if (dead) { - virKeepAliveDeadFunc deadCB = ka->deadCB; - void *client = ka->client; - - ka->refs++; - virKeepAliveUnlock(ka); - deadCB(client); - virKeepAliveLock(ka); - ka->refs--; - } else if (msg) { - virKeepAliveSend(ka, msg); - } + if (!dead && !msg) + goto cleanup; + ka->refs++; virKeepAliveUnlock(ka); -} - - -static void -virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) -{ - virKeepAlivePtr ka = opaque; - virNetMessagePtr msg; - - virKeepAliveLock(ka); - VIR_DEBUG("ka=%p, client=%p, response=%p", - ka, ka->client, ka->response); - - if (ka->response) { - msg = ka->response; - ka->response = NULL; - virKeepAliveSend(ka, msg); + if (dead) { + ka->deadCB(client); + } else if (ka->sendCB(client, msg) < 0) { + VIR_WARN("Failed to send keepalive request to client %p", client); + virNetMessageFree(msg); } - virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1); + virKeepAliveLock(ka); + ka->refs--; +cleanup: virKeepAliveUnlock(ka); } @@ -281,15 +225,6 @@ virKeepAliveNew(int interval, ka->deadCB = deadCB; ka->freeCB = freeCB; - ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, - ka, virKeepAliveTimerFree); - if (ka->responseTimer < 0) { - virKeepAliveFree(ka); - return NULL; - } - /* the timer now has a reference to ka */ - ka->refs++; - PROBE(RPC_KEEPALIVE_NEW, "ka=%p client=%p refs=%d", ka, ka->client, ka->refs); @@ -394,7 +329,7 @@ cleanup: static void -virKeepAliveStopInternal(virKeepAlivePtr ka, bool all) +virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED) { virKeepAliveLock(ka); @@ -407,16 +342,6 @@ virKeepAliveStopInternal(virKeepAlivePtr ka, bool all) ka->timer = -1; } - if (all) { - if (ka->responseTimer > 0) { - virEventRemoveTimeout(ka->responseTimer); - ka->responseTimer = -1; - } - - virNetMessageFree(ka->response); - ka->response = NULL; - } - virKeepAliveUnlock(ka); } @@ -482,13 +407,15 @@ virKeepAliveTrigger(virKeepAlivePtr ka, bool virKeepAliveCheckMessage(virKeepAlivePtr ka, - virNetMessagePtr msg) + virNetMessagePtr msg, + virNetMessagePtr *response) { bool ret = false; VIR_DEBUG("ka=%p, client=%p, msg=%p", ka, ka ? 
ka->client : "(null)", msg); + *response = NULL; if (!ka) return false; @@ -508,7 +435,7 @@ virKeepAliveCheckMessage(virKeepAlivePtr ka, switch (msg->header.proc) { case KEEPALIVE_PROC_PING: VIR_DEBUG("Got keepalive request from client %p", ka->client); - virKeepAliveScheduleResponse(ka); + *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG); break; case KEEPALIVE_PROC_PONG: diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h index 09264a5..62227d0 100644 --- a/src/rpc/virkeepalive.h +++ b/src/rpc/virkeepalive.h @@ -55,6 +55,7 @@ int virKeepAliveTimeout(virKeepAlivePtr ka); bool virKeepAliveTrigger(virKeepAlivePtr ka, virNetMessagePtr *msg); bool virKeepAliveCheckMessage(virKeepAlivePtr ka, - virNetMessagePtr msg); + virNetMessagePtr msg, + virNetMessagePtr *response); #endif /* __VIR_KEEPALIVE_H__ */ diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index b956f6e..48c6a5d 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -109,6 +109,8 @@ struct _virNetClient { static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall); +static int virNetClientQueueNonBlocking(virNetClientPtr client, + virNetMessagePtr msg); static void virNetClientLock(virNetClientPtr client) @@ -937,14 +939,22 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) static int virNetClientCallDispatch(virNetClientPtr client) { + virNetMessagePtr response = NULL; + PROBE(RPC_CLIENT_MSG_RX, "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u", client, client->msg.bufferLength, client->msg.header.prog, client->msg.header.vers, client->msg.header.proc, client->msg.header.type, client->msg.header.status, client->msg.header.serial); - if (virKeepAliveCheckMessage(client->keepalive, &client->msg)) + if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) { + if (response && + virNetClientQueueNonBlocking(client, response) < 0) { + VIR_WARN("Could not queue keepalive response"); + virNetMessageFree(response); + } return 0; + } switch (client->msg.header.type) { case VIR_NET_REPLY: /* Normal RPC replies */ @@ -1637,6 +1647,8 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, NULL); + virNetClientIOUpdateCallback(client, true); + done: virNetClientUnlock(client); } @@ -1696,6 +1708,27 @@ error: } +static int +virNetClientQueueNonBlocking(virNetClientPtr client, + virNetMessagePtr msg) +{ + virNetClientCallPtr call; + + PROBE(RPC_CLIENT_MSG_TX_QUEUE, + "client=%p len=%zu prog=%u vers=%u proc=%u" + " type=%u status=%u serial=%u", + client, msg->bufferLength, + msg->header.prog, msg->header.vers, msg->header.proc, + msg->header.type, msg->header.status, msg->header.serial); + + if (!(call = virNetClientCallNew(msg, false, true))) + return -1; + + virNetClientCallQueue(&client->waitDispatch, call); + return 0; +} + + /* * Returns 1 if the call was queued and will be completed later (only * for nonBlock==true), 0 if the call was completed and -1 on error. 
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 6ae4e25..f3eb61a 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -103,13 +103,14 @@ struct _virNetServerClient virNetServerClientCloseFunc privateDataCloseFunc; virKeepAlivePtr keepalive; - int keepaliveFilter; }; static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque); static void virNetServerClientUpdateEvent(virNetServerClientPtr client); static void virNetServerClientDispatchRead(virNetServerClientPtr client); +static int virNetServerClientSendMessageLocked(virNetServerClientPtr client, + virNetMessagePtr msg); static void virNetServerClientLock(virNetServerClientPtr client) { @@ -364,7 +365,6 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->readonly = readonly; client->tlsCtxt = tls; client->nrequests_max = nrequests_max; - client->keepaliveFilter = -1; client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc, client, NULL); @@ -644,9 +644,6 @@ void virNetServerClientClose(virNetServerClientPtr client) return; } - if (client->keepaliveFilter >= 0) - virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter); - if (client->keepalive) { virKeepAliveStop(client->keepalive); ka = client->keepalive; @@ -844,6 +841,7 @@ readmore: } else { /* Grab the completed message */ virNetMessagePtr msg = client->rx; + virNetMessagePtr response = NULL; virNetServerClientFilterPtr filter; size_t i; @@ -894,23 +892,35 @@ readmore: msg->header.prog, msg->header.vers, msg->header.proc, msg->header.type, msg->header.status, msg->header.serial); + if (virKeepAliveCheckMessage(client->keepalive, msg, &response)) { + virNetMessageFree(msg); + client->nrequests--; + msg = NULL; + + if (response && + virNetServerClientSendMessageLocked(client, response) < 0) + virNetMessageFree(response); + } + /* Maybe send off for queue against a filter */ - filter = client->filters; - while (filter) { - int ret = filter->func(client, msg, filter->opaque); - if (ret < 0) { - virNetMessageFree(msg); - msg = NULL; - if (ret < 0) - client->wantClose = true; - break; - } - if (ret > 0) { - msg = NULL; - break; - } + if (msg) { + filter = client->filters; + while (filter) { + int ret = filter->func(client, msg, filter->opaque); + if (ret < 0) { + virNetMessageFree(msg); + msg = NULL; + if (ret < 0) + client->wantClose = true; + break; + } + if (ret > 0) { + msg = NULL; + break; + } - filter = filter->next; + filter = filter->next; + } } /* Send off to for normal dispatch to workers */ @@ -1117,16 +1127,15 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque) } -int virNetServerClientSendMessage(virNetServerClientPtr client, - virNetMessagePtr msg) +static int +virNetServerClientSendMessageLocked(virNetServerClientPtr client, + virNetMessagePtr msg) { int ret = -1; VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu", msg, msg->header.proc, msg->bufferLength, msg->bufferOffset); - virNetServerClientLock(client); - msg->donefds = 0; if (client->sock && !client->wantClose) { PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE, @@ -1140,6 +1149,16 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, ret = 0; } + return ret; +} + +int virNetServerClientSendMessage(virNetServerClientPtr client, + virNetMessagePtr msg) +{ + int ret; + + virNetServerClientLock(client); + ret = virNetServerClientSendMessageLocked(client, msg); virNetServerClientUnlock(client); return ret; @@ -1176,20 +1195,6 @@ 
virNetServerClientFreeCB(void *opaque) virNetServerClientFree(opaque); } -static int -virNetServerClientKeepAliveFilter(virNetServerClientPtr client, - virNetMessagePtr msg, - void *opaque ATTRIBUTE_UNUSED) -{ - if (virKeepAliveCheckMessage(client->keepalive, msg)) { - virNetMessageFree(msg); - client->nrequests--; - return 1; - } - - return 0; -} - int virNetServerClientInitKeepAlive(virNetServerClientPtr client, int interval, @@ -1208,13 +1213,6 @@ virNetServerClientInitKeepAlive(virNetServerClientPtr client, /* keepalive object has a reference to client */ client->refs++; - client->keepaliveFilter = - virNetServerClientAddFilterLocked(client, - virNetServerClientKeepAliveFilter, - NULL); - if (client->keepaliveFilter < 0) - goto cleanup; - client->keepalive = ka; ka = NULL; -- 1.7.10.2
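One detail of the reworked virKeepAliveTimer above deserves a note: the refs++ / unlock / invoke / lock / refs-- sequence. It is the standard way to call back into user code without holding the object's non-recursive lock (the callback may well re-enter keepalive APIs) while still guaranteeing the object stays alive across the call. Stripped of the keepalive specifics, the pattern looks roughly like this (a generic sketch; the virObj names are hypothetical, not libvirt APIs):

#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    int refs;
    void (*cb)(void *);
    void *cbData;
} virObj;   /* hypothetical object, not a libvirt type */

static void
virObjFireCallback(virObj *obj)
{
    pthread_mutex_lock(&obj->lock);
    void (*cb)(void *) = obj->cb;   /* snapshot the callback under the lock */
    void *data = obj->cbData;

    obj->refs++;                    /* keep obj alive while unlocked */
    pthread_mutex_unlock(&obj->lock);

    cb(data);                       /* may safely re-enter obj's APIs */

    pthread_mutex_lock(&obj->lock);
    obj->refs--;                    /* drop the temporary reference */
    pthread_mutex_unlock(&obj->lock);
}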

On Wed, Jun 13, 2012 at 01:29:27AM +0200, Jiri Denemark wrote:
When a libvirt API is called from the main event loop (which seems to be
common in event-based glib apps), the client IO loop would properly handle
keepalive requests sent by the server but would never actually send the
responses, because the main event loop is blocked by the API call. This
patch gets rid of the response timer; the thread that processes keepalive
requests is now also responsible for queueing responses for delivery.
---
 src/rpc/virkeepalive.c       | 155 +++++++++++-------------------------
 src/rpc/virkeepalive.h       |   3 +-
 src/rpc/virnetclient.c       |  35 +++++++++-
 src/rpc/virnetserverclient.c |  88 ++++++++++++------------
 4 files changed, 120 insertions(+), 161 deletions(-)
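In caller terms, the contract above works out as follows:
virKeepAliveCheckMessage() consumes keepalive traffic, and when it produces
a response the caller owns that message and must either deliver it or free
it. A condensed sketch (queue_or_send() is a placeholder for whichever
delivery primitive the caller has -- virNetClientQueueNonBlocking() on the
client side, virNetServerClientSendMessageLocked() on the server side):

    virNetMessagePtr response = NULL;

    if (virKeepAliveCheckMessage(ka, msg, &response)) {
        /* msg was PING/PONG traffic, not a normal RPC message */
        if (response && queue_or_send(client, response) < 0)
            virNetMessageFree(response);   /* we own it until delivered */
        return 0;                          /* nothing further to dispatch */
    }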
ACK. Again, this was quite hard to follow as a patch, so I reviewed the
result of applying it instead.

Daniel
-- 
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

The previous commit removed the only usage of the ``all'' parameter in
virKeepAliveStopInternal, which was in fact the only reason for having
virKeepAliveStopInternal in the first place. This effectively reverts most
of commit 6446a9e20cc65561ce6061742baf35a3a63d5ba1.
---
 src/libvirt_probes.d   |  2 +-
 src/rpc/virkeepalive.c | 22 ++++------------------
 src/rpc/virkeepalive.h |  1 -
 src/rpc/virnetclient.c |  2 +-
 4 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/src/libvirt_probes.d b/src/libvirt_probes.d
index ac6c546..0dac8f3 100644
--- a/src/libvirt_probes.d
+++ b/src/libvirt_probes.d
@@ -78,7 +78,7 @@ provider libvirt {
     probe rpc_keepalive_ref(void *ka, void *client, int refs);
     probe rpc_keepalive_free(void *ka, void *client, int refs);
     probe rpc_keepalive_start(void *ka, void *client, int interval, int count);
-    probe rpc_keepalive_stop(void *ka, void *client, bool all);
+    probe rpc_keepalive_stop(void *ka, void *client);
     probe rpc_keepalive_send(void *ka, void *client, int prog, int vers, int proc);
     probe rpc_keepalive_received(void *ka, void *client, int prog, int vers, int proc);
     probe rpc_keepalive_timeout(void *ka, void *client, int coundToDeath, int idle);
diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
index 7984ada..70cf31e 100644
--- a/src/rpc/virkeepalive.c
+++ b/src/rpc/virkeepalive.c
@@ -328,14 +328,14 @@ cleanup:
 }
 
 
-static void
-virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED)
+void
+virKeepAliveStop(virKeepAlivePtr ka)
 {
     virKeepAliveLock(ka);
 
     PROBE(RPC_KEEPALIVE_STOP,
-          "ka=%p client=%p all=%d",
-          ka, ka->client, all);
+          "ka=%p client=%p",
+          ka, ka->client);
 
     if (ka->timer > 0) {
         virEventRemoveTimeout(ka->timer);
@@ -346,20 +346,6 @@ virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED)
 }
 
 
-void
-virKeepAliveStop(virKeepAlivePtr ka)
-{
-    virKeepAliveStopInternal(ka, true);
-}
-
-
-void
-virKeepAliveStopSending(virKeepAlivePtr ka)
-{
-    virKeepAliveStopInternal(ka, false);
-}
-
-
 int
 virKeepAliveTimeout(virKeepAlivePtr ka)
 {
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
index 62227d0..1e25214 100644
--- a/src/rpc/virkeepalive.h
+++ b/src/rpc/virkeepalive.h
@@ -49,7 +49,6 @@ int virKeepAliveStart(virKeepAlivePtr ka,
                       int interval,
                       unsigned int count);
 void virKeepAliveStop(virKeepAlivePtr ka);
-void virKeepAliveStopSending(virKeepAlivePtr ka);
 
 int virKeepAliveTimeout(virKeepAlivePtr ka);
 bool virKeepAliveTrigger(virKeepAlivePtr ka,
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 48c6a5d..25bafea 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -257,7 +257,7 @@ void virNetClientKeepAliveStop(virNetClientPtr client)
 {
     virNetClientLock(client);
-    virKeepAliveStopSending(client->keepalive);
+    virKeepAliveStop(client->keepalive);
     virNetClientUnlock(client);
 }
 
-- 
1.7.10.2

On Wed, Jun 13, 2012 at 01:29:28AM +0200, Jiri Denemark wrote:
The previous commit removed the only usage of the ``all'' parameter in
virKeepAliveStopInternal, which was in fact the only reason for having
virKeepAliveStopInternal in the first place. This effectively reverts most
of commit 6446a9e20cc65561ce6061742baf35a3a63d5ba1.
---
 src/libvirt_probes.d   |  2 +-
 src/rpc/virkeepalive.c | 22 ++++------------------
 src/rpc/virkeepalive.h |  1 -
 src/rpc/virnetclient.c |  2 +-
 4 files changed, 6 insertions(+), 21 deletions(-)
ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

We no longer need to add or remove filters while the client object is
already locked, so there's no reason to keep the *Locked variants of those
APIs.
---
 src/rpc/virnetserverclient.c | 39 ++++++++++++---------------------------
 1 file changed, 12 insertions(+), 27 deletions(-)

diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index f3eb61a..a56031c 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -218,20 +218,21 @@ static void virNetServerClientUpdateEvent(virNetServerClientPtr client)
 }
 
 
-static int
-virNetServerClientAddFilterLocked(virNetServerClientPtr client,
-                                  virNetServerClientFilterFunc func,
-                                  void *opaque)
+int virNetServerClientAddFilter(virNetServerClientPtr client,
+                                virNetServerClientFilterFunc func,
+                                void *opaque)
 {
     virNetServerClientFilterPtr filter;
     virNetServerClientFilterPtr *place;
-    int ret = -1;
+    int ret;
 
     if (VIR_ALLOC(filter) < 0) {
         virReportOOMError();
-        goto cleanup;
+        return -1;
     }
 
+    virNetServerClientLock(client);
+
     filter->id = client->nextFilterID++;
     filter->func = func;
     filter->opaque = opaque;
@@ -243,28 +244,18 @@ virNetServerClientAddFilterLocked(virNetServerClientPtr client,
 
     ret = filter->id;
 
-cleanup:
-    return ret;
-}
-
-int virNetServerClientAddFilter(virNetServerClientPtr client,
-                                virNetServerClientFilterFunc func,
-                                void *opaque)
-{
-    int ret;
-
-    virNetServerClientLock(client);
-    ret = virNetServerClientAddFilterLocked(client, func, opaque);
     virNetServerClientUnlock(client);
+
     return ret;
 }
 
-static void
-virNetServerClientRemoveFilterLocked(virNetServerClientPtr client,
-                                     int filterID)
+void virNetServerClientRemoveFilter(virNetServerClientPtr client,
+                                    int filterID)
 {
     virNetServerClientFilterPtr tmp, prev;
 
+    virNetServerClientLock(client);
+
     prev = NULL;
     tmp = client->filters;
     while (tmp) {
@@ -280,13 +271,7 @@ virNetServerClientRemoveFilterLocked(virNetServerClientPtr client,
         prev = tmp;
         tmp = tmp->next;
     }
-}
 
-void virNetServerClientRemoveFilter(virNetServerClientPtr client,
-                                    int filterID)
-{
-    virNetServerClientLock(client);
-    virNetServerClientRemoveFilterLocked(client, filterID);
     virNetServerClientUnlock(client);
 }
 
-- 
1.7.10.2
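For context on the convention being retired here: a *Locked variant exists
so that a caller already holding the object's non-recursive mutex can reuse
the logic without deadlocking. Once the last such caller is gone, the public
entry point can simply take the lock itself. A generic before/after sketch
(the virFoo names are hypothetical, not libvirt APIs):

/* Before: logic split out so an already-locked caller could share it. */
static int virFooAddItemLocked(virFooPtr foo, int item);  /* expects foo locked */

int virFooAddItem(virFooPtr foo, int item)
{
    int ret;
    virFooLock(foo);
    ret = virFooAddItemLocked(foo, item);
    virFooUnlock(foo);
    return ret;
}

/* After: no locked caller remains, so the work is inlined. */
int virFooAddItem(virFooPtr foo, int item)
{
    virFooLock(foo);
    /* ... manipulate foo->items directly ... */
    virFooUnlock(foo);
    return 0;
}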

On Wed, Jun 13, 2012 at 01:29:29AM +0200, Jiri Denemark wrote:
We no longer need to add or remove filters while the client object is
already locked, so there's no reason to keep the *Locked variants of those
APIs.
---
 src/rpc/virnetserverclient.c | 39 ++++++++++++---------------------------
 1 file changed, 12 insertions(+), 27 deletions(-)
ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

In addition to keepalive responses, we also need to send keepalive requests
from the client IO loop in order to properly detect a dead connection when
a libvirt API is called from the main loop, which prevents any timers from
firing.
---
 src/rpc/virnetclient.c | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 25bafea..033fda6 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1284,6 +1284,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         char ignore;
         sigset_t oldmask, blockedsigs;
         int timeout = -1;
+        virNetMessagePtr msg = NULL;
 
         /* If we have existing SASL decoded data we don't want to sleep in
          * the poll(), just check if any other FDs are also ready.
@@ -1297,6 +1298,10 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (thiscall->nonBlock)
             timeout = 0;
 
+        /* Limit timeout so that we can send keepalive request in time */
+        if (timeout == -1)
+            timeout = virKeepAliveTimeout(client->keepalive);
+
         fds[0].events = fds[0].revents = 0;
         fds[1].events = fds[1].revents = 0;
 
@@ -1342,6 +1347,13 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
 
         virNetClientLock(client);
 
+        if (virKeepAliveTrigger(client->keepalive, &msg)) {
+            client->wantClose = true;
+        } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
+            VIR_WARN("Could not queue keepalive request");
+            virNetMessageFree(msg);
+        }
+
         /* If we have existing SASL decoded data, pretend
          * the socket became readable so we consume it
          */
-- 
1.7.10.2
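The essence of the change: instead of letting poll() block indefinitely
(timeout == -1), the IO loop bounds the wait by the time remaining until the
next keepalive action, and after waking up lets virKeepAliveTrigger() decide
whether that deadline means declaring the connection dead or queueing a PING.
Condensed from the patch above (error paths and the rest of the loop elided):

        int timeout = thiscall->nonBlock ? 0 : -1;

        if (timeout == -1)   /* would otherwise sleep forever */
            timeout = virKeepAliveTimeout(client->keepalive);

        ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);

        virNetClientLock(client);
        if (virKeepAliveTrigger(client->keepalive, &msg))
            client->wantClose = true;   /* no response in time: peer is dead */
        else if (msg && virNetClientQueueNonBlocking(client, msg) < 0)
            virNetMessageFree(msg);     /* PING was due but could not be queued */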

On Wed, Jun 13, 2012 at 01:29:30AM +0200, Jiri Denemark wrote:
In addition to keepalive responses, we also need to send keepalive requests
from the client IO loop in order to properly detect a dead connection when
a libvirt API is called from the main loop, which prevents any timers from
firing.
---
 src/rpc/virnetclient.c | 12 ++++++++++++
 1 file changed, 12 insertions(+)
ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On Wed, Jun 13, 2012 at 01:29:18AM +0200, Jiri Denemark wrote:
So far, we were dropping non-blocking calls whenever sending them would block. In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit. While working on the proper fix, I discovered several other issues we had in handling keepalive messages in client RPC code. See individual patches for more details.
As a nice bonus, the fixed version is shorter by one line than the current broken version :-)
Ok, this refactoring actually addresses many of the concerns I had with the
original design and so passes my totally subjective "feels right" test. In
particular, the way we handle keepalives while in the virNetClientIOEventLoop()
method, and the way we use event handles while not in it, are the key problem
areas we're fixing with this.

On reviewing the code, I noticed the following (unrelated) issue we should
fix. First, 'poll' can't return EWOULDBLOCK, and second, we're checking errno
so far away from the poll() call that we've probably already trashed the
original errno value.

Daniel

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 033fda6..49d238e 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1347,6 +1347,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
 
         virNetClientLock(client);
 
+        if (ret < 0) {
+            virReportSystemError(errno,
+                                 "%s", _("poll on socket failed"));
+            goto error;
+        }
+
         if (virKeepAliveTrigger(client->keepalive, &msg)) {
             client->wantClose = true;
         } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
@@ -1375,15 +1381,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
             }
         }
 
-        if (ret < 0) {
-            /* XXX what's this dubious errno check doing ? */
-            if (errno == EWOULDBLOCK)
-                continue;
-            virReportSystemError(errno,
-                                 "%s", _("poll on socket failed"));
-            goto error;
-        }
-
         if (fds[0].revents & POLLOUT) {
             if (virNetClientIOHandleOutput(client) < 0)
                 goto error;

-- 
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
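The errno point generalizes: errno is only meaningful immediately after the
failing call, so code that runs anything else before inspecting it must
snapshot the value first. A standalone illustration of the idiom (plain C,
not libvirt code):

#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>

static int
wait_for_fd(struct pollfd *fds, nfds_t nfds, int timeout_ms)
{
    int rc = poll(fds, nfds, timeout_ms);
    int saved_errno = errno;   /* snapshot before anything else can run */

    /* imagine locking or bookkeeping calls here: any of them may
     * clobber errno, which is exactly the bug being removed above */

    if (rc < 0)
        fprintf(stderr, "poll failed: %s\n", strerror(saved_errno));
    return rc;
}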

On Wed, Jun 13, 2012 at 10:54:02 +0100, Daniel P. Berrange wrote:
On Wed, Jun 13, 2012 at 01:29:18AM +0200, Jiri Denemark wrote:
In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit. While working on the proper fix, I discovered several other issues we had in handling keepalive messages in client RPC code. See individual patches for more details.
As a nice bonus, the fixed version is shorter by one line than the current broken version :-)
Ok, this refactoring actually addresses many of the concerns I had with the
original design and so passes my totally subjective "feels right" test. In
particular, the way we handle keepalives while in the virNetClientIOEventLoop()
method, and the way we use event handles while not in it, are the key problem
areas we're fixing with this.
Thanks for the review. I tested the scenarios covered by patches 2, 3, 4, 9,
and 12 and everything worked fine. I'll be pushing the series soon and I hope
it won't generate tons of bugs, as I'll be on vacation for two weeks starting
from Friday :-P
On reviewing the code, I noticed the following (unrelated) issue we should fix. First, 'poll' can't return EWOULDBLOCK,
Oh yeah, the code I always wanted to remove but was afraid to do so :-)
and second, we're checking errno so far away from the poll() call that we've probably already trashed the original errno value.
Daniel
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 033fda6..49d238e 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1347,6 +1347,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
virNetClientLock(client);
+ if (ret < 0) { + virReportSystemError(errno, + "%s", _("poll on socket failed")); + goto error; + } + if (virKeepAliveTrigger(client->keepalive, &msg)) { client->wantClose = true; } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) { @@ -1375,15 +1381,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } }
- if (ret < 0) { - /* XXX what's this dubious errno check doing ? */ - if (errno == EWOULDBLOCK) - continue; - virReportSystemError(errno, - "%s", _("poll on socket failed")); - goto error; - } - if (fds[0].revents & POLLOUT) { if (virNetClientIOHandleOutput(client) < 0) goto error;
ACK, I'll push this as the 13th patch in this series. Jirka