[libvirt] [PATCH 0/9] Implement keepalive protocol for libvirt RPC

This allows us to detect broken connections between server and client without waiting for TCP timeout and dead daemon/client. By default a connection is considered broken after about 30 seconds of no messages received from remote party. After that period, the connection is automatically closed. The main reason for implementing this is that peer-to-peer migration can now be canceled when a connection between source and target breaks. Although this will really work only after qemu fixes migrate_cancel command so that it doesn't block when outgoing TCP buffers are full. Jiri Denemark (9): Define keepalive protocol Implement common keepalive handling Introduce two public APIs for keepalive protocol Implement keepalive protocol in libvirt daemon Add support for non-blocking calls in client RPC Add support for async close of client RPC socket Implement keepalive protocol in remote driver Add keepalive support into domain-events examples qemu: Add support for keepalive messages during p2p migration .gitignore | 1 + daemon/libvirtd.aug | 4 + daemon/libvirtd.c | 11 + daemon/libvirtd.conf | 15 + daemon/remote.c | 38 ++ examples/domain-events/events-c/event-test.c | 11 + examples/domain-events/events-python/event-test.py | 3 + include/libvirt/libvirt.h.in | 5 + po/POTFILES.in | 1 + src/Makefile.am | 13 +- src/driver.h | 9 + src/libvirt.c | 107 +++++ src/libvirt_internal.h | 10 +- src/libvirt_public.syms | 6 + src/qemu/libvirtd_qemu.aug | 2 + src/qemu/qemu.conf | 16 + src/qemu/qemu_conf.c | 11 + src/qemu/qemu_conf.h | 3 + src/qemu/qemu_migration.c | 10 + src/qemu/test_libvirtd_qemu.aug | 6 + src/remote/remote_driver.c | 30 ++ src/remote/remote_protocol.x | 2 +- src/rpc/virkeepalive.c | 464 ++++++++++++++++++++ src/rpc/virkeepalive.h | 58 +++ src/rpc/virkeepaliveprotocol.x | 8 + src/rpc/virnetclient.c | 321 ++++++++++++-- src/rpc/virnetclient.h | 5 + src/rpc/virnetserver.c | 10 + src/rpc/virnetserver.h | 2 + src/rpc/virnetserverclient.c | 126 +++++- src/rpc/virnetserverclient.h 
| 6 + 31 files changed, 1254 insertions(+), 60 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h create mode 100644 src/rpc/virkeepaliveprotocol.x -- 1.7.6.1

The keepalive program has three procedures: ADVERTISE, PING, and PONG. All are used only in asynchronous messages and the sender doesn't wait for any reply. However, the party which receives PING messages is supposed to react by sending a PONG message to the other party, but no explicit binding between PING and PONG messages is made. ADVERTISE is sent by a client to indicate it supports keepalive protocol. Server is not allowed to send any keepalive message until it sees ADVERTISE. --- .gitignore | 1 + src/Makefile.am | 12 +++++++++--- src/rpc/virkeepaliveprotocol.x | 8 ++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 src/rpc/virkeepaliveprotocol.x diff --git a/.gitignore b/.gitignore index 41fa50f..3859fab 100644 --- a/.gitignore +++ b/.gitignore @@ -65,6 +65,7 @@ /src/locking/qemu-sanlock.conf /src/remote/*_client_bodies.h /src/remote/*_protocol.[ch] +/src/rpc/virkeepaliveprotocol.[ch] /src/rpc/virnetprotocol.[ch] /src/util/virkeymaps.h /tests/*.log diff --git a/src/Makefile.am b/src/Makefile.am index 738ee91..ff890e1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -266,7 +266,8 @@ PDWTAGS = \ PROTOCOL_STRUCTS = \ $(srcdir)/remote_protocol-structs \ $(srcdir)/qemu_protocol-structs \ - $(srcdir)/virnetprotocol-structs + $(srcdir)/virnetprotocol-structs \ + $(srcdir)/virkeepaliveprotocol-structs if WITH_REMOTE # The .o file that pdwtags parses is created as a side effect of running # libtool; but from make's perspective we depend on the .lo file. @@ -274,6 +275,7 @@ $(srcdir)/%_protocol-structs: libvirt_driver_remote_la-%_protocol.lo $(PDWTAGS) $(srcdir)/virnetprotocol-structs: libvirt_net_rpc_la-virnetprotocol.lo $(PDWTAGS) +$(srcdir)/virkeepaliveprotocol-structs: libvirt_net_rpc_la-virkeepaliveprotocol.lo else !WITH_REMOTE # These generated files must live in git, because they cannot be re-generated # when configured --without-remote. 
@@ -1307,12 +1309,15 @@ noinst_LTLIBRARIES += \ EXTRA_DIST += \ rpc/virnetprotocol.x \ + rpc/virkeepaliveprotocol.x \ rpc/gendispatch.pl \ rpc/genprotocol.pl VIR_NET_RPC_GENERATED = \ $(srcdir)/rpc/virnetprotocol.h \ - $(srcdir)/rpc/virnetprotocol.c + $(srcdir)/rpc/virnetprotocol.c \ + $(srcdir)/rpc/virkeepaliveprotocol.h \ + $(srcdir)/rpc/virkeepaliveprotocol.c BUILT_SOURCES += $(VIR_NET_RPC_GENERATED) @@ -1320,7 +1325,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetmessage.h rpc/virnetmessage.c \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ - rpc/virnettlscontext.h rpc/virnettlscontext.c + rpc/virnettlscontext.h rpc/virnettlscontext.c \ + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/rpc/virkeepaliveprotocol.x b/src/rpc/virkeepaliveprotocol.x new file mode 100644 index 0000000..326ea78 --- /dev/null +++ b/src/rpc/virkeepaliveprotocol.x @@ -0,0 +1,8 @@ +const KEEPALIVE_PROGRAM = 0x6b656570; +const KEEPALIVE_VERSION = 1; + +enum keepalive_procedure { + KEEPALIVE_PROC_ADVERTISE = 1, + KEEPALIVE_PROC_PING = 2, + KEEPALIVE_PROC_PONG = 3 +}; -- 1.7.6.1

