[libvirt] [PATCH v2 00/12] Implement keepalive protocol for libvirt RPC

This patchset can also be found at https://gitorious.org/~jirka/libvirt/jirka-staging/commits/keepalive This allows us to detect broken connections between server and client without waiting for TCP timeout and dead deamon/client. By default a connection is considered broken after about 30 seconds of no messages received from remote party. After that period, the connection is automatically closed. The main reason for implementing this is that peer-to-peer migration can now be canceled when a connection between source and target breaks. Although this will really work only after qemu fixes migrate_cancel command so that it doesn't block when outgoing TCP buffers are full. Version 2 adds virConnectIsAlive API and uses it to detect that a connection was closed as a result of keepalive timeout. The only patch that was changed in v2 is "Add keepalive support into domain-events examples". All other patches are either new or without any change from v1. Jiri Denemark (12): Define keepalive protocol Implement common keepalive handling Introduce two public APIs for keepalive protocol Implement keepalive protocol in libvirt daemon Add support for non-blocking calls in client RPC Add support for async close of client RPC socket Implement keepalive protocol in remote driver Introduce virConnectIsAlive API Implement virConnectIsAlive in all drivers Add keepalive support into domain-events examples qemu: Add support for keepalive messages during p2p migration qemu: Cancel p2p migration when connection breaks .gitignore | 1 + daemon/libvirtd.aug | 4 + daemon/libvirtd.c | 11 + daemon/libvirtd.conf | 15 + daemon/remote.c | 38 ++ examples/domain-events/events-c/event-test.c | 13 +- examples/domain-events/events-python/event-test.py | 5 +- include/libvirt/libvirt.h.in | 6 + po/POTFILES.in | 1 + src/Makefile.am | 13 +- src/driver.h | 12 + src/esx/esx_driver.c | 18 + src/hyperv/hyperv_driver.c | 18 + src/libvirt.c | 143 ++++++ src/libvirt_internal.h | 10 +- src/libvirt_public.syms | 7 + 
src/libxl/libxl_driver.c | 8 + src/lxc/lxc_driver.c | 7 + src/openvz/openvz_driver.c | 7 + src/phyp/phyp_driver.c | 18 + src/qemu/libvirtd_qemu.aug | 2 + src/qemu/qemu.conf | 16 + src/qemu/qemu_conf.c | 11 + src/qemu/qemu_conf.h | 3 + src/qemu/qemu_driver.c | 6 + src/qemu/qemu_migration.c | 49 ++- src/qemu/test_libvirtd_qemu.aug | 6 + src/remote/remote_driver.c | 48 ++ src/remote/remote_protocol.x | 2 +- src/rpc/virkeepalive.c | 464 ++++++++++++++++++++ src/rpc/virkeepalive.h | 58 +++ src/rpc/virkeepaliveprotocol.x | 8 + src/rpc/virnetclient.c | 335 ++++++++++++-- src/rpc/virnetclient.h | 6 + src/rpc/virnetserver.c | 10 + src/rpc/virnetserver.h | 2 + src/rpc/virnetserverclient.c | 126 +++++- src/rpc/virnetserverclient.h | 6 + src/test/test_driver.c | 6 + src/uml/uml_driver.c | 7 + src/vbox/vbox_tmpl.c | 6 + src/vmware/vmware_driver.c | 7 + src/xen/xen_driver.c | 8 + src/xenapi/xenapi_driver.c | 12 + 44 files changed, 1483 insertions(+), 76 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h create mode 100644 src/rpc/virkeepaliveprotocol.x -- 1.7.6.1