These APIs are used by both client and server RPC layer to handle processing of keepalive messages. --- po/POTFILES.in | 1 + src/Makefile.am | 3 +- src/rpc/virkeepalive.c | 464 ++++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virkeepalive.h | 58 ++++++ 4 files changed, 525 insertions(+), 1 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h diff --git a/po/POTFILES.in b/po/POTFILES.in index 5ce35ae..71254dd 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c src/qemu/qemu_process.c src/remote/remote_client_bodies.h src/remote/remote_driver.c +src/rpc/virkeepalive.c src/rpc/virnetclient.c src/rpc/virnetclientprogram.c src/rpc/virnetclientstream.c diff --git a/src/Makefile.am b/src/Makefile.am index ff890e1..d983d28 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1326,7 +1326,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ rpc/virnettlscontext.h rpc/virnettlscontext.c \ - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \ + rpc/virkeepalive.h rpc/virkeepalive.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c new file mode 100644 index 0000000..5536b61 --- /dev/null +++ b/src/rpc/virkeepalive.c @@ -0,0 +1,464 @@ +/* + * virkeepalive.c: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#include <config.h> + +#include "memory.h" +#include "threads.h" +#include "virfile.h" +#include "logging.h" +#include "util.h" +#include "virterror_internal.h" +#include "virnetsocket.h" +#include "virkeepaliveprotocol.h" +#include "virkeepalive.h" + +#define VIR_FROM_THIS VIR_FROM_RPC +#define virNetError(code, ...) \ + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \ + __FUNCTION__, __LINE__, __VA_ARGS__) + +struct _virKeepAlive { + int refs; + virMutex lock; + + int interval; + unsigned int count; + unsigned int countToDeath; + time_t lastPacketReceived; + bool advertised; + int timer; + + virNetMessagePtr response; + int responseTimer; + + virKeepAliveSendFunc sendCB; + virKeepAliveDeadFunc deadCB; + virKeepAliveFreeFunc freeCB; + void *client; +}; + + +static void +virKeepAliveLock(virKeepAlivePtr ka) +{ + virMutexLock(&ka->lock); +} + +static void +virKeepAliveUnlock(virKeepAlivePtr ka) +{ + virMutexUnlock(&ka->lock); +} + + +static virNetMessagePtr +virKeepAliveMessage(int proc) +{ + virNetMessagePtr msg; + + if (!(msg = virNetMessageNew(false))) + return NULL; + + msg->header.prog = KEEPALIVE_PROGRAM; + msg->header.vers = KEEPALIVE_VERSION; + msg->header.type = VIR_NET_MESSAGE; + msg->header.proc = proc; + + if (virNetMessageEncodeHeader(msg) < 0 || + virNetMessageEncodePayloadEmpty(msg) < 0) { + virNetMessageFree(msg); + return NULL; + } + + return msg; +} + + +static int +virKeepAliveSend(virKeepAlivePtr ka, 
virNetMessagePtr msg) +{ + int ret; + const char *proc; + void *client = ka->client; + virKeepAliveSendFunc sendCB = ka->sendCB; + + switch (msg->header.proc) { + case KEEPALIVE_PROC_ADVERTISE: + proc = "advertisement"; + break; + case KEEPALIVE_PROC_PING: + proc = "request"; + break; + case KEEPALIVE_PROC_PONG: + proc = "response"; + break; + } + + VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); + + ka->refs++; + virKeepAliveUnlock(ka); + + if ((ret = sendCB(client, msg)) < 0) { + VIR_WARN("Failed to send keepalive %s to client %p", proc, client); + virNetMessageFree(msg); + } + + virKeepAliveLock(ka); + ka->refs--; + + return ret; +} + + +int +virKeepAliveAdvertise(virKeepAlivePtr ka) +{ + virNetMessagePtr msg; + int ret = -1; + + virKeepAliveLock(ka); + + VIR_DEBUG("Advertising keepalive support to client %p", ka->client); + + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_ADVERTISE))) { + VIR_WARN("Failed to generate keepalive advertisement"); + goto cleanup; + } + + if ((ret = virKeepAliveSend(ka, msg)) == 0) + ka->advertised = true; + +cleanup: + virKeepAliveUnlock(ka); + return ret; +} + + +static void +virKeepAliveScheduleResponse(virKeepAlivePtr ka) +{ + if (ka->responseTimer == -1) + return; + + VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); + + if (!ka->response && + !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { + VIR_WARN("Failed to generate keepalive response"); + return; + } + + virEventUpdateTimeout(ka->responseTimer, 0); +} + + +static void +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + time_t now = time(NULL); + + virKeepAliveLock(ka); + + VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago", + ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived); + + if (now - ka->lastPacketReceived < ka->interval - 1) { + int timeout = ka->interval - (now - ka->lastPacketReceived); + virEventUpdateTimeout(ka->timer, timeout * 
1000); + goto cleanup; + } + + if (ka->countToDeath == 0) { + virKeepAliveDeadFunc deadCB = ka->deadCB; + void *client = ka->client; + + VIR_WARN("No response from client %p after %d keepalive messages in" + " %d seconds", + ka->client, + ka->count, + (int) (now - ka->lastPacketReceived)); + ka->refs++; + virKeepAliveUnlock(ka); + deadCB(client); + virKeepAliveLock(ka); + ka->refs--; + } else { + virNetMessagePtr msg; + + ka->countToDeath--; + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING))) + VIR_WARN("Failed to generate keepalive request"); + else + ignore_value(virKeepAliveSend(ka, msg)); + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + } + +cleanup: + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + virNetMessagePtr msg; + + virKeepAliveLock(ka); + + VIR_DEBUG("ka=%p, client=%p, response=%p", + ka, ka->client, ka->response); + + if (ka->response) { + msg = ka->response; + ka->response = NULL; + ignore_value(virKeepAliveSend(ka, msg)); + } + + virEventUpdateTimeout(ka->responseTimer, ka->response ? 
0 : -1); + + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveTimerFree(void *opaque) +{ + virKeepAliveFree(opaque); +} + + +virKeepAlivePtr +virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) +{ + virKeepAlivePtr ka; + + VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count); + + if (VIR_ALLOC(ka) < 0) { + virReportOOMError(); + return NULL; + } + + if (virMutexInit(&ka->lock) < 0) { + VIR_FREE(ka); + return NULL; + } + + ka->refs = 1; + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + ka->timer = -1; + ka->client = client; + ka->sendCB = sendCB; + ka->deadCB = deadCB; + ka->freeCB = freeCB; + + ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, + ka, virKeepAliveTimerFree); + if (ka->responseTimer < 0) { + virKeepAliveFree(ka); + return NULL; + } + /* the timer now has a reference to ka */ + ka->refs++; + + return ka; +} + + +void +virKeepAliveRef(virKeepAlivePtr ka) +{ + virKeepAliveLock(ka); + ka->refs++; + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); + virKeepAliveUnlock(ka); +} + + +void +virKeepAliveFree(virKeepAlivePtr ka) +{ + if (!ka) + return; + + virKeepAliveLock(ka); + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); + if (--ka->refs > 0) { + virKeepAliveUnlock(ka); + return; + } + + virMutexDestroy(&ka->lock); + ka->freeCB(ka->client); + VIR_FREE(ka); +} + + +int +virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count) +{ + int ret = -1; + time_t delay; + int timeout; + + VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u", + ka, ka->client, interval, count); + + virKeepAliveLock(ka); + + if (!ka->advertised) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("keepalive support was not advertised to remote party")); + goto cleanup; + } + + if (ka->timer >= 0) { + VIR_DEBUG("Keepalive messages already enabled"); + ret = 
0; + goto cleanup; + } + + if (interval > 0) { + if (ka->interval > 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("keepalive interval already set")); + goto cleanup; + } + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + } + + if (ka->interval <= 0) { + VIR_DEBUG("Keepalive messages disabled by configuration"); + ret = 0; + goto cleanup; + } + + VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u", + ka->interval, ka->count); + + delay = time(NULL) - ka->lastPacketReceived; + if (delay > ka->interval) + timeout = 0; + else + timeout = ka->interval - delay; + ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer, + ka, virKeepAliveTimerFree); + if (ka->timer < 0) + goto cleanup; + + /* the timer now has another reference to this object */ + ka->refs++; + ret = 0; + +cleanup: + virKeepAliveUnlock(ka); + return ret; +} + + +void +virKeepAliveStop(virKeepAlivePtr ka) +{ + VIR_DEBUG("ka=%p, client=%p", ka, ka->client); + + virKeepAliveLock(ka); + if (ka->timer > 0) { + virEventRemoveTimeout(ka->timer); + ka->timer = -1; + } + if (ka->responseTimer > 0) { + virEventRemoveTimeout(ka->responseTimer); + ka->responseTimer = -1; + } + virKeepAliveUnlock(ka); +} + + +bool +virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg) +{ + bool ret = false; + bool start = false; + + VIR_DEBUG("ka=%p, client=%p, msg=%p", + ka, ka ? 
ka->client : "(null)", msg); + + if (!ka) + return false; + + virKeepAliveLock(ka); + + ka->countToDeath = ka->count; + ka->lastPacketReceived = time(NULL); + + if (msg->header.prog == KEEPALIVE_PROGRAM && + msg->header.vers == KEEPALIVE_VERSION && + msg->header.type == VIR_NET_MESSAGE) { + ret = true; + switch (msg->header.proc) { + case KEEPALIVE_PROC_ADVERTISE: + VIR_DEBUG("Client %p advertises keepalive support", ka->client); + ka->advertised = true; + start = true; + break; + + case KEEPALIVE_PROC_PING: + VIR_DEBUG("Got keepalive request from client %p", ka->client); + virKeepAliveScheduleResponse(ka); + break; + + case KEEPALIVE_PROC_PONG: + VIR_DEBUG("Got keepalive response from client %p", ka->client); + break; + + default: + VIR_DEBUG("Ignoring unknown keepalive message %d from client %p", + msg->header.proc, ka->client); + } + } + + if (ka->timer >= 0) + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + + virKeepAliveUnlock(ka); + + if (start && virKeepAliveStart(ka, 0, 0) < 0) + VIR_WARN("Failed to start keepalive protocol"); + + return ret; +} diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h new file mode 100644 index 0000000..ac96859 --- /dev/null +++ b/src/rpc/virkeepalive.h @@ -0,0 +1,58 @@ +/* + * virkeepalive.h: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#ifndef __VIR_KEEPALIVE_H__ +# define __VIR_KEEPALIVE_H__ + +# include "virnetmessage.h" + +typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg); +typedef void (*virKeepAliveDeadFunc)(void *client); +typedef void (*virKeepAliveFreeFunc)(void *client); + +typedef struct _virKeepAlive virKeepAlive; +typedef virKeepAlive *virKeepAlivePtr; + + +virKeepAlivePtr virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) + ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4) + ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6); + +void virKeepAliveRef(virKeepAlivePtr ka); +void virKeepAliveFree(virKeepAlivePtr ka); + +int virKeepAliveAdvertise(virKeepAlivePtr ka); + +int virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count); +void virKeepAliveStop(virKeepAlivePtr ka); + +bool virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg); + +#endif /* __VIR_KEEPALIVE_H__ */ -- 1.7.6.1