The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending PONG message the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports keepalive protocol. Server is not allowed to send any keepalive message until it sees ADVERTISE. --- .gitignore | 1 + src/Makefile.am | 12 +++++++++--- src/rpc/virkeepaliveprotocol.x | 8 ++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 src/rpc/virkeepaliveprotocol.x diff --git a/.gitignore b/.gitignore index 41fa50f..3859fab 100644 --- a/.gitignore +++ b/.gitignore @@ -65,6 +65,7 @@ /src/locking/qemu-sanlock.conf /src/remote/*_client_bodies.h /src/remote/*_protocol.[ch] +/src/rpc/virkeepaliveprotocol.[ch] /src/rpc/virnetprotocol.[ch] /src/util/virkeymaps.h /tests/*.log diff --git a/src/Makefile.am b/src/Makefile.am index 738ee91..ff890e1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -266,7 +266,8 @@ PDWTAGS = \ PROTOCOL_STRUCTS = \ $(srcdir)/remote_protocol-structs \ $(srcdir)/qemu_protocol-structs \ - $(srcdir)/virnetprotocol-structs + $(srcdir)/virnetprotocol-structs \ + $(srcdir)/virkeepaliveprotocol-structs if WITH_REMOTE # The .o file that pdwtags parses is created as a side effect of running # libtool; but from make's perspective we depend on the .lo file. @@ -274,6 +275,7 @@ $(srcdir)/%_protocol-structs: libvirt_driver_remote_la-%_protocol.lo $(PDWTAGS) $(srcdir)/virnetprotocol-structs: libvirt_net_rpc_la-virnetprotocol.lo $(PDWTAGS) +$(srcdir)/virkeepaliveprotocol-structs: libvirt_net_rpc_la-virkeepaliveprotocol.lo else !WITH_REMOTE # These generated files must live in git, because they cannot be re-generated # when configured --without-remote. 
@@ -1307,12 +1309,15 @@ noinst_LTLIBRARIES += \ EXTRA_DIST += \ rpc/virnetprotocol.x \ + rpc/virkeepaliveprotocol.x \ rpc/gendispatch.pl \ rpc/genprotocol.pl VIR_NET_RPC_GENERATED = \ $(srcdir)/rpc/virnetprotocol.h \ - $(srcdir)/rpc/virnetprotocol.c + $(srcdir)/rpc/virnetprotocol.c \ + $(srcdir)/rpc/virkeepaliveprotocol.h \ + $(srcdir)/rpc/virkeepaliveprotocol.c BUILT_SOURCES += $(VIR_NET_RPC_GENERATED) @@ -1320,7 +1325,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetmessage.h rpc/virnetmessage.c \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ - rpc/virnettlscontext.h rpc/virnettlscontext.c + rpc/virnettlscontext.h rpc/virnettlscontext.c \ + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/rpc/virkeepaliveprotocol.x b/src/rpc/virkeepaliveprotocol.x new file mode 100644 index 0000000..326ea78 --- /dev/null +++ b/src/rpc/virkeepaliveprotocol.x @@ -0,0 +1,8 @@ +const KEEPALIVE_PROGRAM = 0x6b656570; +const KEEPALIVE_VERSION = 1; + +enum keepalive_procedure { + KEEPALIVE_PROC_ADVERTISE = 1, + KEEPALIVE_PROC_PING = 2, + KEEPALIVE_PROC_PONG = 3 +}; -- 1.7.6.1

On Fri, Sep 23, 2011 at 10:24:46AM +0200, Jiri Denemark wrote:
The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending a PONG message to the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports the keepalive protocol. The server is not allowed to send any keepalive message until it sees ADVERTISE.
I guess I'm not entirely understanding what the point of the ADVERTISE message here is? IIUC, the flow of messages you are describing will end up as: 1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) 3. C -> S keepalive ADVERTISE 4. C -> S keepalive PING 5. S -> C keepalive PONG 6. C -> S keepalive PING 7. S -> C keepalive PONG ... n. C -> S keepalive PING n+1. S -> C keepalive PONG We need to the remote_supports_feature method to determine if the keepalive protocol is supported, what purpose is the ADVERTISE message serving ? Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On Thu, Sep 29, 2011 at 14:19:43 +0100, Daniel P. Berrange wrote:
On Fri, Sep 23, 2011 at 10:24:46AM +0200, Jiri Denemark wrote:
The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending a PONG message to the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports the keepalive protocol. The server is not allowed to send any keepalive message until it sees ADVERTISE.
I guess I'm not entirely understanding what the point of the ADVERTISE message here is?
IIUC, the flow of messages you are describing will end up as:
1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) 3. C -> S keepalive ADVERTISE 4. C -> S keepalive PING 5. S -> C keepalive PONG 6. C -> S keepalive PING 7. S -> C keepalive PONG ... n. C -> S keepalive PING n+1. S -> C keepalive PONG
We need to use the remote_supports_feature method to determine if the keepalive protocol is supported, so what purpose is the ADVERTISE message serving?
PING messages can be sent by both client and server and sending them can be independently disabled on both sides. ADVERTISE is sent because a server may never get any PING messages from a client which was configured not to send them. But the server still wants to know that the client supports this feature and thus the server can send PING messages. Also there is a keepalive timeout between ADVERTISE and PING, which can be different on both sides so server may decide to send PING earlier than the client. The server could probably deduce that the client supports keepalive from the fact that the client calls remote_supports_feature(KEEPALIVE) but that seems weak to me and sending a separate ADVERTISE message looks better :-) Jirka

On 09/29/2011 07:46 AM, Jiri Denemark wrote:
On Thu, Sep 29, 2011 at 14:19:43 +0100, Daniel P. Berrange wrote:
On Fri, Sep 23, 2011 at 10:24:46AM +0200, Jiri Denemark wrote:
The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending a PONG message to the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports the keepalive protocol. The server is not allowed to send any keepalive message until it sees ADVERTISE.
I guess I'm not entirely understanding what the point of the ADVERTISE message here is?
IIUC, the flow of messages you are describing will end up as:
1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) 3. C -> S keepalive ADVERTISE 4. C -> S keepalive PING 5. S -> C keepalive PONG 6. C -> S keepalive PING 7. S -> C keepalive PONG ... n. C -> S keepalive PING n+1. S -> C keepalive PONG
We need to use the remote_supports_feature method to determine if the keepalive protocol is supported, so what purpose is the ADVERTISE message serving?
PING messages can be sent by both client and server and sending them can be independently disabled on both sides. ADVERTISE is sent because a server may never get any PING messages from a client which was configured not to send them. But the server still wants to know that the client supports this feature and thus the server can send PING messages. Also there is a keepalive timeout between ADVERTISE and PING, which can be different on both sides so server may decide to send PING earlier than the client.
So, if I understand correctly, there are two flows: client->server pings: virConnectAllowKeepAlive() 1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) virConnectStartKeepAlive() 4a. C -> S keepalive PING 5a. S -> C keepalive PONG 6a. C -> S keepalive PING 7a. S -> C keepalive PONG ... server->client pings: virConnectAllowKeepAlive() 1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) 3. C -> S keepalive ADVERTISE server checks conf file 4b. S -> C keepalive PING 5b. C -> S keepalive PONG 6b. S -> C keepalive PING 7b. C -> S keepalive PONG ... Steps 1-3 are shared by both directions (although step 3 is not essential in the c->s direction). Remaining ping/pong sequences are independent, and can operate at different frequencies (c->s at the frequency in virConnectStartKeepAlive(), and s->c at the frequency in the conf file). Either side can terminate the connection if enough PONGs are not received (c->s terminates according to count in virConnectStartKeepAlive(), and s->c according to conf file). -- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org

On Thu, Sep 29, 2011 at 08:30:13 -0600, Eric Blake wrote:
On 09/29/2011 07:46 AM, Jiri Denemark wrote:
PING messages can be sent by both client and server and sending them can be independently disabled on both sides. ADVERTISE is sent because a server may never get any PING messages from a client which was configured not to send them. But the server still wants to know that the client supports this feature and thus the server can send PING messages. Also there is a keepalive timeout between ADVERTISE and PING, which can be different on both sides so server may decide to send PING earlier than the client.
So, if I understand correctly, there are two flows: ...
Yes, you got it right. Jirka

On Thu, Sep 29, 2011 at 03:46:05PM +0200, Jiri Denemark wrote:
On Thu, Sep 29, 2011 at 14:19:43 +0100, Daniel P. Berrange wrote:
On Fri, Sep 23, 2011 at 10:24:46AM +0200, Jiri Denemark wrote:
The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending a PONG message to the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports the keepalive protocol. The server is not allowed to send any keepalive message until it sees ADVERTISE.
I guess I'm not entirely understanding what the point of the ADVERTISE message here is?
IIUC, the flow of messages you are describing will end up as:
1. C -> S remote_supports_feature_args (KEEPALIVE) 2. S -> C remote_supports_feature_ret (TRUE|FALSE) 3. C -> S keepalive ADVERTISE 4. C -> S keepalive PING 5. S -> C keepalive PONG 6. C -> S keepalive PING 7. S -> C keepalive PONG ... n. C -> S keepalive PING n+1. S -> C keepalive PONG
We need to use the remote_supports_feature method to determine if the keepalive protocol is supported, so what purpose is the ADVERTISE message serving?
PING messages can be sent by both client and server and sending them can be independently disabled on both sides. ADVERTISE is sent because a server may never get any PING messages from a client which was configured not to send them. But the server still wants to know that the client supports this feature and thus the server can send PING messages. Also there is a keepalive timeout between ADVERTISE and PING, which can be different on both sides so server may decide to send PING earlier than the client.
Ok, in the case where the intervals are different it appears that whomever has the shorter interval will start PING'ing first, and thus the other side will only ever send PONGs, and never try to send PINGs itself. So we're not getting any duplicated PING/PONG in both direction which is good.
The server could probably deduce that the client supports keepalive from the fact that the client calls remote_supports_feature(KEEPALIVE) but that seems weak to me and sending a separate ADVERTISE message looks better :-)
I've got some comments about initialization, but I'll post them against another patch. Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

These APIs are used by both client and server RPC layer to handle processing of keepalive messages. --- po/POTFILES.in | 1 + src/Makefile.am | 3 +- src/rpc/virkeepalive.c | 464 ++++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virkeepalive.h | 58 ++++++ 4 files changed, 525 insertions(+), 1 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h diff --git a/po/POTFILES.in b/po/POTFILES.in index 5ce35ae..71254dd 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c src/qemu/qemu_process.c src/remote/remote_client_bodies.h src/remote/remote_driver.c +src/rpc/virkeepalive.c src/rpc/virnetclient.c src/rpc/virnetclientprogram.c src/rpc/virnetclientstream.c diff --git a/src/Makefile.am b/src/Makefile.am index ff890e1..d983d28 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1326,7 +1326,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ rpc/virnettlscontext.h rpc/virnettlscontext.c \ - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \ + rpc/virkeepalive.h rpc/virkeepalive.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c new file mode 100644 index 0000000..5536b61 --- /dev/null +++ b/src/rpc/virkeepalive.c @@ -0,0 +1,464 @@ +/* + * virkeepalive.c: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#include <config.h> + +#include "memory.h" +#include "threads.h" +#include "virfile.h" +#include "logging.h" +#include "util.h" +#include "virterror_internal.h" +#include "virnetsocket.h" +#include "virkeepaliveprotocol.h" +#include "virkeepalive.h" + +#define VIR_FROM_THIS VIR_FROM_RPC +#define virNetError(code, ...) \ + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \ + __FUNCTION__, __LINE__, __VA_ARGS__) + +struct _virKeepAlive { + int refs; + virMutex lock; + + int interval; + unsigned int count; + unsigned int countToDeath; + time_t lastPacketReceived; + bool advertised; + int timer; + + virNetMessagePtr response; + int responseTimer; + + virKeepAliveSendFunc sendCB; + virKeepAliveDeadFunc deadCB; + virKeepAliveFreeFunc freeCB; + void *client; +}; + + +static void +virKeepAliveLock(virKeepAlivePtr ka) +{ + virMutexLock(&ka->lock); +} + +static void +virKeepAliveUnlock(virKeepAlivePtr ka) +{ + virMutexUnlock(&ka->lock); +} + + +static virNetMessagePtr +virKeepAliveMessage(int proc) +{ + virNetMessagePtr msg; + + if (!(msg = virNetMessageNew(false))) + return NULL; + + msg->header.prog = KEEPALIVE_PROGRAM; + msg->header.vers = KEEPALIVE_VERSION; + msg->header.type = VIR_NET_MESSAGE; + msg->header.proc = proc; + + if (virNetMessageEncodeHeader(msg) < 0 || + virNetMessageEncodePayloadEmpty(msg) < 0) { + virNetMessageFree(msg); + return NULL; + } + + return msg; +} + + +static int +virKeepAliveSend(virKeepAlivePtr ka, 
virNetMessagePtr msg) +{ + int ret; + const char *proc; + void *client = ka->client; + virKeepAliveSendFunc sendCB = ka->sendCB; + + switch (msg->header.proc) { + case KEEPALIVE_PROC_ADVERTISE: + proc = "advertisement"; + break; + case KEEPALIVE_PROC_PING: + proc = "request"; + break; + case KEEPALIVE_PROC_PONG: + proc = "response"; + break; + } + + VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); + + ka->refs++; + virKeepAliveUnlock(ka); + + if ((ret = sendCB(client, msg)) < 0) { + VIR_WARN("Failed to send keepalive %s to client %p", proc, client); + virNetMessageFree(msg); + } + + virKeepAliveLock(ka); + ka->refs--; + + return ret; +} + + +int +virKeepAliveAdvertise(virKeepAlivePtr ka) +{ + virNetMessagePtr msg; + int ret = -1; + + virKeepAliveLock(ka); + + VIR_DEBUG("Advertising keepalive support to client %p", ka->client); + + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_ADVERTISE))) { + VIR_WARN("Failed to generate keepalive advertisement"); + goto cleanup; + } + + if ((ret = virKeepAliveSend(ka, msg)) == 0) + ka->advertised = true; + +cleanup: + virKeepAliveUnlock(ka); + return ret; +} + + +static void +virKeepAliveScheduleResponse(virKeepAlivePtr ka) +{ + if (ka->responseTimer == -1) + return; + + VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); + + if (!ka->response && + !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { + VIR_WARN("Failed to generate keepalive response"); + return; + } + + virEventUpdateTimeout(ka->responseTimer, 0); +} + + +static void +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + time_t now = time(NULL); + + virKeepAliveLock(ka); + + VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago", + ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived); + + if (now - ka->lastPacketReceived < ka->interval - 1) { + int timeout = ka->interval - (now - ka->lastPacketReceived); + virEventUpdateTimeout(ka->timer, timeout * 
1000); + goto cleanup; + } + + if (ka->countToDeath == 0) { + virKeepAliveDeadFunc deadCB = ka->deadCB; + void *client = ka->client; + + VIR_WARN("No response from client %p after %d keepalive messages in" + " %d seconds", + ka->client, + ka->count, + (int) (now - ka->lastPacketReceived)); + ka->refs++; + virKeepAliveUnlock(ka); + deadCB(client); + virKeepAliveLock(ka); + ka->refs--; + } else { + virNetMessagePtr msg; + + ka->countToDeath--; + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING))) + VIR_WARN("Failed to generate keepalive request"); + else + ignore_value(virKeepAliveSend(ka, msg)); + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + } + +cleanup: + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + virNetMessagePtr msg; + + virKeepAliveLock(ka); + + VIR_DEBUG("ka=%p, client=%p, response=%p", + ka, ka->client, ka->response); + + if (ka->response) { + msg = ka->response; + ka->response = NULL; + ignore_value(virKeepAliveSend(ka, msg)); + } + + virEventUpdateTimeout(ka->responseTimer, ka->response ? 
0 : -1); + + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveTimerFree(void *opaque) +{ + virKeepAliveFree(opaque); +} + + +virKeepAlivePtr +virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) +{ + virKeepAlivePtr ka; + + VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count); + + if (VIR_ALLOC(ka) < 0) { + virReportOOMError(); + return NULL; + } + + if (virMutexInit(&ka->lock) < 0) { + VIR_FREE(ka); + return NULL; + } + + ka->refs = 1; + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + ka->timer = -1; + ka->client = client; + ka->sendCB = sendCB; + ka->deadCB = deadCB; + ka->freeCB = freeCB; + + ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, + ka, virKeepAliveTimerFree); + if (ka->responseTimer < 0) { + virKeepAliveFree(ka); + return NULL; + } + /* the timer now has a reference to ka */ + ka->refs++; + + return ka; +} + + +void +virKeepAliveRef(virKeepAlivePtr ka) +{ + virKeepAliveLock(ka); + ka->refs++; + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); + virKeepAliveUnlock(ka); +} + + +void +virKeepAliveFree(virKeepAlivePtr ka) +{ + if (!ka) + return; + + virKeepAliveLock(ka); + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); + if (--ka->refs > 0) { + virKeepAliveUnlock(ka); + return; + } + + virMutexDestroy(&ka->lock); + ka->freeCB(ka->client); + VIR_FREE(ka); +} + + +int +virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count) +{ + int ret = -1; + time_t delay; + int timeout; + + VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u", + ka, ka->client, interval, count); + + virKeepAliveLock(ka); + + if (!ka->advertised) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("keepalive support was not advertised to remote party")); + goto cleanup; + } + + if (ka->timer >= 0) { + VIR_DEBUG("Keepalive messages already enabled"); + ret = 
0; + goto cleanup; + } + + if (interval > 0) { + if (ka->interval > 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("keepalive interval already set")); + goto cleanup; + } + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + } + + if (ka->interval <= 0) { + VIR_DEBUG("Keepalive messages disabled by configuration"); + ret = 0; + goto cleanup; + } + + VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u", + ka->interval, ka->count); + + delay = time(NULL) - ka->lastPacketReceived; + if (delay > ka->interval) + timeout = 0; + else + timeout = ka->interval - delay; + ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer, + ka, virKeepAliveTimerFree); + if (ka->timer < 0) + goto cleanup; + + /* the timer now has another reference to this object */ + ka->refs++; + ret = 0; + +cleanup: + virKeepAliveUnlock(ka); + return ret; +} + + +void +virKeepAliveStop(virKeepAlivePtr ka) +{ + VIR_DEBUG("ka=%p, client=%p", ka, ka->client); + + virKeepAliveLock(ka); + if (ka->timer > 0) { + virEventRemoveTimeout(ka->timer); + ka->timer = -1; + } + if (ka->responseTimer > 0) { + virEventRemoveTimeout(ka->responseTimer); + ka->responseTimer = -1; + } + virKeepAliveUnlock(ka); +} + + +bool +virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg) +{ + bool ret = false; + bool start = false; + + VIR_DEBUG("ka=%p, client=%p, msg=%p", + ka, ka ? 
ka->client : "(null)", msg); + + if (!ka) + return false; + + virKeepAliveLock(ka); + + ka->countToDeath = ka->count; + ka->lastPacketReceived = time(NULL); + + if (msg->header.prog == KEEPALIVE_PROGRAM && + msg->header.vers == KEEPALIVE_VERSION && + msg->header.type == VIR_NET_MESSAGE) { + ret = true; + switch (msg->header.proc) { + case KEEPALIVE_PROC_ADVERTISE: + VIR_DEBUG("Client %p advertises keepalive support", ka->client); + ka->advertised = true; + start = true; + break; + + case KEEPALIVE_PROC_PING: + VIR_DEBUG("Got keepalive request from client %p", ka->client); + virKeepAliveScheduleResponse(ka); + break; + + case KEEPALIVE_PROC_PONG: + VIR_DEBUG("Got keepalive response from client %p", ka->client); + break; + + default: + VIR_DEBUG("Ignoring unknown keepalive message %d from client %p", + msg->header.proc, ka->client); + } + } + + if (ka->timer >= 0) + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + + virKeepAliveUnlock(ka); + + if (start && virKeepAliveStart(ka, 0, 0) < 0) + VIR_WARN("Failed to start keepalive protocol"); + + return ret; +} diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h new file mode 100644 index 0000000..ac96859 --- /dev/null +++ b/src/rpc/virkeepalive.h @@ -0,0 +1,58 @@ +/* + * virkeepalive.h: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#ifndef __VIR_KEEPALIVE_H__ +# define __VIR_KEEPALIVE_H__ + +# include "virnetmessage.h" + +typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg); +typedef void (*virKeepAliveDeadFunc)(void *client); +typedef void (*virKeepAliveFreeFunc)(void *client); + +typedef struct _virKeepAlive virKeepAlive; +typedef virKeepAlive *virKeepAlivePtr; + + +virKeepAlivePtr virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) + ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4) + ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6); + +void virKeepAliveRef(virKeepAlivePtr ka); +void virKeepAliveFree(virKeepAlivePtr ka); + +int virKeepAliveAdvertise(virKeepAlivePtr ka); + +int virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count); +void virKeepAliveStop(virKeepAlivePtr ka); + +bool virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg); + +#endif /* __VIR_KEEPALIVE_H__ */ -- 1.7.6.1

This introduces virConnectAllowKeepAlive and virConnectStartKeepAlive public APIs which can be used by a client connecting to remote server to indicate support for keepalive protocol. Both APIs are handled directly by remote driver and not transmitted over the wire to the server. --- include/libvirt/libvirt.h.in | 5 ++ src/driver.h | 9 ++++ src/libvirt.c | 107 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 +++- src/libvirt_public.syms | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 39155a6..6f61cc0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3210,6 +3210,11 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr; +int virConnectAllowKeepAlive(virConnectPtr conn); +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count); + #ifdef __cplusplus } #endif diff --git a/src/driver.h b/src/driver.h index 3792003..cd17d83 100644 --- a/src/driver.h +++ b/src/driver.h @@ -718,6 +718,13 @@ typedef int (*virDrvDomainBlockPull)(virDomainPtr dom, const char *path, unsigned long bandwidth, unsigned int flags); +typedef int + (*virDrvAllowKeepAlive)(virConnectPtr conn); + +typedef int + (*virDrvStartKeepAlive)(virConnectPtr conn, + int interval, + unsigned int count); /** * _virDriver: @@ -872,6 +879,8 @@ struct _virDriver { virDrvDomainGetBlockJobInfo domainGetBlockJobInfo; virDrvDomainBlockJobSetSpeed domainBlockJobSetSpeed; virDrvDomainBlockPull domainBlockPull; + virDrvAllowKeepAlive allowKeepAlive; + virDrvStartKeepAlive startKeepAlive; }; typedef int diff --git a/src/libvirt.c b/src/libvirt.c index 8f94b11..138f367 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -16590,3 +16590,110 @@ error: virDispatchError(dom->conn); return -1; } + +/** + * virConnectAllowKeepAlive: + * @conn: pointer to a hypervisor connection + * + * Tell remote party we 
support keepalive messages so the it can use them and + * we will respond to them. To actually start sending keepalive messages to a + * client needs to call virConnectStartKeepAlive(). + * + * Note: client has to implement and run event loop to be able to respond to + * asynchronous keepalive messages. If a client doesn't run event loop but + * still calls this API, every connection made may be automatically closed by + * remote party after a certain period of inactivity. + * + * Returns -1 on error, 0 on success, 1 when remote party doesn't support + * keepalive messages. + */ +int virConnectAllowKeepAlive(virConnectPtr conn) +{ + int ret = -1; + + VIR_DEBUG("conn=%p", conn); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + if (!VIR_DRV_SUPPORTS_FEATURE(conn->driver, conn, + VIR_DRV_FEATURE_PROGRAM_KEEPALIVE)) { + VIR_DEBUG("Remote party doesn't support keepalive messages"); + return 1; + } + + if (conn->driver->allowKeepAlive) { + ret = conn->driver->allowKeepAlive(conn); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(conn); + return -1; +} + +/** + * virConnectStartKeepAlive: + * @conn: pointer to a hypervisor connection + * @interval: number of seconds of inactivity before a keepalive message is sent + * @count: number of messages that can be sent in a row + * + * Start sending keepalive messages after interval second of inactivity and + * consider the connection to be broken when no response is received after + * count keepalive messages sent in a row. In other words, sending count + 1 + * keepalive message results in closing the connection. + * + * This API may be called only after calling virConnectAllowKeepAlive and + * checking it returned 0, which ensures remote party supports keepalive + * protocol. Failure to do so will be detected and reported as an error. 
+ * + * Note: client has to implement and run event loop to be able to use keepalive + * messages. Failture to do so may result in connections being closed + * unexpectedly. + * + * Returns 0 on success, -1 on error. + */ +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count) +{ + int ret = -1; + + VIR_DEBUG("conn=%p, interval=%d, count=%u", conn, interval, count); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + if (interval <= 0) { + virLibConnError(VIR_ERR_INVALID_ARG, + _("negative or zero interval make no sense")); + goto error; + } + + if (conn->driver->startKeepAlive) { + ret = conn->driver->startKeepAlive(conn, interval, count); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 6e44341..dbbf7e0 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -39,8 +39,8 @@ int virStateActive(void); * * The remote driver passes features through to the real driver at the * remote end unmodified, except if you query a VIR_DRV_FEATURE_REMOTE* - * feature. - * + * feature. Queries for VIR_DRV_FEATURE_PROGRAM* features are answered + * directly by the RPC layer and not by the real driver. */ enum { /* Driver supports V1-style virDomainMigrate, ie. domainMigratePrepare/ @@ -79,6 +79,12 @@ enum { * to domain configuration, i.e., starting from Begin3 and not Perform3. */ VIR_DRV_FEATURE_MIGRATE_CHANGE_PROTECTION = 7, + + /* + * Remote party supports keepalive program (i.e., sending keepalive + * messages). 
+ */ + VIR_DRV_FEATURE_PROGRAM_KEEPALIVE = 8, }; diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 8a6d55a..f7441d7 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -489,4 +489,10 @@ LIBVIRT_0.9.5 { virDomainSnapshotGetName; } LIBVIRT_0.9.4; +LIBVIRT_0.9.7 { + global: + virConnectAllowKeepAlive; + virConnectStartKeepAlive; +} LIBVIRT_0.9.5; + # .... define new API here using predicted next version number .... -- 1.7.6.1

On Fri, Sep 23, 2011 at 10:24:48AM +0200, Jiri Denemark wrote:
This introduces virConnectAllowKeepAlive and virConnectStartKeepAlive public APIs which can be used by a client connecting to remote server to indicate support for keepalive protocol. Both APIs are handled directly by remote driver and not transmitted over the wire to the server. --- include/libvirt/libvirt.h.in | 5 ++ src/driver.h | 9 ++++ src/libvirt.c | 107 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 +++- src/libvirt_public.syms | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 39155a6..6f61cc0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3210,6 +3210,11 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr;
+int virConnectAllowKeepAlive(virConnectPtr conn); +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count);
With this design, even if both the client and server support keepalive at a protocol level, it will only ever be enabled if the client application remembers to call virConnectAllowKeepAlive. I think this puts too much responsibility on the client, to the detriment of the server. An administrator of libvirtd might want to mandate use of keep alive for all clients (knowing this would exclude any libvirt client <= 0.9.6 of course). With this design they cannot do this since they are reliant on the client application programmer to call virConnectAllowKeepAlive, which I believe 95% of people will never bother to do. IMHO we should change this so that - In remote_driver.c, doRemoteOpen(), after performing authentication, but before opening the connection we should send a supports_feature(KEEPALIVE) - Upon receiving supports_feature(KEEPALIVE) the server shall be free to start sending keep alives, if it is configured to do so. - Libvirtd should not send server initiated keepalives by default since this is needlessly short-circuiting the normal TCP client timeout process. - libvirt has a configure parameter to turn on keepalives taking values 0 - disable (default) 1 - enable if supported by client 2 - enable, drop client if not supported and should error upon virConnectOpen if '2' and clients do not support it. - Client simply has a virConnectSetKeepAlive(conn, interval, count). This returns an error if the earlier feature check during doRemoteOpen failed. If 'interval' or 'count' is zero then this stops any previously started keepalive. Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On Mon, Oct 03, 2011 at 10:26:30AM +0100, Daniel P. Berrange wrote:
On Fri, Sep 23, 2011 at 10:24:48AM +0200, Jiri Denemark wrote:
This introduces virConnectAllowKeepAlive and virConnectStartKeepAlive public APIs which can be used by a client connecting to remote server to indicate support for keepalive protocol. Both APIs are handled directly by remote driver and not transmitted over the wire to the server. --- include/libvirt/libvirt.h.in | 5 ++ src/driver.h | 9 ++++ src/libvirt.c | 107 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 +++- src/libvirt_public.syms | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 39155a6..6f61cc0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3210,6 +3210,11 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr;
+int virConnectAllowKeepAlive(virConnectPtr conn); +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count);
With this design, even if both the client and server support keepalive at a protocol level, it will only ever be enabled if the client application remembers to call virConnectAllowKeepAlive. I think this puts too much responsibility on the client, to the detriment of the server.
An administrator of libvirtd might want to mandate use of keep alive for all clients (knowing this would exclude any libvirt client <= 0.9.6 of course). With this design they cannot do this since they are reliant on the client application programmer to call virConnectAllowKeepAlive, which I believe 95% of people will never bother to do.
IMHO we should change this so that
- In remote_driver.c, doRemoteOpen(), after performing authentication, but before opening the connection we should send a
supports_feature(KEEPALIVE)
- Upon receiving supports_feature(KEEPALIVE) the server shall be free to start sending keep alives, if it is configured to do so.
Hmm, actually I see elsewhere that responding to server initiated pings, requires help of the event loop which we can't assume is running. I guess we could enable it if we detect that an event loop has been registered. This would make us hit the virsh bug again where we register the event loop, but never run it. Arguably that bug should be fixed in virsh for other reasons, so perhaps this is justification enough to do so. We would still not be able to let the admin force enable it on the server though, since plenty of simple tools will never have any event loop... If only we had made use of an event loop mandatory all those years ago :-( Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On Mon, Oct 03, 2011 at 10:42:30 +0100, Daniel P. Berrange wrote:
On Mon, Oct 03, 2011 at 10:26:30AM +0100, Daniel P. Berrange wrote:
On Fri, Sep 23, 2011 at 10:24:48AM +0200, Jiri Denemark wrote:
This introduces virConnectAllowKeepAlive and virConnectStartKeepAlive public APIs which can be used by a client connecting to remote server to indicate support for keepalive protocol. Both APIs are handled directly by remote driver and not transmitted over the wire to the server. --- include/libvirt/libvirt.h.in | 5 ++ src/driver.h | 9 ++++ src/libvirt.c | 107 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 +++- src/libvirt_public.syms | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 39155a6..6f61cc0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3210,6 +3210,11 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr;
+int virConnectAllowKeepAlive(virConnectPtr conn); +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count);
With this design, even if both the client and server support keepalive at a protocol level, it will only ever be enabled if the client application remembers to call virConnectAllowKeepAlive. I think this puts too much responsibility on the client, to the detriment of the server.
An administrator of libvirtd might want to mandate use of keep alive for all clients (knowing this would exclude any libvirt client <= 0.9.6 of course). With this design they cannot do this since they are reliant on the client application programmer to call virConnectAllowKeepAlive, which I believe 95% of people will never bother to do.
IMHO we should change this so that
- In remote_driver.c, doRemoteOpen(), after performing authentication, but before opening the connection we should send a
supports_feature(KEEPALIVE)
- Upon receiving supports_feature(KEEPALIVE) the server shall be free to start sending keep alives, if it is configured to do so.
Hmm, actually I see elsewhere that responding to server initiated pings, requires help of the event loop which we can't assume is running.
I guess we could enable it if we detect that an event loop has been registered. This would make us hit the virsh bug again where we register the event loop, but never run it. Arguably that bug should be fixed in virsh for other reasons, so perhaps this is justification enough to do so.
Right, we could do that once the bug in virsh is fixed so I'll try to incorporate the virsh fix into this series.
If only we had made use of an event loop mandatory all those years ago :-(
Oh yes, client-side implementation would be much simpler without the buck passing algorithm. Although that would mean significantly less fun on that part :-) Jirka