This introduces virConnectAllowKeepAlive and virConnectStartKeepAlive public APIs which can be used by a client connecting to remote server to indicate support for keepalive protocol. Both APIs are handled directly by remote driver and not transmitted over the wire to the server. --- include/libvirt/libvirt.h.in | 5 ++ src/driver.h | 9 ++++ src/libvirt.c | 107 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 +++- src/libvirt_public.syms | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 39155a6..6f61cc0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3210,6 +3210,11 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr; +int virConnectAllowKeepAlive(virConnectPtr conn); +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count); + #ifdef __cplusplus } #endif diff --git a/src/driver.h b/src/driver.h index 3792003..cd17d83 100644 --- a/src/driver.h +++ b/src/driver.h @@ -718,6 +718,13 @@ typedef int (*virDrvDomainBlockPull)(virDomainPtr dom, const char *path, unsigned long bandwidth, unsigned int flags); +typedef int + (*virDrvAllowKeepAlive)(virConnectPtr conn); + +typedef int + (*virDrvStartKeepAlive)(virConnectPtr conn, + int interval, + unsigned int count); /** * _virDriver: @@ -872,6 +879,8 @@ struct _virDriver { virDrvDomainGetBlockJobInfo domainGetBlockJobInfo; virDrvDomainBlockJobSetSpeed domainBlockJobSetSpeed; virDrvDomainBlockPull domainBlockPull; + virDrvAllowKeepAlive allowKeepAlive; + virDrvStartKeepAlive startKeepAlive; }; typedef int diff --git a/src/libvirt.c b/src/libvirt.c index 8f94b11..138f367 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -16590,3 +16590,110 @@ error: virDispatchError(dom->conn); return -1; } + +/** + * virConnectAllowKeepAlive: + * @conn: pointer to a hypervisor connection + * + * Tell remote party we 
support keepalive messages so the it can use them and + * we will respond to them. To actually start sending keepalive messages to a + * client needs to call virConnectStartKeepAlive(). + * + * Note: client has to implement and run event loop to be able to respond to + * asynchronous keepalive messages. If a client doesn't run event loop but + * still calls this API, every connection made may be automatically closed by + * remote party after a certain period of inactivity. + * + * Returns -1 on error, 0 on success, 1 when remote party doesn't support + * keepalive messages. + */ +int virConnectAllowKeepAlive(virConnectPtr conn) +{ + int ret = -1; + + VIR_DEBUG("conn=%p", conn); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + if (!VIR_DRV_SUPPORTS_FEATURE(conn->driver, conn, + VIR_DRV_FEATURE_PROGRAM_KEEPALIVE)) { + VIR_DEBUG("Remote party doesn't support keepalive messages"); + return 1; + } + + if (conn->driver->allowKeepAlive) { + ret = conn->driver->allowKeepAlive(conn); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(conn); + return -1; +} + +/** + * virConnectStartKeepAlive: + * @conn: pointer to a hypervisor connection + * @interval: number of seconds of inactivity before a keepalive message is sent + * @count: number of messages that can be sent in a row + * + * Start sending keepalive messages after interval second of inactivity and + * consider the connection to be broken when no response is received after + * count keepalive messages sent in a row. In other words, sending count + 1 + * keepalive message results in closing the connection. + * + * This API may be called only after calling virConnectAllowKeepAlive and + * checking it returned 0, which ensures remote party supports keepalive + * protocol. Failure to do so will be detected and reported as an error. 
+ * + * Note: client has to implement and run event loop to be able to use keepalive + * messages. Failure to do so may result in connections being closed + * unexpectedly. + * + * Returns 0 on success, -1 on error. + */ +int virConnectStartKeepAlive(virConnectPtr conn, + int interval, + unsigned int count) +{ + int ret = -1; + + VIR_DEBUG("conn=%p, interval=%d, count=%u", conn, interval, count); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + if (interval <= 0) { + virLibConnError(VIR_ERR_INVALID_ARG, + _("negative or zero interval make no sense")); + goto error; + } + + if (conn->driver->startKeepAlive) { + ret = conn->driver->startKeepAlive(conn, interval, count); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 6e44341..dbbf7e0 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -39,8 +39,8 @@ int virStateActive(void); * * The remote driver passes features through to the real driver at the * remote end unmodified, except if you query a VIR_DRV_FEATURE_REMOTE* - * feature. - * + * feature. Queries for VIR_DRV_FEATURE_PROGRAM* features are answered + * directly by the RPC layer and not by the real driver. */ enum { /* Driver supports V1-style virDomainMigrate, ie. domainMigratePrepare/ @@ -79,6 +79,12 @@ enum { * to domain configuration, i.e., starting from Begin3 and not Perform3. */ VIR_DRV_FEATURE_MIGRATE_CHANGE_PROTECTION = 7, + + /* + * Remote party supports keepalive program (i.e., sending keepalive + * messages). 
+ */ + VIR_DRV_FEATURE_PROGRAM_KEEPALIVE = 8, }; diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 8a6d55a..f7441d7 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -489,4 +489,10 @@ LIBVIRT_0.9.5 { virDomainSnapshotGetName; } LIBVIRT_0.9.4; +LIBVIRT_0.9.7 { + global: + virConnectAllowKeepAlive; + virConnectStartKeepAlive; +} LIBVIRT_0.9.5; + # .... define new API here using predicted next version number .... -- 1.7.6.1

--- daemon/libvirtd.aug | 4 + daemon/libvirtd.c | 11 ++++ daemon/libvirtd.conf | 15 +++++ daemon/remote.c | 38 +++++++++++++ src/remote/remote_protocol.x | 2 +- src/rpc/virnetserver.c | 10 +++ src/rpc/virnetserver.h | 2 + src/rpc/virnetserverclient.c | 126 +++++++++++++++++++++++++++++++++++++++--- src/rpc/virnetserverclient.h | 6 ++ 9 files changed, 204 insertions(+), 10 deletions(-) diff --git a/daemon/libvirtd.aug b/daemon/libvirtd.aug index ce00db5..6cd3f28 100644 --- a/daemon/libvirtd.aug +++ b/daemon/libvirtd.aug @@ -66,6 +66,9 @@ module Libvirtd = let auditing_entry = int_entry "audit_level" | bool_entry "audit_logging" + let keepalive_entry = int_entry "keepalive_interval" + | int_entry "keepalive_count" + (* Each enty in the config is one of the following three ... *) let entry = network_entry | sock_acl_entry @@ -75,6 +78,7 @@ module Libvirtd = | processing_entry | logging_entry | auditing_entry + | keepalive_entry let comment = [ label "#comment" . del /#[ \t]*/ "# " . store /([^ \t\n][^\n]*)?/ . del /\n/ "\n" ] let empty = [ label "#empty" . 
eol ] diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index d1bc3dd..4ad2b2e 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -146,6 +146,9 @@ struct daemonConfig { int audit_level; int audit_logging; + + int keepalive_interval; + unsigned int keepalive_count; }; enum { @@ -899,6 +902,9 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED) data->audit_level = 1; data->audit_logging = 0; + data->keepalive_interval = 5; + data->keepalive_count = 5; + localhost = virGetHostname(NULL); if (localhost == NULL) { /* we couldn't resolve the hostname; assume that we are @@ -1062,6 +1068,9 @@ daemonConfigLoad(struct daemonConfig *data, GET_CONF_STR (conf, filename, log_outputs); GET_CONF_INT (conf, filename, log_buffer_size); + GET_CONF_INT (conf, filename, keepalive_interval); + GET_CONF_INT (conf, filename, keepalive_count); + virConfFree (conf); return 0; @@ -1452,6 +1461,8 @@ int main(int argc, char **argv) { config->max_workers, config->prio_workers, config->max_clients, + config->keepalive_interval, + config->keepalive_count, config->mdns_adv ? config->mdns_name : NULL, use_polkit_dbus, remoteClientInitHook))) { diff --git a/daemon/libvirtd.conf b/daemon/libvirtd.conf index da3983e..4f6ab9f 100644 --- a/daemon/libvirtd.conf +++ b/daemon/libvirtd.conf @@ -366,3 +366,18 @@ # it with the output of the 'uuidgen' command and then # uncomment this entry #host_uuid = "00000000-0000-0000-0000-000000000000" + +################################################################### +# Keepalive protocol: +# This allows libvirtd to detect broken client connections or even +# dead client. A keepalive message is sent to a client after +# keepalive_interval seconds of inactivity to check if the client is +# still responding; keepalive_count is a maximum number of keepalive +# messages that are allowed to be sent to the client without getting +# any response before the connection is considered broken. 
In other +# words, the connection is automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the client. +# +#keepalive_interval = 5 +#keepalive_count = 5 diff --git a/daemon/remote.c b/daemon/remote.c index 245d41c..946bb7e 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -3112,6 +3112,44 @@ cleanup: } +static int +remoteDispatchSupportsFeature(virNetServerPtr server ATTRIBUTE_UNUSED, + virNetServerClientPtr client, + virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED, + virNetMessageErrorPtr rerr, + remote_supports_feature_args *args, + remote_supports_feature_ret *ret) +{ + int rv = -1; + int supported; + struct daemonClientPrivate *priv = + virNetServerClientGetPrivateData(client); + + if (!priv->conn) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("connection not open")); + goto cleanup; + } + + if (args->feature == VIR_DRV_FEATURE_PROGRAM_KEEPALIVE) { + supported = 1; + goto done; + } + + if ((supported = virDrvSupportsFeature(priv->conn, args->feature)) < 0) + goto cleanup; + +done: + ret->supported = supported; + rv = 0; + +cleanup: + if (rv < 0) + virNetMessageSaveError(rerr); + return rv; +} + + + /*----- Helpers. 
-----*/ /* get_nonnull_domain and get_nonnull_network turn an on-wire diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 455e324..ac0cd0e 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -2307,7 +2307,7 @@ enum remote_procedure { REMOTE_PROC_DOMAIN_GET_SCHEDULER_PARAMETERS = 57, /* skipgen autogen */ REMOTE_PROC_DOMAIN_SET_SCHEDULER_PARAMETERS = 58, /* autogen autogen */ REMOTE_PROC_GET_HOSTNAME = 59, /* autogen autogen priority:high */ - REMOTE_PROC_SUPPORTS_FEATURE = 60, /* autogen autogen priority:high */ + REMOTE_PROC_SUPPORTS_FEATURE = 60, /* skipgen autogen priority:high */ REMOTE_PROC_DOMAIN_MIGRATE_PREPARE = 61, /* skipgen skipgen */ REMOTE_PROC_DOMAIN_MIGRATE_PERFORM = 62, /* autogen autogen */ diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c index d71ed18..34dac66 100644 --- a/src/rpc/virnetserver.c +++ b/src/rpc/virnetserver.c @@ -102,6 +102,9 @@ struct _virNetServer { size_t nclients_max; virNetServerClientPtr *clients; + int keepaliveInterval; + unsigned int keepaliveCount; + unsigned int quit :1; virNetTLSContextPtr tls; @@ -239,6 +242,9 @@ static int virNetServerDispatchNewClient(virNetServerServicePtr svc ATTRIBUTE_UN virNetServerDispatchNewMessage, srv); + virNetServerClientInitKeepAlive(client, srv->keepaliveInterval, + srv->keepaliveCount); + virNetServerUnlock(srv); return 0; @@ -278,6 +284,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, const char *mdnsGroupName, bool connectDBus ATTRIBUTE_UNUSED, virNetServerClientInitHook clientInitHook) @@ -299,6 +307,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, goto error; srv->nclients_max = max_clients; + srv->keepaliveInterval = keepaliveInterval; + srv->keepaliveCount = keepaliveCount; srv->sigwrite = srv->sigread = -1; srv->clientInitHook = clientInitHook; srv->privileged = geteuid() == 0 ? 
true : false; diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h index cc9d039..99b65d6 100644 --- a/src/rpc/virnetserver.h +++ b/src/rpc/virnetserver.h @@ -41,6 +41,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, const char *mdnsGroupName, bool connectDBus, virNetServerClientInitHook clientInitHook); diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 412814d..9bd8914 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -33,6 +33,7 @@ #include "virterror_internal.h" #include "memory.h" #include "threads.h" +#include "virkeepalive.h" #define VIR_FROM_THIS VIR_FROM_RPC #define virNetError(code, ...) \ @@ -98,6 +99,9 @@ struct _virNetServerClient void *privateData; virNetServerClientFreeFunc privateDataFreeFunc; virNetServerClientCloseFunc privateDataCloseFunc; + + virKeepAlivePtr keepalive; + int keepaliveFilter; }; @@ -207,15 +211,14 @@ static void virNetServerClientUpdateEvent(virNetServerClientPtr client) } -int virNetServerClientAddFilter(virNetServerClientPtr client, - virNetServerClientFilterFunc func, - void *opaque) +static int +virNetServerClientAddFilterLocked(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) { virNetServerClientFilterPtr filter; int ret = -1; - virNetServerClientLock(client); - if (VIR_ALLOC(filter) < 0) { virReportOOMError(); goto cleanup; @@ -231,16 +234,26 @@ int virNetServerClientAddFilter(virNetServerClientPtr client, ret = filter->id; cleanup: - virNetServerClientUnlock(client); return ret; } +int virNetServerClientAddFilter(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) +{ + int ret; -void virNetServerClientRemoveFilter(virNetServerClientPtr client, - int filterID) + virNetServerClientLock(client); + ret = virNetServerClientAddFilterLocked(client, func, opaque); + 
virNetServerClientUnlock(client); + return ret; +} + +static void +virNetServerClientRemoveFilterLocked(virNetServerClientPtr client, + int filterID) { virNetServerClientFilterPtr tmp, prev; - virNetServerClientLock(client); prev = NULL; tmp = client->filters; @@ -257,7 +270,13 @@ void virNetServerClientRemoveFilter(virNetServerClientPtr client, prev = tmp; tmp = tmp->next; } +} +void virNetServerClientRemoveFilter(virNetServerClientPtr client, + int filterID) +{ + virNetServerClientLock(client); + virNetServerClientRemoveFilterLocked(client, filterID); virNetServerClientUnlock(client); } @@ -318,6 +337,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->readonly = readonly; client->tlsCtxt = tls; client->nrequests_max = nrequests_max; + client->keepaliveFilter = -1; if (tls) virNetTLSContextRef(tls); @@ -571,6 +591,7 @@ void virNetServerClientFree(virNetServerClientPtr client) void virNetServerClientClose(virNetServerClientPtr client) { virNetServerClientCloseFunc cf; + virKeepAlivePtr ka; virNetServerClientLock(client); VIR_DEBUG("client=%p refs=%d", client, client->refs); @@ -579,6 +600,20 @@ void virNetServerClientClose(virNetServerClientPtr client) return; } + if (client->keepaliveFilter >= 0) + virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter); + + if (client->keepalive) { + virKeepAliveStop(client->keepalive); + ka = client->keepalive; + client->keepalive = NULL; + client->refs++; + virNetServerClientUnlock(client); + virKeepAliveFree(ka); + virNetServerClientLock(client); + client->refs--; + } + if (client->privateDataCloseFunc) { cf = client->privateDataCloseFunc; client->refs++; @@ -976,6 +1011,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu", msg, msg->header.proc, msg->bufferLength, msg->bufferOffset); + virNetServerClientLock(client); if (client->sock && !client->wantClose) { @@ -986,6 +1022,7 @@ int 
virNetServerClientSendMessage(virNetServerClientPtr client, } virNetServerClientUnlock(client); + return ret; } @@ -999,3 +1036,74 @@ bool virNetServerClientNeedAuth(virNetServerClientPtr client) virNetServerClientUnlock(client); return need; } + + +static void +virNetServerClientKeepAliveDeadCB(void *opaque) +{ + virNetServerClientImmediateClose(opaque); +} + +static int +virNetServerClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetServerClientSendMessage(opaque, msg); +} + +static void +virNetServerClientFreeCB(void *opaque) +{ + virNetServerClientFree(opaque); +} + +static int +virNetServerClientKeepAliveFilter(virNetServerClientPtr client, + virNetMessagePtr msg, + void *opaque ATTRIBUTE_UNUSED) +{ + if (virKeepAliveCheckMessage(client->keepalive, msg)) { + virNetMessageFree(msg); + client->nrequests--; + return 1; + } + + return 0; +} + +int +virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count) +{ + virKeepAlivePtr ka; + int ret = -1; + + virNetServerClientLock(client); + + if (!(ka = virKeepAliveNew(interval, count, client, + virNetServerClientKeepAliveSendCB, + virNetServerClientKeepAliveDeadCB, + virNetServerClientFreeCB))) + goto cleanup; + /* keepalive object has a reference to client */ + client->refs++; + + client->keepaliveFilter = + virNetServerClientAddFilterLocked(client, + virNetServerClientKeepAliveFilter, + NULL); + if (client->keepaliveFilter < 0) + goto cleanup; + + client->keepalive = ka; + ka = NULL; + +cleanup: + virNetServerClientUnlock(client); + if (ka) + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + return ret; +} diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h index bedb179..1987c6d 100644 --- a/src/rpc/virnetserverclient.h +++ b/src/rpc/virnetserverclient.h @@ -99,6 +99,12 @@ bool virNetServerClientWantClose(virNetServerClientPtr client); int virNetServerClientInit(virNetServerClientPtr client); +int 
virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count); +bool virNetServerClientCheckKeepAlive(virNetServerClientPtr client, + virNetMessagePtr msg); + const char *virNetServerClientLocalAddrString(virNetServerClientPtr client); const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client); -- 1.7.6.1