--- daemon/libvirtd.aug | 4 + daemon/libvirtd.c | 11 ++++ daemon/libvirtd.conf | 15 +++++ daemon/remote.c | 38 +++++++++++++ src/remote/remote_protocol.x | 2 +- src/rpc/virnetserver.c | 10 +++ src/rpc/virnetserver.h | 2 + src/rpc/virnetserverclient.c | 126 +++++++++++++++++++++++++++++++++++++++--- src/rpc/virnetserverclient.h | 6 ++ 9 files changed, 204 insertions(+), 10 deletions(-) diff --git a/daemon/libvirtd.aug b/daemon/libvirtd.aug index ce00db5..6cd3f28 100644 --- a/daemon/libvirtd.aug +++ b/daemon/libvirtd.aug @@ -66,6 +66,9 @@ module Libvirtd = let auditing_entry = int_entry "audit_level" | bool_entry "audit_logging" + let keepalive_entry = int_entry "keepalive_interval" + | int_entry "keepalive_count" + (* Each enty in the config is one of the following three ... *) let entry = network_entry | sock_acl_entry @@ -75,6 +78,7 @@ module Libvirtd = | processing_entry | logging_entry | auditing_entry + | keepalive_entry let comment = [ label "#comment" . del /#[ \t]*/ "# " . store /([^ \t\n][^\n]*)?/ . del /\n/ "\n" ] let empty = [ label "#empty" . 
eol ] diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index d1bc3dd..4ad2b2e 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -146,6 +146,9 @@ struct daemonConfig { int audit_level; int audit_logging; + + int keepalive_interval; + unsigned int keepalive_count; }; enum { @@ -899,6 +902,9 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED) data->audit_level = 1; data->audit_logging = 0; + data->keepalive_interval = 5; + data->keepalive_count = 5; + localhost = virGetHostname(NULL); if (localhost == NULL) { /* we couldn't resolve the hostname; assume that we are @@ -1062,6 +1068,9 @@ daemonConfigLoad(struct daemonConfig *data, GET_CONF_STR (conf, filename, log_outputs); GET_CONF_INT (conf, filename, log_buffer_size); + GET_CONF_INT (conf, filename, keepalive_interval); + GET_CONF_INT (conf, filename, keepalive_count); + virConfFree (conf); return 0; @@ -1452,6 +1461,8 @@ int main(int argc, char **argv) { config->max_workers, config->prio_workers, config->max_clients, + config->keepalive_interval, + config->keepalive_count, config->mdns_adv ? config->mdns_name : NULL, use_polkit_dbus, remoteClientInitHook))) { diff --git a/daemon/libvirtd.conf b/daemon/libvirtd.conf index da3983e..4f6ab9f 100644 --- a/daemon/libvirtd.conf +++ b/daemon/libvirtd.conf @@ -366,3 +366,18 @@ # it with the output of the 'uuidgen' command and then # uncomment this entry #host_uuid = "00000000-0000-0000-0000-000000000000" + +################################################################### +# Keepalive protocol: +# This allows libvirtd to detect broken client connections or even +# dead client. A keepalive message is sent to a client after +# keepalive_interval seconds of inactivity to check if the client is +# still responding; keepalive_count is a maximum number of keepalive +# messages that are allowed to be sent to the client without getting +# any response before the connection is considered broken. 
In other +# words, the connection is automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the client. +# +#keepalive_interval = 5 +#keepalive_count = 5 diff --git a/daemon/remote.c b/daemon/remote.c index 245d41c..946bb7e 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -3112,6 +3112,44 @@ cleanup: } +static int +remoteDispatchSupportsFeature(virNetServerPtr server ATTRIBUTE_UNUSED, + virNetServerClientPtr client, + virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED, + virNetMessageErrorPtr rerr, + remote_supports_feature_args *args, + remote_supports_feature_ret *ret) +{ + int rv = -1; + int supported; + struct daemonClientPrivate *priv = + virNetServerClientGetPrivateData(client); + + if (!priv->conn) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("connection not open")); + goto cleanup; + } + + if (args->feature == VIR_DRV_FEATURE_PROGRAM_KEEPALIVE) { + supported = 1; + goto done; + } + + if ((supported = virDrvSupportsFeature(priv->conn, args->feature)) < 0) + goto cleanup; + +done: + ret->supported = supported; + rv = 0; + +cleanup: + if (rv < 0) + virNetMessageSaveError(rerr); + return rv; +} + + + /*----- Helpers. 
-----*/ /* get_nonnull_domain and get_nonnull_network turn an on-wire diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 455e324..ac0cd0e 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -2307,7 +2307,7 @@ enum remote_procedure { REMOTE_PROC_DOMAIN_GET_SCHEDULER_PARAMETERS = 57, /* skipgen autogen */ REMOTE_PROC_DOMAIN_SET_SCHEDULER_PARAMETERS = 58, /* autogen autogen */ REMOTE_PROC_GET_HOSTNAME = 59, /* autogen autogen priority:high */ - REMOTE_PROC_SUPPORTS_FEATURE = 60, /* autogen autogen priority:high */ + REMOTE_PROC_SUPPORTS_FEATURE = 60, /* skipgen autogen priority:high */ REMOTE_PROC_DOMAIN_MIGRATE_PREPARE = 61, /* skipgen skipgen */ REMOTE_PROC_DOMAIN_MIGRATE_PERFORM = 62, /* autogen autogen */ diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c index d71ed18..34dac66 100644 --- a/src/rpc/virnetserver.c +++ b/src/rpc/virnetserver.c @@ -102,6 +102,9 @@ struct _virNetServer { size_t nclients_max; virNetServerClientPtr *clients; + int keepaliveInterval; + unsigned int keepaliveCount; + unsigned int quit :1; virNetTLSContextPtr tls; @@ -239,6 +242,9 @@ static int virNetServerDispatchNewClient(virNetServerServicePtr svc ATTRIBUTE_UN virNetServerDispatchNewMessage, srv); + virNetServerClientInitKeepAlive(client, srv->keepaliveInterval, + srv->keepaliveCount); + virNetServerUnlock(srv); return 0; @@ -278,6 +284,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, const char *mdnsGroupName, bool connectDBus ATTRIBUTE_UNUSED, virNetServerClientInitHook clientInitHook) @@ -299,6 +307,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, goto error; srv->nclients_max = max_clients; + srv->keepaliveInterval = keepaliveInterval; + srv->keepaliveCount = keepaliveCount; srv->sigwrite = srv->sigread = -1; srv->clientInitHook = clientInitHook; srv->privileged = geteuid() == 0 ? 
true : false; diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h index cc9d039..99b65d6 100644 --- a/src/rpc/virnetserver.h +++ b/src/rpc/virnetserver.h @@ -41,6 +41,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, const char *mdnsGroupName, bool connectDBus, virNetServerClientInitHook clientInitHook); diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 412814d..9bd8914 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -33,6 +33,7 @@ #include "virterror_internal.h" #include "memory.h" #include "threads.h" +#include "virkeepalive.h" #define VIR_FROM_THIS VIR_FROM_RPC #define virNetError(code, ...) \ @@ -98,6 +99,9 @@ struct _virNetServerClient void *privateData; virNetServerClientFreeFunc privateDataFreeFunc; virNetServerClientCloseFunc privateDataCloseFunc; + + virKeepAlivePtr keepalive; + int keepaliveFilter; }; @@ -207,15 +211,14 @@ static void virNetServerClientUpdateEvent(virNetServerClientPtr client) } -int virNetServerClientAddFilter(virNetServerClientPtr client, - virNetServerClientFilterFunc func, - void *opaque) +static int +virNetServerClientAddFilterLocked(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) { virNetServerClientFilterPtr filter; int ret = -1; - virNetServerClientLock(client); - if (VIR_ALLOC(filter) < 0) { virReportOOMError(); goto cleanup; @@ -231,16 +234,26 @@ int virNetServerClientAddFilter(virNetServerClientPtr client, ret = filter->id; cleanup: - virNetServerClientUnlock(client); return ret; } +int virNetServerClientAddFilter(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) +{ + int ret; -void virNetServerClientRemoveFilter(virNetServerClientPtr client, - int filterID) + virNetServerClientLock(client); + ret = virNetServerClientAddFilterLocked(client, func, opaque); + 
virNetServerClientUnlock(client); + return ret; +} + +static void +virNetServerClientRemoveFilterLocked(virNetServerClientPtr client, + int filterID) { virNetServerClientFilterPtr tmp, prev; - virNetServerClientLock(client); prev = NULL; tmp = client->filters; @@ -257,7 +270,13 @@ void virNetServerClientRemoveFilter(virNetServerClientPtr client, prev = tmp; tmp = tmp->next; } +} +void virNetServerClientRemoveFilter(virNetServerClientPtr client, + int filterID) +{ + virNetServerClientLock(client); + virNetServerClientRemoveFilterLocked(client, filterID); virNetServerClientUnlock(client); } @@ -318,6 +337,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->readonly = readonly; client->tlsCtxt = tls; client->nrequests_max = nrequests_max; + client->keepaliveFilter = -1; if (tls) virNetTLSContextRef(tls); @@ -571,6 +591,7 @@ void virNetServerClientFree(virNetServerClientPtr client) void virNetServerClientClose(virNetServerClientPtr client) { virNetServerClientCloseFunc cf; + virKeepAlivePtr ka; virNetServerClientLock(client); VIR_DEBUG("client=%p refs=%d", client, client->refs); @@ -579,6 +600,20 @@ void virNetServerClientClose(virNetServerClientPtr client) return; } + if (client->keepaliveFilter >= 0) + virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter); + + if (client->keepalive) { + virKeepAliveStop(client->keepalive); + ka = client->keepalive; + client->keepalive = NULL; + client->refs++; + virNetServerClientUnlock(client); + virKeepAliveFree(ka); + virNetServerClientLock(client); + client->refs--; + } + if (client->privateDataCloseFunc) { cf = client->privateDataCloseFunc; client->refs++; @@ -976,6 +1011,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu", msg, msg->header.proc, msg->bufferLength, msg->bufferOffset); + virNetServerClientLock(client); if (client->sock && !client->wantClose) { @@ -986,6 +1022,7 @@ int 
virNetServerClientSendMessage(virNetServerClientPtr client, } virNetServerClientUnlock(client); + return ret; } @@ -999,3 +1036,74 @@ bool virNetServerClientNeedAuth(virNetServerClientPtr client) virNetServerClientUnlock(client); return need; } + + +static void +virNetServerClientKeepAliveDeadCB(void *opaque) +{ + virNetServerClientImmediateClose(opaque); +} + +static int +virNetServerClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetServerClientSendMessage(opaque, msg); +} + +static void +virNetServerClientFreeCB(void *opaque) +{ + virNetServerClientFree(opaque); +} + +static int +virNetServerClientKeepAliveFilter(virNetServerClientPtr client, + virNetMessagePtr msg, + void *opaque ATTRIBUTE_UNUSED) +{ + if (virKeepAliveCheckMessage(client->keepalive, msg)) { + virNetMessageFree(msg); + client->nrequests--; + return 1; + } + + return 0; +} + +int +virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count) +{ + virKeepAlivePtr ka; + int ret = -1; + + virNetServerClientLock(client); + + if (!(ka = virKeepAliveNew(interval, count, client, + virNetServerClientKeepAliveSendCB, + virNetServerClientKeepAliveDeadCB, + virNetServerClientFreeCB))) + goto cleanup; + /* keepalive object has a reference to client */ + client->refs++; + + client->keepaliveFilter = + virNetServerClientAddFilterLocked(client, + virNetServerClientKeepAliveFilter, + NULL); + if (client->keepaliveFilter < 0) + goto cleanup; + + client->keepalive = ka; + ka = NULL; + +cleanup: + virNetServerClientUnlock(client); + if (ka) + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + return ret; +} diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h index bedb179..1987c6d 100644 --- a/src/rpc/virnetserverclient.h +++ b/src/rpc/virnetserverclient.h @@ -99,6 +99,12 @@ bool virNetServerClientWantClose(virNetServerClientPtr client); int virNetServerClientInit(virNetServerClientPtr client); +int 
virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count); +bool virNetServerClientCheckKeepAlive(virNetServerClientPtr client, + virNetMessagePtr msg); + const char *virNetServerClientLocalAddrString(virNetServerClientPtr client); const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client); -- 1.7.6.1

When a client wants to send a keepalive message it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means, that sending such call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. --- src/rpc/virnetclient.c | 149 ++++++++++++++++++++++++++++++++++++------------ 1 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 055361d..7ea9a27 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,7 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool dontBlock; virCond cond; @@ -94,6 +95,11 @@ struct _virNetClient { }; +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -848,6 +854,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, char ignore; sigset_t oldmask, blockedsigs; int timeout = -1; + bool discardNonBlocking; /* If we have existing SASL decoded data we * don't want to sleep in the poll(), just @@ -865,6 +872,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[0].events |= POLLIN; if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) fds[0].events |= POLLOUT; + /* We don't want to sleep in poll if any of the calls is + * non-blocking + */ + if (tmp->dontBlock) + timeout = 
0; tmp = tmp->next; } @@ -937,35 +949,63 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and + * - remove all completed nonblocking calls + * - remove all nonblocking calls in case poll() would block + * - remove all nonblocking calls if we got error from poll() + * - wake up threads waiting for calls that have been completed */ + discardNonBlocking = ret == 0 || + (fds[0].revents & POLLHUP) || + (fds[0].revents & POLLERR); tmp = client->waitDispatch; prev = NULL; while (tmp) { + virNetClientCallPtr next = tmp->next; + if (tmp != thiscall && - tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + (tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE || + (discardNonBlocking && tmp->dontBlock))) { /* Take them out of the list */ if (prev) prev->next = tmp->next; else client->waitDispatch = tmp->next; - /* And wake them up.... - * ...they won't actually wakeup until - * we release our mutex a short while - * later... - */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); + if (tmp->dontBlock) { + /* tmp is a non-blocking call, no-one is waiting for it so + * we just free it here + */ + if (tmp->mode != VIR_NET_CLIENT_MODE_COMPLETE) { + VIR_DEBUG("Can't finish nonblocking call %p without" + " blocking or error", tmp); + } + virNetMessageFree(tmp->msg); + VIR_FREE(tmp); + } else { + /* And wake them up.... + * ...they won't actually wakeup until + * we release our mutex a short while + * later... 
+ */ + VIR_DEBUG("Waking up sleep %p %p", + tmp, client->waitDispatch); + virCondSignal(&tmp->cond); + } } else { prev = tmp; } - tmp = tmp->next; + tmp = next; } /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* If next call is non-blocking call, we need to process it + * before giving up the buck + */ + if (thiscall->next && thiscall->next->dontBlock) + continue; + /* We're at head of the list already, so * remove us */ @@ -980,14 +1020,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client, return 0; } - if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup / error event on socket")); goto error; } - } + if (thiscall->dontBlock && discardNonBlocking) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Can't finish nonblocking call without blocking")); + goto error; + } + } error: client->waitDispatch = thiscall->next; @@ -1040,38 +1084,41 @@ static int virNetClientIO(virNetClientPtr client, { int rv = -1; - VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", + VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d" + " length=%zu dontBlock=%d dispatch=%p", thiscall->msg->header.prog, thiscall->msg->header.vers, thiscall->msg->header.serial, thiscall->msg->header.proc, thiscall->msg->header.type, thiscall->msg->bufferLength, + thiscall->dontBlock, client->waitDispatch); /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + virNetClientCallPtr tmp; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = 
NULL; virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; } + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp->next) + tmp = tmp->next; + tmp->next = thiscall; + + if (thiscall->dontBlock) { + VIR_DEBUG("Sending non-blocking call while another thread is" + " dispatching; it will send the call for us"); + return 0; + } + VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { @@ -1091,7 +1138,7 @@ static int virNetClientIO(virNetClientPtr client, return -1; } - VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall); /* Two reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to @@ -1181,15 +1228,17 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int +virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock) { virNetClientCallPtr call; int ret = -1; if (expectReply && - (msg->header.status == VIR_NET_CONTINUE)) { + (msg->header.status == VIR_NET_CONTINUE || dontBlock)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("Attempt to send an asynchronous message with a synchronous reply")); return -1; @@ -1202,10 +1251,15 @@ int virNetClientSend(virNetClientPtr client, virNetClientLock(client); - if (virCondInit(&call->cond) < 0) { - virNetError(VIR_ERR_INTERNAL_ERROR, "%s", - _("cannot initialize condition variable")); - goto cleanup; + /* We don't need call->cond for non-blocking calls since there's no + * thread to be woken up anyway + */ + if (!dontBlock) { + if (virCondInit(&call->cond) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize condition variable")); + goto cleanup; + } } if 
(msg->bufferLength) @@ -1214,12 +1268,35 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->dontBlock = dontBlock; ret = virNetClientIO(client, call); cleanup: ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + if (ret != 0) { + VIR_FREE(call); + } else if (dontBlock) { + /* Only free the call if it was completed since otherwise it was just + * queued up and will be processed later. + */ + if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* We need to free the message as well since no-one is waiting for + * it. + */ + virNetMessageFree(msg); + VIR_FREE(call); + } + } else { + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) +{ + return virNetClientSendInternal(client, msg, expectReply, false); +} -- 1.7.6.1

--- src/rpc/virnetclient.c | 76 ++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 7ea9a27..45b0edb 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -92,9 +92,13 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; }; +void virNetClientRequestClose(virNetClientPtr client); + static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, bool expectReply, @@ -297,12 +301,14 @@ void virNetClientFree(virNetClientPtr client) } -void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return; - virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -312,6 +318,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. 
+ */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore); + + client->wantClose = true; + if (safewrite(client->wakeupSendFD, &ignore, len) != len) + VIR_ERROR(_("failed to wake up polling thread")); + } else { + virNetClientCloseLocked(client); + } + virNetClientUnlock(client); } @@ -856,11 +897,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, int timeout = -1; bool discardNonBlocking; - /* If we have existing SASL decoded data we - * don't want to sleep in the poll(), just - * check if any other FDs are also ready + /* If we have existing SASL decoded data we don't want to sleep in + * the poll(), just check if any other FDs are also ready. + * If the connection is going to be closed, we don't want to sleep in + * poll() either. */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; fds[0].events = fds[0].revents = 0; @@ -922,6 +964,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) fds[0].revents |= POLLIN; + /* If wantClose flag is set, pretend there was an error on the socket + */ + if (client->wantClose) + fds[0].revents = POLLERR; + if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1041,6 +1088,8 @@ error: if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); + } else if (client->wantClose) { + virNetClientCloseLocked(client); } return -1; } @@ -1182,7 +1231,8 @@ static int virNetClientIO(virNetClientPtr client, virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + if (client->sock) + virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); if (rv == 0 && virGetLastError()) @@ -1206,7 +1256,7 @@ void 
virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->waitDispatch || client->wantClose) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1251,6 +1301,12 @@ virNetClientSendInternal(virNetClientPtr client, virNetClientLock(client); + if (!client->sock || client->wantClose) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Client socket is closed")); + goto unlock; + } + /* We don't need call->cond for non-blocking calls since there's no * thread to be woken up anyway */ @@ -1290,6 +1346,8 @@ cleanup: } else { VIR_FREE(call); } + +unlock: virNetClientUnlock(client); return ret; } -- 1.7.6.1

--- src/remote/remote_driver.c | 30 +++++++++++++ src/rpc/virnetclient.c | 102 ++++++++++++++++++++++++++++++++++++++++++- src/rpc/virnetclient.h | 5 ++ 3 files changed, 134 insertions(+), 3 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 1217d94..e218c40 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -4082,6 +4082,34 @@ done: } +static int +remoteAllowKeepAlive(virConnectPtr conn) +{ + struct private_data *priv = conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + ret = virNetClientKeepAliveAdvertise(priv->client); + remoteDriverUnlock(priv); + + return ret; +} + + +static int +remoteStartKeepAlive(virConnectPtr conn, int interval, unsigned int count) +{ + struct private_data *priv = conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + ret = virNetClientKeepAliveStart(priv->client, interval, count); + remoteDriverUnlock(priv); + + return ret; +} + + #include "remote_client_bodies.h" #include "qemu_client_bodies.h" @@ -4430,6 +4458,8 @@ static virDriver remote_driver = { .domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */ .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ + .allowKeepAlive = remoteAllowKeepAlive, /* 0.9.7 */ + .startKeepAlive = remoteStartKeepAlive, /* 0.9.7 */ }; static virNetworkDriver network_driver = { diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 45b0edb..dee5059 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -29,6 +29,7 @@ #include "virnetclient.h" #include "virnetsocket.h" +#include "virkeepalive.h" #include "memory.h" #include "threads.h" #include "virfile.h" @@ -93,11 +94,12 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + virKeepAlivePtr keepalive; bool wantClose; }; -void virNetClientRequestClose(virNetClientPtr client); +static void virNetClientRequestClose(virNetClientPtr client); static int 
virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, @@ -127,11 +129,71 @@ static void virNetClientEventFree(void *opaque) virNetClientFree(client); } +int +virNetClientKeepAliveAdvertise(virNetClientPtr client) +{ + virKeepAlivePtr ka; + int ret = -1; + + virNetClientLock(client); + if ((ka = client->keepalive)) + virKeepAliveRef(ka); + virNetClientUnlock(client); + + if (ka) { + ret = virKeepAliveAdvertise(ka); + virKeepAliveFree(ka); + } else { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("the caller doesn't support keepalive protocol;" + " perhaps it's missing event loop implementation")); + } + + return ret; +} + +int +virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count) +{ + int ret = -1; + + virNetClientLock(client); + + if (!client->keepalive) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("the caller doesn't support keepalive protocol;" + " perhaps it's missing event loop implementation")); + goto cleanup; + } + + ret = virKeepAliveStart(client->keepalive, interval, count); + +cleanup: + virNetClientUnlock(client); + return ret; +} + +static void +virNetClientKeepAliveDeadCB(void *opaque) +{ + virNetClientRequestClose(opaque); +} + +static int +virNetClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(opaque, msg, false, true); +} + static virNetClientPtr virNetClientNew(virNetSocketPtr sock, const char *hostname) { virNetClientPtr client = NULL; int wakeupFD[2] = { -1, -1 }; + virKeepAlivePtr ka = NULL; if (pipe2(wakeupFD, O_CLOEXEC) < 0) { virReportSystemError(errno, "%s", @@ -164,9 +226,21 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, client, virNetClientEventFree) < 0) { client->refs--; - VIR_DEBUG("Failed to add event watch, disabling events"); + VIR_DEBUG("Failed to add event watch, disabling events and support for" + " keepalive messages"); + } else { + /* Keepalive protocol consists of async messages so it can only be used + * 
if the client supports them */ + if (!(ka = virKeepAliveNew(-1, 0, client, + virNetClientKeepAliveSendCB, + virNetClientKeepAliveDeadCB, + virNetClientEventFree))) + goto error; + /* keepalive object has a reference to client */ + client->refs++; } + client->keepalive = ka; VIR_DEBUG("client=%p refs=%d", client, client->refs); return client; @@ -175,6 +249,10 @@ no_memory: error: VIR_FORCE_CLOSE(wakeupFD[0]); VIR_FORCE_CLOSE(wakeupFD[1]); + if (ka) { + virKeepAliveStop(ka); + virKeepAliveFree(ka); + } virNetClientFree(client); return NULL; } @@ -304,6 +382,8 @@ void virNetClientFree(virNetClientPtr client) static void virNetClientCloseLocked(virNetClientPtr client) { + virKeepAlivePtr ka; + VIR_DEBUG("client=%p, sock=%p", client, client->sock); if (!client->sock) @@ -318,7 +398,20 @@ virNetClientCloseLocked(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + ka = client->keepalive; + client->keepalive = NULL; client->wantClose = false; + + if (ka) { + client->refs++; + virNetClientUnlock(client); + + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + virNetClientLock(client); + client->refs--; + } } void virNetClientClose(virNetClientPtr client) @@ -331,7 +424,7 @@ void virNetClientClose(virNetClientPtr client) virNetClientUnlock(client); } -void +static void virNetClientRequestClose(virNetClientPtr client) { VIR_DEBUG("client=%p", client); @@ -723,6 +816,9 @@ virNetClientCallDispatch(virNetClientPtr client) client->msg.header.proc, client->msg.header.type, client->msg.header.status, client->msg.header.serial); + if (virKeepAliveCheckMessage(client->keepalive, &client->msg)) + return 0; + switch (client->msg.header.type) { case VIR_NET_REPLY: /* Normal RPC replies */ return virNetClientCallDispatchReply(client); diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 1fabcfd..e337ee0 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -87,4 +87,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr 
client); void virNetClientFree(virNetClientPtr client); void virNetClientClose(virNetClientPtr client); +int virNetClientKeepAliveAdvertise(virNetClientPtr client); +int virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count); + #endif /* __VIR_NET_CLIENT_H__ */ -- 1.7.6.1