When a client wants to send a keepalive message it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means, that sending such call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. --- src/rpc/virnetclient.c | 149 ++++++++++++++++++++++++++++++++++++------------ 1 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 055361d..7ea9a27 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,7 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool dontBlock; virCond cond; @@ -94,6 +95,11 @@ struct _virNetClient { }; +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -848,6 +854,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, char ignore; sigset_t oldmask, blockedsigs; int timeout = -1; + bool discardNonBlocking; /* If we have existing SASL decoded data we * don't want to sleep in the poll(), just @@ -865,6 +872,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[0].events |= POLLIN; if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) fds[0].events |= POLLOUT; + /* We don't want to sleep in poll if any of the calls is + * non-blocking + */ + if (tmp->dontBlock) + timeout = 
0; tmp = tmp->next; } @@ -937,35 +949,63 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and + * - remove all completed nonblocking calls + * - remove all nonblocking calls in case poll() would block + * - remove all nonblocking calls if we got error from poll() + * - wake up threads waiting for calls that have been completed */ + discardNonBlocking = ret == 0 || + (fds[0].revents & POLLHUP) || + (fds[0].revents & POLLERR); tmp = client->waitDispatch; prev = NULL; while (tmp) { + virNetClientCallPtr next = tmp->next; + if (tmp != thiscall && - tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + (tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE || + (discardNonBlocking && tmp->dontBlock))) { /* Take them out of the list */ if (prev) prev->next = tmp->next; else client->waitDispatch = tmp->next; - /* And wake them up.... - * ...they won't actually wakeup until - * we release our mutex a short while - * later... - */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); + if (tmp->dontBlock) { + /* tmp is a non-blocking call, no-one is waiting for it so + * we just free it here + */ + if (tmp->mode != VIR_NET_CLIENT_MODE_COMPLETE) { + VIR_DEBUG("Can't finish nonblocking call %p without" + " blocking or error", tmp); + } + virNetMessageFree(tmp->msg); + VIR_FREE(tmp); + } else { + /* And wake them up.... + * ...they won't actually wakeup until + * we release our mutex a short while + * later... 
+ */ + VIR_DEBUG("Waking up sleep %p %p", + tmp, client->waitDispatch); + virCondSignal(&tmp->cond); + } } else { prev = tmp; } - tmp = tmp->next; + tmp = next; } /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* If next call is non-blocking call, we need to process it + * before giving up the buck + */ + if (thiscall->next && thiscall->next->dontBlock) + continue; + /* We're at head of the list already, so * remove us */ @@ -980,14 +1020,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client, return 0; } - if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup / error event on socket")); goto error; } - } + if (thiscall->dontBlock && discardNonBlocking) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Can't finish nonblocking call without blocking")); + goto error; + } + } error: client->waitDispatch = thiscall->next; @@ -1040,38 +1084,41 @@ static int virNetClientIO(virNetClientPtr client, { int rv = -1; - VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", + VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d" + " length=%zu dontBlock=%d dispatch=%p", thiscall->msg->header.prog, thiscall->msg->header.vers, thiscall->msg->header.serial, thiscall->msg->header.proc, thiscall->msg->header.type, thiscall->msg->bufferLength, + thiscall->dontBlock, client->waitDispatch); /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + virNetClientCallPtr tmp; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = 
NULL; virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; } + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp->next) + tmp = tmp->next; + tmp->next = thiscall; + + if (thiscall->dontBlock) { + VIR_DEBUG("Sending non-blocking call while another thread is" + " dispatching; it will send the call for us"); + return 0; + } + VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { @@ -1091,7 +1138,7 @@ static int virNetClientIO(virNetClientPtr client, return -1; } - VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall); /* Two reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to @@ -1181,15 +1228,17 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int +virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock) { virNetClientCallPtr call; int ret = -1; if (expectReply && - (msg->header.status == VIR_NET_CONTINUE)) { + (msg->header.status == VIR_NET_CONTINUE || dontBlock)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("Attempt to send an asynchronous message with a synchronous reply")); return -1; @@ -1202,10 +1251,15 @@ int virNetClientSend(virNetClientPtr client, virNetClientLock(client); - if (virCondInit(&call->cond) < 0) { - virNetError(VIR_ERR_INTERNAL_ERROR, "%s", - _("cannot initialize condition variable")); - goto cleanup; + /* We don't need call->cond for non-blocking calls since there's no + * thread to be woken up anyway + */ + if (!dontBlock) { + if (virCondInit(&call->cond) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize condition variable")); + goto cleanup; + } } if 
(msg->bufferLength) @@ -1214,12 +1268,35 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->dontBlock = dontBlock; ret = virNetClientIO(client, call); cleanup: ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + if (ret != 0) { + VIR_FREE(call); + } else if (dontBlock) { + /* Only free the call if it was completed since otherwise it was just + * queued up and will be processed later. + */ + if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* We need to free the message as well since no-one is waiting for + * it. + */ + virNetMessageFree(msg); + VIR_FREE(call); + } + } else { + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) +{ + return virNetClientSendInternal(client, msg, expectReply, false); +} -- 1.7.6.1

--- src/rpc/virnetclient.c | 76 ++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 7ea9a27..45b0edb 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -92,9 +92,13 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; }; +void virNetClientRequestClose(virNetClientPtr client); + static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, bool expectReply, @@ -297,12 +301,14 @@ void virNetClientFree(virNetClientPtr client) } -void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return; - virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -312,6 +318,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. 
+ */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore); + + client->wantClose = true; + if (safewrite(client->wakeupSendFD, &ignore, len) != len) + VIR_ERROR(_("failed to wake up polling thread")); + } else { + virNetClientCloseLocked(client); + } + virNetClientUnlock(client); } @@ -856,11 +897,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, int timeout = -1; bool discardNonBlocking; - /* If we have existing SASL decoded data we - * don't want to sleep in the poll(), just - * check if any other FDs are also ready + /* If we have existing SASL decoded data we don't want to sleep in + * the poll(), just check if any other FDs are also ready. + * If the connection is going to be closed, we don't want to sleep in + * poll() either. */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; fds[0].events = fds[0].revents = 0; @@ -922,6 +964,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) fds[0].revents |= POLLIN; + /* If wantClose flag is set, pretend there was an error on the socket + */ + if (client->wantClose) + fds[0].revents = POLLERR; + if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1041,6 +1088,8 @@ error: if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); + } else if (client->wantClose) { + virNetClientCloseLocked(client); } return -1; } @@ -1182,7 +1231,8 @@ static int virNetClientIO(virNetClientPtr client, virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + if (client->sock) + virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); if (rv == 0 && virGetLastError()) @@ -1206,7 +1256,7 @@ void 
virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->waitDispatch || client->wantClose) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1251,6 +1301,12 @@ virNetClientSendInternal(virNetClientPtr client, virNetClientLock(client); + if (!client->sock || client->wantClose) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Client socket is closed")); + goto unlock; + } + /* We don't need call->cond for non-blocking calls since there's no * thread to be woken up anyway */ @@ -1290,6 +1346,8 @@ cleanup: } else { VIR_FREE(call); } + +unlock: virNetClientUnlock(client); return ret; } -- 1.7.6.1

--- src/remote/remote_driver.c | 30 +++++++++++++ src/rpc/virnetclient.c | 102 ++++++++++++++++++++++++++++++++++++++++++- src/rpc/virnetclient.h | 5 ++ 3 files changed, 134 insertions(+), 3 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 1217d94..e218c40 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -4082,6 +4082,34 @@ done: } +static int +remoteAllowKeepAlive(virConnectPtr conn) +{ + struct private_data *priv = conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + ret = virNetClientKeepAliveAdvertise(priv->client); + remoteDriverUnlock(priv); + + return ret; +} + + +static int +remoteStartKeepAlive(virConnectPtr conn, int interval, unsigned int count) +{ + struct private_data *priv = conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + ret = virNetClientKeepAliveStart(priv->client, interval, count); + remoteDriverUnlock(priv); + + return ret; +} + + #include "remote_client_bodies.h" #include "qemu_client_bodies.h" @@ -4430,6 +4458,8 @@ static virDriver remote_driver = { .domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */ .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ + .allowKeepAlive = remoteAllowKeepAlive, /* 0.9.7 */ + .startKeepAlive = remoteStartKeepAlive, /* 0.9.7 */ }; static virNetworkDriver network_driver = { diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 45b0edb..dee5059 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -29,6 +29,7 @@ #include "virnetclient.h" #include "virnetsocket.h" +#include "virkeepalive.h" #include "memory.h" #include "threads.h" #include "virfile.h" @@ -93,11 +94,12 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + virKeepAlivePtr keepalive; bool wantClose; }; -void virNetClientRequestClose(virNetClientPtr client); +static void virNetClientRequestClose(virNetClientPtr client); static int 
virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, @@ -127,11 +129,71 @@ static void virNetClientEventFree(void *opaque) virNetClientFree(client); } +int +virNetClientKeepAliveAdvertise(virNetClientPtr client) +{ + virKeepAlivePtr ka; + int ret = -1; + + virNetClientLock(client); + if ((ka = client->keepalive)) + virKeepAliveRef(ka); + virNetClientUnlock(client); + + if (ka) { + ret = virKeepAliveAdvertise(ka); + virKeepAliveFree(ka); + } else { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("the caller doesn't support keepalive protocol;" + " perhaps it's missing event loop implementation")); + } + + return ret; +} + +int +virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count) +{ + int ret = -1; + + virNetClientLock(client); + + if (!client->keepalive) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("the caller doesn't support keepalive protocol;" + " perhaps it's missing event loop implementation")); + goto cleanup; + } + + ret = virKeepAliveStart(client->keepalive, interval, count); + +cleanup: + virNetClientUnlock(client); + return ret; +} + +static void +virNetClientKeepAliveDeadCB(void *opaque) +{ + virNetClientRequestClose(opaque); +} + +static int +virNetClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(opaque, msg, false, true); +} + static virNetClientPtr virNetClientNew(virNetSocketPtr sock, const char *hostname) { virNetClientPtr client = NULL; int wakeupFD[2] = { -1, -1 }; + virKeepAlivePtr ka = NULL; if (pipe2(wakeupFD, O_CLOEXEC) < 0) { virReportSystemError(errno, "%s", @@ -164,9 +226,21 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, client, virNetClientEventFree) < 0) { client->refs--; - VIR_DEBUG("Failed to add event watch, disabling events"); + VIR_DEBUG("Failed to add event watch, disabling events and support for" + " keepalive messages"); + } else { + /* Keepalive protocol consists of async messages so it can only be used + * 
if the client supports them */ + if (!(ka = virKeepAliveNew(-1, 0, client, + virNetClientKeepAliveSendCB, + virNetClientKeepAliveDeadCB, + virNetClientEventFree))) + goto error; + /* keepalive object has a reference to client */ + client->refs++; } + client->keepalive = ka; VIR_DEBUG("client=%p refs=%d", client, client->refs); return client; @@ -175,6 +249,10 @@ no_memory: error: VIR_FORCE_CLOSE(wakeupFD[0]); VIR_FORCE_CLOSE(wakeupFD[1]); + if (ka) { + virKeepAliveStop(ka); + virKeepAliveFree(ka); + } virNetClientFree(client); return NULL; } @@ -304,6 +382,8 @@ void virNetClientFree(virNetClientPtr client) static void virNetClientCloseLocked(virNetClientPtr client) { + virKeepAlivePtr ka; + VIR_DEBUG("client=%p, sock=%p", client, client->sock); if (!client->sock) @@ -318,7 +398,20 @@ virNetClientCloseLocked(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + ka = client->keepalive; + client->keepalive = NULL; client->wantClose = false; + + if (ka) { + client->refs++; + virNetClientUnlock(client); + + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + virNetClientLock(client); + client->refs--; + } } void virNetClientClose(virNetClientPtr client) @@ -331,7 +424,7 @@ void virNetClientClose(virNetClientPtr client) virNetClientUnlock(client); } -void +static void virNetClientRequestClose(virNetClientPtr client) { VIR_DEBUG("client=%p", client); @@ -723,6 +816,9 @@ virNetClientCallDispatch(virNetClientPtr client) client->msg.header.proc, client->msg.header.type, client->msg.header.status, client->msg.header.serial); + if (virKeepAliveCheckMessage(client->keepalive, &client->msg)) + return 0; + switch (client->msg.header.type) { case VIR_NET_REPLY: /* Normal RPC replies */ return virNetClientCallDispatchReply(client); diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 1fabcfd..e337ee0 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -87,4 +87,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr 
client); void virNetClientFree(virNetClientPtr client); void virNetClientClose(virNetClientPtr client); +int virNetClientKeepAliveAdvertise(virNetClientPtr client); +int virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count); + #endif /* __VIR_NET_CLIENT_H__ */ -- 1.7.6.1