This API can be used to check if the socket associated with virConnectPtr is still open or it was closed (probably because keepalive protocol timed out). If the connection is local (i.e., no socket is associated with the connection), it is trivially always alive. --- include/libvirt/libvirt.h.in | 1 + src/driver.h | 3 +++ src/libvirt.c | 36 ++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 41 insertions(+), 0 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 6f61cc0..3767582 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2546,6 +2546,7 @@ int virInterfaceIsActive(virInterfacePtr iface); int virConnectIsEncrypted(virConnectPtr conn); int virConnectIsSecure(virConnectPtr conn); +int virConnectIsAlive(virConnectPtr conn); /* * CPU specification API diff --git a/src/driver.h b/src/driver.h index cd17d83..8c01690 100644 --- a/src/driver.h +++ b/src/driver.h @@ -506,6 +506,8 @@ typedef int typedef int (*virDrvConnectIsSecure)(virConnectPtr conn); typedef int + (*virDrvConnectIsAlive)(virConnectPtr conn); +typedef int (*virDrvDomainIsActive)(virDomainPtr dom); typedef int (*virDrvDomainIsPersistent)(virDomainPtr dom); @@ -881,6 +883,7 @@ struct _virDriver { virDrvDomainBlockPull domainBlockPull; virDrvAllowKeepAlive allowKeepAlive; virDrvStartKeepAlive startKeepAlive; + virDrvConnectIsAlive isAlive; }; typedef int diff --git a/src/libvirt.c b/src/libvirt.c index 138f367..19ac4b5 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -16697,3 +16697,39 @@ error: virDispatchError(conn); return -1; } + +/** + * virConnectIsAlive: + * @conn: pointer to the connection object + * + * Determine if the connection to the hypervisor is still alive + * + * A connection will be classed as alive if it is either local, or running + * over a channel (TCP or UNIX socket) which is not closed. 
+ * + * Returns 1 if alive, 0 if dead, -1 on error + */ +int virConnectIsAlive(virConnectPtr conn) +{ + VIR_DEBUG("conn=%p", conn); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + if (conn->driver->isAlive) { + int ret; + ret = conn->driver->isAlive(conn); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index f7441d7..e60f66d 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -492,6 +492,7 @@ LIBVIRT_0.9.5 { LIBVIRT_0.9.7 { global: virConnectAllowKeepAlive; + virConnectIsAlive; virConnectStartKeepAlive; } LIBVIRT_0.9.5; -- 1.7.6.1

--- src/esx/esx_driver.c | 18 ++++++++++++++++++ src/hyperv/hyperv_driver.c | 18 ++++++++++++++++++ src/libxl/libxl_driver.c | 8 ++++++++ src/lxc/lxc_driver.c | 7 +++++++ src/openvz/openvz_driver.c | 7 +++++++ src/phyp/phyp_driver.c | 18 ++++++++++++++++++ src/qemu/qemu_driver.c | 6 ++++++ src/remote/remote_driver.c | 18 ++++++++++++++++++ src/rpc/virnetclient.c | 14 ++++++++++++++ src/rpc/virnetclient.h | 1 + src/test/test_driver.c | 6 ++++++ src/uml/uml_driver.c | 7 +++++++ src/vbox/vbox_tmpl.c | 6 ++++++ src/vmware/vmware_driver.c | 7 +++++++ src/xen/xen_driver.c | 8 ++++++++ src/xenapi/xenapi_driver.c | 12 ++++++++++++ 16 files changed, 161 insertions(+), 0 deletions(-) diff --git a/src/esx/esx_driver.c b/src/esx/esx_driver.c index f1102ea..c71f4f8 100644 --- a/src/esx/esx_driver.c +++ b/src/esx/esx_driver.c @@ -4145,6 +4145,23 @@ esxIsSecure(virConnectPtr conn) static int +esxIsAlive(virConnectPtr conn) +{ + esxPrivate *priv = conn->privateData; + + /* XXX we should be able to do something better than this is simple, safe, + * and good enough for now. In worst case, the function will return true + * even though the connection is not alive. 
+ */ + if (priv->host) + return 1; + else + return 0; +} + + + +static int esxDomainIsActive(virDomainPtr domain) { int result = -1; @@ -4813,6 +4830,7 @@ static virDriver esxDriver = { .domainSnapshotCurrent = esxDomainSnapshotCurrent, /* 0.8.0 */ .domainRevertToSnapshot = esxDomainRevertToSnapshot, /* 0.8.0 */ .domainSnapshotDelete = esxDomainSnapshotDelete, /* 0.8.0 */ + .isAlive = esxIsAlive, /* 0.9.7 */ }; diff --git a/src/hyperv/hyperv_driver.c b/src/hyperv/hyperv_driver.c index b022fee..bf8575c 100644 --- a/src/hyperv/hyperv_driver.c +++ b/src/hyperv/hyperv_driver.c @@ -1100,6 +1100,23 @@ hypervIsSecure(virConnectPtr conn) static int +hypervIsAlive(virConnectPtr conn) +{ + hypervPrivate *priv = conn->privateData; + + /* XXX we should be able to do something better than this is simple, safe, + * and good enough for now. In worst case, the function will return true + * even though the connection is not alive. + */ + if (priv->client) + return 1; + else + return 0; +} + + + +static int hypervDomainIsActive(virDomainPtr domain) { int result = -1; @@ -1257,6 +1274,7 @@ static virDriver hypervDriver = { .domainManagedSave = hypervDomainManagedSave, /* 0.9.5 */ .domainHasManagedSaveImage = hypervDomainHasManagedSaveImage, /* 0.9.5 */ .domainManagedSaveRemove = hypervDomainManagedSaveRemove, /* 0.9.5 */ + .isAlive = hypervIsAlive, /* 0.9.7 */ }; diff --git a/src/libxl/libxl_driver.c b/src/libxl/libxl_driver.c index d324632..42503d6 100644 --- a/src/libxl/libxl_driver.c +++ b/src/libxl/libxl_driver.c @@ -3875,6 +3875,13 @@ libxlDomainEventDeregisterAny(virConnectPtr conn, int callbackID) } +static int +libxlIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static virDriver libxlDriver = { .no = VIR_DRV_LIBXL, .name = "xenlight", @@ -3948,6 +3955,7 @@ static virDriver libxlDriver = { .domainIsUpdated = libxlDomainIsUpdated, /* 0.9.0 */ .domainEventRegisterAny = libxlDomainEventRegisterAny, /* 0.9.0 */ .domainEventDeregisterAny = 
libxlDomainEventDeregisterAny, /* 0.9.0 */ + .isAlive = libxlIsAlive, /* 0.9.7 */ }; static virStateDriver libxlStateDriver = { diff --git a/src/lxc/lxc_driver.c b/src/lxc/lxc_driver.c index 6cf7203..28a7f9c 100644 --- a/src/lxc/lxc_driver.c +++ b/src/lxc/lxc_driver.c @@ -185,6 +185,12 @@ static int lxcIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) } +static int lxcIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static char *lxcGetCapabilities(virConnectPtr conn) { lxc_driver_t *driver = conn->privateData; char *xml; @@ -2986,6 +2992,7 @@ static virDriver lxcDriver = { .domainEventRegisterAny = lxcDomainEventRegisterAny, /* 0.8.0 */ .domainEventDeregisterAny = lxcDomainEventDeregisterAny, /* 0.8.0 */ .domainOpenConsole = lxcDomainOpenConsole, /* 0.8.6 */ + .isAlive = lxcIsAlive, /* 0.9.7 */ }; static virStateDriver lxcStateDriver = { diff --git a/src/openvz/openvz_driver.c b/src/openvz/openvz_driver.c index 69ff444..8a4e6cf 100644 --- a/src/openvz/openvz_driver.c +++ b/src/openvz/openvz_driver.c @@ -1426,6 +1426,12 @@ static int openvzIsSecure(virConnectPtr conn ATTRIBUTE_UNUSED) { return 1; } +static int +openvzIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static char *openvzGetCapabilities(virConnectPtr conn) { struct openvz_driver *driver = conn->privateData; char *ret; @@ -1714,6 +1720,7 @@ static virDriver openvzDriver = { .domainIsActive = openvzDomainIsActive, /* 0.7.3 */ .domainIsPersistent = openvzDomainIsPersistent, /* 0.7.3 */ .domainIsUpdated = openvzDomainIsUpdated, /* 0.8.6 */ + .isAlive = openvzIsAlive, /* 0.9.7 */ }; int openvzRegister(void) { diff --git a/src/phyp/phyp_driver.c b/src/phyp/phyp_driver.c index ff16aae..458c9c9 100644 --- a/src/phyp/phyp_driver.c +++ b/src/phyp/phyp_driver.c @@ -1284,6 +1284,23 @@ phypIsSecure(virConnectPtr conn ATTRIBUTE_UNUSED) return 1; } + +static int +phypIsAlive(virConnectPtr conn) +{ + ConnectionData *connection_data = conn->networkPrivateData; + + /* XXX we should be 
able to do something better but this is simple, safe, + * and good enough for now. In worst case, the function will return true + * even though the connection is not alive. + */ + if (connection_data && connection_data->session) + return 1; + else + return 0; +} + + static int phypIsUpdated(virDomainPtr conn ATTRIBUTE_UNUSED) { @@ -3786,6 +3803,7 @@ static virDriver phypDriver = { .isEncrypted = phypIsEncrypted, /* 0.7.3 */ .isSecure = phypIsSecure, /* 0.7.3 */ .domainIsUpdated = phypIsUpdated, /* 0.8.6 */ + .isAlive = phypIsAlive, /* 0.9.7 */ }; static virStorageDriver phypStorageDriver = { diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index 0d0bea2..c5fd9eb 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -938,6 +938,11 @@ static int qemuIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) return 0; } +static int qemuIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int kvmGetMaxVCPUs(void) { int maxvcpus = 1; @@ -10450,6 +10455,7 @@ static virDriver qemuDriver = { .domainGetBlockJobInfo = qemuDomainGetBlockJobInfo, /* 0.9.4 */ .domainBlockJobSetSpeed = qemuDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = qemuDomainBlockPull, /* 0.9.4 */ + .isAlive = qemuIsAlive, /* 0.9.7 */ }; diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index e218c40..9bae3fc 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -4110,6 +4110,23 @@ remoteStartKeepAlive(virConnectPtr conn, int interval, unsigned int count) } +static int +remoteIsAlive(virConnectPtr conn) +{ + struct private_data *priv = conn->privateData; + bool ret; + + remoteDriverLock(priv); + ret = virNetClientIsOpen(priv->client); + remoteDriverUnlock(priv); + + if (ret) + return 1; + else + return 0; +} + + #include "remote_client_bodies.h" #include "qemu_client_bodies.h" @@ -4460,6 +4477,7 @@ static virDriver remote_driver = { .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ .allowKeepAlive = remoteAllowKeepAlive, 
/* 0.9.7 */ .startKeepAlive = remoteStartKeepAlive, /* 0.9.7 */ + .isAlive = remoteIsAlive, /* 0.9.7 */ }; static virNetworkDriver network_driver = { diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index dee5059..9ccdf08 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -581,6 +581,20 @@ bool virNetClientIsEncrypted(virNetClientPtr client) } +bool virNetClientIsOpen(virNetClientPtr client) +{ + bool ret; + + if (!client) + return false; + + virNetClientLock(client); + ret = client->sock && !client->wantClose; + virNetClientUnlock(client); + return ret; +} + + int virNetClientAddProgram(virNetClientPtr client, virNetClientProgramPtr prog) { diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index e337ee0..5787a7c 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -78,6 +78,7 @@ int virNetClientSetTLSSession(virNetClientPtr client, virNetTLSContextPtr tls); bool virNetClientIsEncrypted(virNetClientPtr client); +bool virNetClientIsOpen(virNetClientPtr client); const char *virNetClientLocalAddrString(virNetClientPtr client); const char *virNetClientRemoteAddrString(virNetClientPtr client); diff --git a/src/test/test_driver.c b/src/test/test_driver.c index b3e24b4..7f1f8b6 100644 --- a/src/test/test_driver.c +++ b/src/test/test_driver.c @@ -1184,6 +1184,11 @@ static int testIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) return 0; } +static int testIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int testGetMaxVCPUs(virConnectPtr conn ATTRIBUTE_UNUSED, const char *type ATTRIBUTE_UNUSED) { @@ -5624,6 +5629,7 @@ static virDriver testDriver = { .domainIsUpdated = testDomainIsUpdated, /* 0.8.6 */ .domainEventRegisterAny = testDomainEventRegisterAny, /* 0.8.0 */ .domainEventDeregisterAny = testDomainEventDeregisterAny, /* 0.8.0 */ + .isAlive = testIsAlive, /* 0.9.7 */ }; static virNetworkDriver testNetworkDriver = { diff --git a/src/uml/uml_driver.c b/src/uml/uml_driver.c index 2b7219a..2ad3084 
100644 --- a/src/uml/uml_driver.c +++ b/src/uml/uml_driver.c @@ -1098,6 +1098,12 @@ static int umlIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) } +static int umlIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static char *umlGetCapabilities(virConnectPtr conn) { struct uml_driver *driver = (struct uml_driver *)conn->privateData; char *xml; @@ -2462,6 +2468,7 @@ static virDriver umlDriver = { .domainEventRegisterAny = umlDomainEventRegisterAny, /* 0.9.4 */ .domainEventDeregisterAny = umlDomainEventDeregisterAny, /* 0.9.4 */ .domainOpenConsole = umlDomainOpenConsole, /* 0.8.6 */ + .isAlive = umlIsAlive, /* 0.9.7 */ }; static int diff --git a/src/vbox/vbox_tmpl.c b/src/vbox/vbox_tmpl.c index 0e4364f..93ab930 100644 --- a/src/vbox/vbox_tmpl.c +++ b/src/vbox/vbox_tmpl.c @@ -1070,6 +1070,11 @@ static int vboxIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) { return 0; } +static int vboxIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int vboxGetMaxVcpus(virConnectPtr conn, const char *type ATTRIBUTE_UNUSED) { VBOX_OBJECT_CHECK(conn, int, -1); PRUint32 maxCPUCount = 0; @@ -8875,6 +8880,7 @@ virDriver NAME(Driver) = { .domainSnapshotCurrent = vboxDomainSnapshotCurrent, /* 0.8.0 */ .domainRevertToSnapshot = vboxDomainRevertToSnapshot, /* 0.8.0 */ .domainSnapshotDelete = vboxDomainSnapshotDelete, /* 0.8.0 */ + .isAlive = vboxIsAlive, /* 0.9.7 */ }; virNetworkDriver NAME(NetworkDriver) = { diff --git a/src/vmware/vmware_driver.c b/src/vmware/vmware_driver.c index b2cfdce..987a7a8 100644 --- a/src/vmware/vmware_driver.c +++ b/src/vmware/vmware_driver.c @@ -958,6 +958,12 @@ vmwareDomainGetState(virDomainPtr dom, return ret; } +static int +vmwareIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static virDriver vmwareDriver = { .no = VIR_DRV_VMWARE, .name = "VMWARE", @@ -990,6 +996,7 @@ static virDriver vmwareDriver = { .domainUndefineFlags = vmwareDomainUndefineFlags, /* 0.9.4 */ .domainIsActive = vmwareDomainIsActive, 
/* 0.8.7 */ .domainIsPersistent = vmwareDomainIsPersistent, /* 0.8.7 */ + .isAlive = vmwareIsAlive, /* 0.9.7 */ }; int diff --git a/src/xen/xen_driver.c b/src/xen/xen_driver.c index 9c96fca..52f8601 100644 --- a/src/xen/xen_driver.c +++ b/src/xen/xen_driver.c @@ -516,6 +516,13 @@ xenUnifiedIsSecure(virConnectPtr conn) return ret; } +static int +xenUnifiedIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + /* XenD reconnects for each request */ + return 1; +} + int xenUnifiedGetMaxVcpus (virConnectPtr conn, const char *type) { @@ -2259,6 +2266,7 @@ static virDriver xenUnifiedDriver = { .domainEventRegisterAny = xenUnifiedDomainEventRegisterAny, /* 0.8.0 */ .domainEventDeregisterAny = xenUnifiedDomainEventDeregisterAny, /* 0.8.0 */ .domainOpenConsole = xenUnifiedDomainOpenConsole, /* 0.8.6 */ + .isAlive = xenUnifiedIsAlive, /* 0.9.7 */ }; /** diff --git a/src/xenapi/xenapi_driver.c b/src/xenapi/xenapi_driver.c index 80a706a..a5ab98a 100644 --- a/src/xenapi/xenapi_driver.c +++ b/src/xenapi/xenapi_driver.c @@ -1887,6 +1887,17 @@ xenapiNodeGetCellsFreeMemory (virConnectPtr conn, unsigned long long *freeMems, } } +static int +xenapiIsAlive(virConnectPtr conn) +{ + struct _xenapiPrivate *priv = conn->privateData; + + if (priv->session && priv->session->ok) + return 1; + else + return 0; +} + /* The interface which we export upwards to libvirt.c. */ static virDriver xenapiDriver = { .no = VIR_DRV_XENAPI, @@ -1937,6 +1948,7 @@ static virDriver xenapiDriver = { .nodeGetCellsFreeMemory = xenapiNodeGetCellsFreeMemory, /* 0.8.0 */ .nodeGetFreeMemory = xenapiNodeGetFreeMemory, /* 0.8.0 */ .domainIsUpdated = xenapiDomainIsUpdated, /* 0.8.6 */ + .isAlive = xenapiIsAlive, /* 0.9.7 */ }; /** -- 1.7.6.1

--- Notes: Version 2: - automatically exit when a connection is closed because of keepalive timeout examples/domain-events/events-c/event-test.c | 13 ++++++++++++- examples/domain-events/events-python/event-test.py | 5 ++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/examples/domain-events/events-c/event-test.c b/examples/domain-events/events-c/event-test.c index 6a3ed26..e310eb4 100644 --- a/examples/domain-events/events-c/event-test.c +++ b/examples/domain-events/events-c/event-test.c @@ -390,7 +390,18 @@ int main(int argc, char **argv) (callback5ret != -1) && (callback6ret != -1) && (callback7ret != -1)) { - while (run) { + int rc; + + if ((rc = virConnectAllowKeepAlive(dconn)) == 0) + rc = virConnectStartKeepAlive(dconn, 5, 3); + if (rc < 0) { + virErrorPtr err = virGetLastError(); + fprintf(stderr, "Failed to start keepalive protocol: %s\n", + err && err->message ? err->message : "Unknown error"); + run = 0; + } + + while (run && virConnectIsAlive(dconn) == 1) { if (virEventRunDefaultImpl() < 0) { virErrorPtr err = virGetLastError(); fprintf(stderr, "Failed to run event loop: %s\n", diff --git a/examples/domain-events/events-python/event-test.py b/examples/domain-events/events-python/event-test.py index 76fda2b..bc854b8 100644 --- a/examples/domain-events/events-python/event-test.py +++ b/examples/domain-events/events-python/event-test.py @@ -518,11 +518,14 @@ def main(): vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, myDomainEventWatchdogCallback, None) vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, myDomainEventGraphicsCallback, None) + vc.allowKeepAlive() + vc.startKeepAlive(5, 3) + # The rest of your app would go here normally, but for sake # of demo we'll just go to sleep. The other option is to # run the event loop in your main thread if your app is # totally event based. - while 1: + while vc.isAlive() == 1: time.sleep(1) -- 1.7.6.1

--- src/qemu/libvirtd_qemu.aug | 2 ++ src/qemu/qemu.conf | 16 ++++++++++++++++ src/qemu/qemu_conf.c | 11 +++++++++++ src/qemu/qemu_conf.h | 3 +++ src/qemu/qemu_migration.c | 10 ++++++++++ src/qemu/test_libvirtd_qemu.aug | 6 ++++++ 6 files changed, 48 insertions(+), 0 deletions(-) diff --git a/src/qemu/libvirtd_qemu.aug b/src/qemu/libvirtd_qemu.aug index 6c145c7..ad34e42 100644 --- a/src/qemu/libvirtd_qemu.aug +++ b/src/qemu/libvirtd_qemu.aug @@ -52,6 +52,8 @@ module Libvirtd_qemu = | int_entry "max_processes" | str_entry "lock_manager" | int_entry "max_queued" + | int_entry "keepalive_interval" + | int_entry "keepalive_count" (* Each enty in the config is one of the following three ... *) let entry = vnc_entry diff --git a/src/qemu/qemu.conf b/src/qemu/qemu.conf index 4da5d5a..e623bc3 100644 --- a/src/qemu/qemu.conf +++ b/src/qemu/qemu.conf @@ -316,3 +316,19 @@ # Note, that job lock is per domain. # # max_queued = 0 + +################################################################### +# Keepalive protocol: +# This allows qemu driver to detect broken connections to remote +# libvirtd during peer-to-peer migration. A keepalive message is +# sent to the daemon after keepalive_interval seconds of inactivity +# to check if the daemon is still responding; keepalive_count is a +# maximum number of keepalive messages that are allowed to be sent +# to the daemon without getting any response before the connection +# is considered broken. In other words, the connection is +# automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the daemon. 
+# +#keepalive_interval = 5 +#keepalive_count = 5 diff --git a/src/qemu/qemu_conf.c b/src/qemu/qemu_conf.c index d1bf075..19f24b1 100644 --- a/src/qemu/qemu_conf.c +++ b/src/qemu/qemu_conf.c @@ -118,6 +118,9 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, virLockManagerPluginNew("nop", NULL, 0))) return -1; + driver->keepAliveInterval = 5; + driver->keepAliveCount = 5; + /* Just check the file is readable before opening it, otherwise * libvirt emits an error. */ @@ -462,6 +465,14 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, CHECK_TYPE("max_queued", VIR_CONF_LONG); if (p) driver->max_queued = p->l; + p = virConfGetValue(conf, "keepalive_interval"); + CHECK_TYPE("keepalive_interval", VIR_CONF_LONG); + if (p) driver->keepAliveInterval = p->l; + + p = virConfGetValue(conf, "keepalive_count"); + CHECK_TYPE("keepalive_count", VIR_CONF_LONG); + if (p) driver->keepAliveCount = p->l; + virConfFree (conf); return 0; } diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h index e8b92a4..70c3fda 100644 --- a/src/qemu/qemu_conf.h +++ b/src/qemu/qemu_conf.h @@ -138,6 +138,9 @@ struct qemud_driver { * of guests which will be automatically killed * when the virConnectPtr is closed*/ virHashTablePtr autodestroy; + + int keepAliveInterval; + unsigned int keepAliveCount; }; typedef struct _qemuDomainCmdlineDef qemuDomainCmdlineDef; diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0a5a13d..301d8ba 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -2173,6 +2173,7 @@ static int doPeer2PeerMigrate(struct qemud_driver *driver, virConnectPtr dconn = NULL; bool p2p; virErrorPtr orig_err = NULL; + int rc; VIR_DEBUG("driver=%p, sconn=%p, vm=%p, xmlin=%s, dconnuri=%s, " "uri=%s, flags=%lx, dname=%s, resource=%lu", @@ -2193,6 +2194,15 @@ static int doPeer2PeerMigrate(struct qemud_driver *driver, } qemuDomainObjEnterRemoteWithDriver(driver, vm); + if ((rc = virConnectAllowKeepAlive(dconn)) == 0) { + rc = 
virConnectStartKeepAlive(dconn, driver->keepAliveInterval, + driver->keepAliveCount); + } + qemuDomainObjExitRemoteWithDriver(driver, vm); + if (rc < 0) + goto cleanup; + + qemuDomainObjEnterRemoteWithDriver(driver, vm); p2p = VIR_DRV_SUPPORTS_FEATURE(dconn->driver, dconn, VIR_DRV_FEATURE_MIGRATION_P2P); /* v3proto reflects whether the caller used Perform3, but with diff --git a/src/qemu/test_libvirtd_qemu.aug b/src/qemu/test_libvirtd_qemu.aug index b1f9114..f7476ae 100644 --- a/src/qemu/test_libvirtd_qemu.aug +++ b/src/qemu/test_libvirtd_qemu.aug @@ -115,6 +115,9 @@ vnc_auto_unix_socket = 1 max_processes = 12345 lock_manager = \"fcntl\" + +keepalive_interval = 1 +keepalive_count = 42 " test Libvirtd_qemu.lns get conf = @@ -240,3 +243,6 @@ lock_manager = \"fcntl\" { "max_processes" = "12345" } { "#empty" } { "lock_manager" = "fcntl" } +{ "#empty" } +{ "keepalive_interval" = "1" } +{ "keepalive_count" = "42" } -- 1.7.6.1

If a connection to destination host is lost during peer-to-peer migration (because keepalive protocol timed out), we won't be able to finish the migration and it doesn't make sense to wait for qemu to transmit all data. This patch automatically cancels such migration without waiting for virDomainAbortJob to be called. --- src/qemu/qemu_migration.c | 39 +++++++++++++++++++++++++-------------- 1 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 301d8ba..fb3b411 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -901,7 +901,8 @@ qemuMigrationUpdateJobStatus(struct qemud_driver *driver, static int qemuMigrationWaitForCompletion(struct qemud_driver *driver, virDomainObjPtr vm, - enum qemuDomainAsyncJob asyncJob) + enum qemuDomainAsyncJob asyncJob, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; const char *job; @@ -929,6 +930,12 @@ qemuMigrationWaitForCompletion(struct qemud_driver *driver, virDomainObjPtr vm, if (qemuMigrationUpdateJobStatus(driver, vm, job, asyncJob) < 0) goto cleanup; + if (dconn && virConnectIsAlive(dconn) <= 0) { + qemuReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("Lost connection to destination host")); + goto cleanup; + } + virDomainObjUnlock(vm); qemuDriverUnlock(driver); @@ -1491,7 +1498,8 @@ qemuMigrationRun(struct qemud_driver *driver, int *cookieoutlen, unsigned long flags, unsigned long resource, - qemuMigrationSpecPtr spec) + qemuMigrationSpecPtr spec, + virConnectPtr dconn) { int ret = -1; unsigned int migrate_flags = QEMU_MONITOR_MIGRATE_BACKGROUND; @@ -1610,7 +1618,8 @@ qemuMigrationRun(struct qemud_driver *driver, goto cancel; if (qemuMigrationWaitForCompletion(driver, vm, - QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) + QEMU_ASYNC_JOB_MIGRATION_OUT, + dconn) < 0) goto cleanup; /* When migration completed, QEMU will have paused the @@ -1667,7 +1676,8 @@ static int doNativeMigrate(struct qemud_driver *driver, char **cookieout, 
int *cookieoutlen, unsigned long flags, - unsigned long resource) + unsigned long resource, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; xmlURIPtr uribits = NULL; @@ -1725,7 +1735,7 @@ static int doNativeMigrate(struct qemud_driver *driver, } ret = qemuMigrationRun(driver, vm, cookiein, cookieinlen, cookieout, - cookieoutlen, flags, resource, &spec); + cookieoutlen, flags, resource, &spec, dconn); cleanup: if (spec.destType == MIGRATION_DEST_FD) @@ -1746,7 +1756,8 @@ static int doTunnelMigrate(struct qemud_driver *driver, char **cookieout, int *cookieoutlen, unsigned long flags, - unsigned long resource) + unsigned long resource, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; virNetSocketPtr sock = NULL; @@ -1809,7 +1820,7 @@ static int doTunnelMigrate(struct qemud_driver *driver, } ret = qemuMigrationRun(driver, vm, cookiein, cookieinlen, cookieout, - cookieoutlen, flags, resource, &spec); + cookieoutlen, flags, resource, &spec, dconn); cleanup: if (spec.destType == MIGRATION_DEST_FD) { @@ -1912,12 +1923,12 @@ static int doPeer2PeerMigrate2(struct qemud_driver *driver, if (flags & VIR_MIGRATE_TUNNELLED) ret = doTunnelMigrate(driver, vm, st, NULL, 0, NULL, NULL, - flags, resource); + flags, resource, dconn); else ret = doNativeMigrate(driver, vm, uri_out, cookie, cookielen, NULL, NULL, /* No out cookie with v2 migration */ - flags, resource); + flags, resource, dconn); /* Perform failed. Make sure Finish doesn't overwrite the error */ if (ret < 0) @@ -2058,12 +2069,12 @@ static int doPeer2PeerMigrate3(struct qemud_driver *driver, ret = doTunnelMigrate(driver, vm, st, cookiein, cookieinlen, &cookieout, &cookieoutlen, - flags, resource); + flags, resource, dconn); else ret = doNativeMigrate(driver, vm, uri_out, cookiein, cookieinlen, &cookieout, &cookieoutlen, - flags, resource); + flags, resource, dconn); /* Perform failed. 
Make sure Finish doesn't overwrite the error */ if (ret < 0) { @@ -2301,7 +2312,7 @@ qemuMigrationPerformJob(struct qemud_driver *driver, qemuMigrationJobSetPhase(driver, vm, QEMU_MIGRATION_PHASE_PERFORM2); ret = doNativeMigrate(driver, vm, uri, cookiein, cookieinlen, cookieout, cookieoutlen, - flags, resource); + flags, resource, NULL); } if (ret < 0) goto endjob; @@ -2390,7 +2401,7 @@ qemuMigrationPerformPhase(struct qemud_driver *driver, resume = virDomainObjGetState(vm, NULL) == VIR_DOMAIN_RUNNING; ret = doNativeMigrate(driver, vm, uri, cookiein, cookieinlen, cookieout, cookieoutlen, - flags, resource); + flags, resource, NULL); if (ret < 0 && resume && virDomainObjGetState(vm, NULL) == VIR_DOMAIN_PAUSED) { @@ -2909,7 +2920,7 @@ qemuMigrationToFile(struct qemud_driver *driver, virDomainObjPtr vm, if (rc < 0) goto cleanup; - rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob); + rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob, NULL); if (rc < 0) goto cleanup; -- 1.7.6.1
participants (3)
-
Daniel P. Berrange
-
Eric Blake
-
Jiri Denemark