--- examples/domain-events/events-c/event-test.c | 11 +++++++++++ examples/domain-events/events-python/event-test.py | 3 +++ 2 files changed, 14 insertions(+), 0 deletions(-) diff --git a/examples/domain-events/events-c/event-test.c b/examples/domain-events/events-c/event-test.c index 6a3ed26..982907d 100644 --- a/examples/domain-events/events-c/event-test.c +++ b/examples/domain-events/events-c/event-test.c @@ -390,6 +390,17 @@ int main(int argc, char **argv) (callback5ret != -1) && (callback6ret != -1) && (callback7ret != -1)) { + int rc; + + if ((rc = virConnectAllowKeepAlive(dconn)) == 0) + rc = virConnectStartKeepAlive(dconn, 5, 3); + if (rc < 0) { + virErrorPtr err = virGetLastError(); + fprintf(stderr, "Failed to start keepalive protocol: %s\n", + err && err->message ? err->message : "Unknown error"); + run = 0; + } + while (run) { if (virEventRunDefaultImpl() < 0) { virErrorPtr err = virGetLastError(); diff --git a/examples/domain-events/events-python/event-test.py b/examples/domain-events/events-python/event-test.py index 76fda2b..11d73f4 100644 --- a/examples/domain-events/events-python/event-test.py +++ b/examples/domain-events/events-python/event-test.py @@ -518,6 +518,9 @@ def main(): vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, myDomainEventWatchdogCallback, None) vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, myDomainEventGraphicsCallback, None) + vc.allowKeepAlive() + vc.startKeepAlive(5, 3) + # The rest of your app would go here normally, but for sake # of demo we'll just go to sleep. The other option is to # run the event loop in your main thread if your app is -- 1.7.6.1

--- src/qemu/libvirtd_qemu.aug | 2 ++ src/qemu/qemu.conf | 16 ++++++++++++++++ src/qemu/qemu_conf.c | 11 +++++++++++ src/qemu/qemu_conf.h | 3 +++ src/qemu/qemu_migration.c | 10 ++++++++++ src/qemu/test_libvirtd_qemu.aug | 6 ++++++ 6 files changed, 48 insertions(+), 0 deletions(-) diff --git a/src/qemu/libvirtd_qemu.aug b/src/qemu/libvirtd_qemu.aug index 6c145c7..ad34e42 100644 --- a/src/qemu/libvirtd_qemu.aug +++ b/src/qemu/libvirtd_qemu.aug @@ -52,6 +52,8 @@ module Libvirtd_qemu = | int_entry "max_processes" | str_entry "lock_manager" | int_entry "max_queued" + | int_entry "keepalive_interval" + | int_entry "keepalive_count" (* Each enty in the config is one of the following three ... *) let entry = vnc_entry diff --git a/src/qemu/qemu.conf b/src/qemu/qemu.conf index 4da5d5a..e623bc3 100644 --- a/src/qemu/qemu.conf +++ b/src/qemu/qemu.conf @@ -316,3 +316,19 @@ # Note, that job lock is per domain. # # max_queued = 0 + +################################################################### +# Keepalive protocol: +# This allows qemu driver to detect broken connections to remote +# libvirtd during peer-to-peer migration. A keepalive message is +# sent to the deamon after keepalive_interval seconds of inactivity +# to check if the deamon is still responding; keepalive_count is a +# maximum number of keepalive messages that are allowed to be sent +# to the deamon without getting any response before the connection +# is considered broken. In other words, the connection is +# automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the deamon. 
+# +#keepalive_interval = 5 +#keepalive_count = 5 diff --git a/src/qemu/qemu_conf.c b/src/qemu/qemu_conf.c index d1bf075..19f24b1 100644 --- a/src/qemu/qemu_conf.c +++ b/src/qemu/qemu_conf.c @@ -118,6 +118,9 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, virLockManagerPluginNew("nop", NULL, 0))) return -1; + driver->keepAliveInterval = 5; + driver->keepAliveCount = 5; + /* Just check the file is readable before opening it, otherwise * libvirt emits an error. */ @@ -462,6 +465,14 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, CHECK_TYPE("max_queued", VIR_CONF_LONG); if (p) driver->max_queued = p->l; + p = virConfGetValue(conf, "keepalive_interval"); + CHECK_TYPE("keepalive_interval", VIR_CONF_LONG); + if (p) driver->keepAliveInterval = p->l; + + p = virConfGetValue(conf, "keepalive_count"); + CHECK_TYPE("keepalive_count", VIR_CONF_LONG); + if (p) driver->keepAliveCount = p->l; + virConfFree (conf); return 0; } diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h index e8b92a4..70c3fda 100644 --- a/src/qemu/qemu_conf.h +++ b/src/qemu/qemu_conf.h @@ -138,6 +138,9 @@ struct qemud_driver { * of guests which will be automatically killed * when the virConnectPtr is closed*/ virHashTablePtr autodestroy; + + int keepAliveInterval; + unsigned int keepAliveCount; }; typedef struct _qemuDomainCmdlineDef qemuDomainCmdlineDef; diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0a5a13d..301d8ba 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -2173,6 +2173,7 @@ static int doPeer2PeerMigrate(struct qemud_driver *driver, virConnectPtr dconn = NULL; bool p2p; virErrorPtr orig_err = NULL; + int rc; VIR_DEBUG("driver=%p, sconn=%p, vm=%p, xmlin=%s, dconnuri=%s, " "uri=%s, flags=%lx, dname=%s, resource=%lu", @@ -2193,6 +2194,15 @@ static int doPeer2PeerMigrate(struct qemud_driver *driver, } qemuDomainObjEnterRemoteWithDriver(driver, vm); + if ((rc = virConnectAllowKeepAlive(dconn)) == 0) { + rc = 
virConnectStartKeepAlive(dconn, driver->keepAliveInterval, + driver->keepAliveCount); + } + qemuDomainObjExitRemoteWithDriver(driver, vm); + if (rc < 0) + goto cleanup; + + qemuDomainObjEnterRemoteWithDriver(driver, vm); p2p = VIR_DRV_SUPPORTS_FEATURE(dconn->driver, dconn, VIR_DRV_FEATURE_MIGRATION_P2P); /* v3proto reflects whether the caller used Perform3, but with diff --git a/src/qemu/test_libvirtd_qemu.aug b/src/qemu/test_libvirtd_qemu.aug index b1f9114..f7476ae 100644 --- a/src/qemu/test_libvirtd_qemu.aug +++ b/src/qemu/test_libvirtd_qemu.aug @@ -115,6 +115,9 @@ vnc_auto_unix_socket = 1 max_processes = 12345 lock_manager = \"fcntl\" + +keepalive_interval = 1 +keepalive_count = 42 " test Libvirtd_qemu.lns get conf = @@ -240,3 +243,6 @@ lock_manager = \"fcntl\" { "max_processes" = "12345" } { "#empty" } { "lock_manager" = "fcntl" } +{ "#empty" } +{ "keepalive_interval" = "1" } +{ "keepalive_count" = "42" } -- 1.7.6.1
participants (1)
-
Jiri Denemark