[libvirt] [PATCH v4 00/13] Implement keepalive protocol for libvirt RPC

This patchset can also be found at
https://gitorious.org/~jirka/libvirt/jirka-staging/commits/keepalive

This allows us to detect broken connections between server and client (or a
dead daemon/client) without waiting for a TCP timeout. By default, a
connection is considered broken after about 30 seconds with no messages
received from the remote party; after that period, the connection is
automatically closed.

The main reason for implementing this is that peer-to-peer migration can now
be canceled when the connection between source and target breaks. However,
this will only really work once qemu fixes the migrate_cancel command so
that it doesn't block when outgoing TCP buffers are full.

Version 4 addresses comments from Daniel and Matthias. Although most of the
patches were already (conditionally) acked, I'm sending all of them to
provide a complete picture of the change. Patches that were already acked
are explicitly marked as such in the Notes section, so anyone can just skip
them if they like.

Jiri Denemark (13):
  Define keepalive protocol
  Implement common keepalive handling
  Introduce virConnectSetKeepAlive
  virsh: Always run event loop
  Implement keepalive protocol in libvirt daemon
  Add support for non-blocking calls in client RPC
  Add support for async close of client RPC socket
  Implement keepalive protocol in remote driver
  Introduce virConnectIsAlive API
  Implement virConnectIsAlive in all drivers
  Add keepalive support into domain-events examples
  qemu: Add support for keepalive messages during p2p migration
  qemu: Cancel p2p migration when connection breaks

 .gitignore                                         |    1 +
 daemon/libvirtd.aug                                |    5 +
 daemon/libvirtd.c                                  |   15 +
 daemon/libvirtd.conf                               |   25 ++
 daemon/libvirtd.h                                  |    1 +
 daemon/remote.c                                    |   48 ++-
 examples/domain-events/events-c/event-test.c       |    9 +-
 examples/domain-events/events-python/event-test.py |    4 +-
 include/libvirt/libvirt.h.in                       |    5 +
 po/POTFILES.in                                     |    1 +
 src/Makefile.am                                    |   20 +-
 src/driver.h                                       |    8 +
 src/esx/esx_driver.c                               |   18 +
 src/hyperv/hyperv_driver.c                         |   18 +
 src/libvirt.c                                      |   93 ++++
 src/libvirt_internal.h                             |   10 +-
 src/libvirt_private.syms                           |    2 +
 src/libvirt_public.syms                            |    2 +
 src/libxl/libxl_driver.c                           |    8 +
 src/lxc/lxc_driver.c                               |    7 +
 src/openvz/openvz_driver.c                         |    7 +
 src/phyp/phyp_driver.c                             |   18 +
 src/probes.d                                       |   12 +
 src/qemu/libvirtd_qemu.aug                         |    2 +
 src/qemu/qemu.conf                                 |   22 +
 src/qemu/qemu_conf.c                               |   11 +
 src/qemu/qemu_conf.h                               |    3 +
 src/qemu/qemu_driver.c                             |    6 +
 src/qemu/qemu_migration.c                          |   43 ++-
 src/qemu/test_libvirtd_qemu.aug                    |    6 +
 src/remote/remote_driver.c                         |   70 +++
 src/remote/remote_protocol.x                       |    2 +-
 src/rpc/virkeepalive.c                             |  448 ++++++++++++++++++++
 src/rpc/virkeepalive.h                             |   56 +++
 src/rpc/virkeepaliveprotocol.x                     |    7 +
 src/rpc/virnetclient.c                             |  428 ++++++++++++++++---
 src/rpc/virnetclient.h                             |    6 +
 src/rpc/virnetserver.c                             |   22 +
 src/rpc/virnetserver.h                             |    5 +
 src/rpc/virnetserverclient.c                       |  143 ++++++-
 src/rpc/virnetserverclient.h                       |    7 +
 src/test/test_driver.c                             |    6 +
 src/uml/uml_driver.c                               |    7 +
 src/util/event.c                                   |    6 +-
 src/vbox/vbox_tmpl.c                               |    6 +
 src/vmware/vmware_driver.c                         |    7 +
 src/xen/xen_driver.c                               |    8 +
 src/xenapi/xenapi_driver.c                         |   12 +
 tools/console.c                                    |   17 +-
 tools/virsh.c                                      |   31 ++
 50 files changed, 1621 insertions(+), 103 deletions(-)
 create mode 100644 src/rpc/virkeepalive.c
 create mode 100644 src/rpc/virkeepalive.h
 create mode 100644 src/rpc/virkeepaliveprotocol.x

-- 
1.7.7.1
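(For the record, the "about 30 seconds" above follows from the daemon
defaults introduced in patch 05: keepalive_interval = 5 and
keepalive_count = 5 give roughly keepalive_interval * (keepalive_count + 1)
= 5 * (5 + 1) = 30 seconds of silence before a connection is closed.)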

The keepalive program has two procedures: PING and PONG. Both are used only
in asynchronous messages and the sender doesn't wait for any reply. However,
the party which receives PING messages is supposed to react by sending a
PONG message back to the other party, although no explicit binding between
PING and PONG messages is made. For backward compatibility, neither server
nor client is allowed to send keepalive messages before checking that the
remote party supports them.
---
Notes:
    Version 3 was ACKed provided I do the following changes:

    Version 4:
    - update systemtap functions with the new protocol
    - s/KEEPALIVE_VERSION/KEEPALIVE_PROTOCOL_VERSION/ to make stp generator
      happy

    Version 3:
    - remove ADVERTISE message which is no longer used

    Version 2:
    - no change

 .gitignore                     |    1 +
 src/Makefile.am                |   19 ++++++++++++++-----
 src/rpc/virkeepaliveprotocol.x |    7 +++++++
 3 files changed, 22 insertions(+), 5 deletions(-)
 create mode 100644 src/rpc/virkeepaliveprotocol.x

diff --git a/.gitignore b/.gitignore
index 0552b52..61a9a38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -65,6 +65,7 @@
 /src/locking/qemu-sanlock.conf
 /src/remote/*_client_bodies.h
 /src/remote/*_protocol.[ch]
+/src/rpc/virkeepaliveprotocol.[ch]
 /src/rpc/virnetprotocol.[ch]
 /src/util/virkeymaps.h
 /tests/*.log
diff --git a/src/Makefile.am b/src/Makefile.am
index 81ec730..872ac37 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -274,7 +274,8 @@ PDWTAGS = \
 PROTOCOL_STRUCTS = \
 	$(srcdir)/remote_protocol-structs \
 	$(srcdir)/qemu_protocol-structs \
-	$(srcdir)/virnetprotocol-structs
+	$(srcdir)/virnetprotocol-structs \
+	$(srcdir)/virkeepaliveprotocol-structs
 if WITH_REMOTE
 # The .o file that pdwtags parses is created as a side effect of running
 # libtool; but from make's perspective we depend on the .lo file.
@@ -282,6 +283,7 @@ $(srcdir)/%_protocol-structs: libvirt_driver_remote_la-%_protocol.lo
 	$(PDWTAGS)
 $(srcdir)/virnetprotocol-structs: libvirt_net_rpc_la-virnetprotocol.lo
 	$(PDWTAGS)
+$(srcdir)/virkeepaliveprotocol-structs: libvirt_net_rpc_la-virkeepaliveprotocol.lo
 else !WITH_REMOTE
 # These generated files must live in git, because they cannot be re-generated
 # when configured --without-remote.
@@ -1282,7 +1284,10 @@ probes.h: probes.d
 probes.o: probes.d
 	$(AM_V_GEN)$(DTRACE) -o $@ -G -s $<
 
-RPC_PROBE_FILES = $(srcdir)/rpc/virnetprotocol.x $(srcdir)/remote/remote_protocol.x $(srcdir)/remote/qemu_protocol.x
+RPC_PROBE_FILES = $(srcdir)/rpc/virnetprotocol.x \
+		  $(srcdir)/rpc/virkeepaliveprotocol.x \
+		  $(srcdir)/remote/remote_protocol.x \
+		  $(srcdir)/remote/qemu_protocol.x
 
 libvirt_functions.stp: $(RPC_PROBE_FILES) $(srcdir)/rpc/gensystemtap.pl
 	$(AM_V_GEN)perl -w $(srcdir)/rpc/gensystemtap.pl $(RPC_PROBE_FILES) > $@
@@ -1361,11 +1366,14 @@ EXTRA_DIST += \
 	rpc/gendispatch.pl \
 	rpc/genprotocol.pl \
 	rpc/gensystemtap.pl \
-	rpc/virnetprotocol.x
+	rpc/virnetprotocol.x \
+	rpc/virkeepaliveprotocol.x
 
 VIR_NET_RPC_GENERATED = \
 	$(srcdir)/rpc/virnetprotocol.h \
-	$(srcdir)/rpc/virnetprotocol.c
+	$(srcdir)/rpc/virnetprotocol.c \
+	$(srcdir)/rpc/virkeepaliveprotocol.h \
+	$(srcdir)/rpc/virkeepaliveprotocol.c
 
 BUILT_SOURCES += $(VIR_NET_RPC_GENERATED)
 
@@ -1373,7 +1381,8 @@ libvirt_net_rpc_la_SOURCES = \
 	rpc/virnetmessage.h rpc/virnetmessage.c \
 	rpc/virnetprotocol.h rpc/virnetprotocol.c \
 	rpc/virnetsocket.h rpc/virnetsocket.c \
-	rpc/virnettlscontext.h rpc/virnettlscontext.c
+	rpc/virnettlscontext.h rpc/virnettlscontext.c \
+	rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
 if HAVE_SASL
 libvirt_net_rpc_la_SOURCES += \
 	rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
diff --git a/src/rpc/virkeepaliveprotocol.x b/src/rpc/virkeepaliveprotocol.x
new file mode 100644
index 0000000..5f158cf
--- /dev/null
+++ b/src/rpc/virkeepaliveprotocol.x
@@ -0,0 +1,7 @@
+const KEEPALIVE_PROGRAM = 0x6b656570;
+const KEEPALIVE_PROTOCOL_VERSION = 1;
+
+enum keepalive_procedure {
+    KEEPALIVE_PROC_PING = 1,
+    KEEPALIVE_PROC_PONG = 2
+};
-- 
1.7.7.1
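As a quick illustration (not part of the patch) of how these generated
constants are consumed, the sketch below builds a PING message; it mirrors
the virKeepAliveMessage() helper introduced in the following patch and uses
the existing virnetmessage.h APIs. Note that 0x6b656570 is simply "keep" in
ASCII.

#include "virnetmessage.h"
#include "virkeepaliveprotocol.h"

/* Sketch mirroring virKeepAliveMessage() from the next patch */
static virNetMessagePtr
exampleKeepAlivePing(void)
{
    virNetMessagePtr msg;

    if (!(msg = virNetMessageNew(false)))          /* untracked message */
        return NULL;

    msg->header.prog = KEEPALIVE_PROGRAM;          /* 0x6b656570 == "keep" */
    msg->header.vers = KEEPALIVE_PROTOCOL_VERSION; /* 1 */
    msg->header.type = VIR_NET_MESSAGE;            /* async, no reply */
    msg->header.proc = KEEPALIVE_PROC_PING;

    if (virNetMessageEncodeHeader(msg) < 0 ||
        virNetMessageEncodePayloadEmpty(msg) < 0) {
        virNetMessageFree(msg);
        return NULL;
    }

    return msg;
}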

On Thu, Oct 27, 2011 at 06:05:37PM +0200, Jiri Denemark wrote:
> The keepalive program has two procedures: PING and PONG. Both are used
> only in asynchronous messages and the sender doesn't wait for any reply.
> However, the party which receives PING messages is supposed to react by
> sending a PONG message back to the other party, although no explicit
> binding between PING and PONG messages is made. For backward
> compatibility, neither server nor client is allowed to send keepalive
> messages before checking that the remote party supports them.
> ---
> Notes:
>     Version 3 was ACKed provided I do the following changes:
>
>     Version 4:
>     - update systemtap functions with the new protocol
>     - s/KEEPALIVE_VERSION/KEEPALIVE_PROTOCOL_VERSION/ to make stp
>       generator happy
>
>     Version 3:
>     - remove ADVERTISE message which is no longer used
>
>     Version 2:
>     - no change
>
>  .gitignore                     |    1 +
>  src/Makefile.am                |   19 ++++++++++++++-----
>  src/rpc/virkeepaliveprotocol.x |    7 +++++++
>  3 files changed, 22 insertions(+), 5 deletions(-)
>  create mode 100644 src/rpc/virkeepaliveprotocol.x

ACK

Daniel

These APIs are used by both client and server RPC layer to handle processing of keepalive messages. --- Notes: Version 3 was ACKed provided I cleare some things up (which I believe I did in v4). I also tried to get the bonus points :-) Version 4: - s/KEEPALIVE_VERSION/KEEPALIVE_PROTOCOL_VERSION/ - always free message in virKeepAliveSend - free ka->response in virKeepAliveStop - systemtap probes Version 3: - remove ADVERTISE message handling Version 2: - no change po/POTFILES.in | 1 + src/Makefile.am | 3 +- src/probes.d | 12 ++ src/rpc/virkeepalive.c | 448 ++++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virkeepalive.h | 56 ++++++ 5 files changed, 519 insertions(+), 1 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h diff --git a/po/POTFILES.in b/po/POTFILES.in index 5ce35ae..71254dd 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c src/qemu/qemu_process.c src/remote/remote_client_bodies.h src/remote/remote_driver.c +src/rpc/virkeepalive.c src/rpc/virnetclient.c src/rpc/virnetclientprogram.c src/rpc/virnetclientstream.c diff --git a/src/Makefile.am b/src/Makefile.am index 872ac37..33cc5f8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1382,7 +1382,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ rpc/virnettlscontext.h rpc/virnettlscontext.c \ - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \ + rpc/virkeepalive.h rpc/virkeepalive.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/probes.d b/src/probes.d index 7f66ac0..d84ebf0 100644 --- a/src/probes.d +++ b/src/probes.d @@ -70,6 +70,18 @@ provider libvirt { probe rpc_tls_session_handshake_fail(void *sess); + # file: src/rpc/virkeepalive.c + # prefix: rpc + probe rpc_keepalive_new(void *ka, void *client, int refs); + probe rpc_keepalive_ref(void *ka, void *client, int refs); + probe rpc_keepalive_free(void *ka, void *client, int refs); + probe rpc_keepalive_start(void *ka, void *client, int interval, int count); + probe rpc_keepalive_stop(void *ka, void *client); + probe rpc_keepalive_send(void *ka, void *client, int prog, int vers, int proc); + probe rpc_keepalive_received(void *ka, void *client, int prog, int vers, int proc); + probe rpc_keepalive_timeout(void *ka, void *client, int coundToDeath, int idle); + + # file: src/qemu/qemu_monitor.c # prefix: qemu # binary: libvirtd diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c new file mode 100644 index 0000000..06b8e63 --- /dev/null +++ b/src/rpc/virkeepalive.c @@ -0,0 +1,448 @@ +/* + * virkeepalive.c: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#include <config.h> + +#include "memory.h" +#include "threads.h" +#include "virfile.h" +#include "logging.h" +#include "util.h" +#include "virterror_internal.h" +#include "virnetsocket.h" +#include "virkeepaliveprotocol.h" +#include "virkeepalive.h" + +#define VIR_FROM_THIS VIR_FROM_RPC +#define virNetError(code, ...) \ + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \ + __FUNCTION__, __LINE__, __VA_ARGS__) + +struct _virKeepAlive { + int refs; + virMutex lock; + + int interval; + unsigned int count; + unsigned int countToDeath; + time_t lastPacketReceived; + int timer; + + virNetMessagePtr response; + int responseTimer; + + virKeepAliveSendFunc sendCB; + virKeepAliveDeadFunc deadCB; + virKeepAliveFreeFunc freeCB; + void *client; +}; + + +static void +virKeepAliveLock(virKeepAlivePtr ka) +{ + virMutexLock(&ka->lock); +} + +static void +virKeepAliveUnlock(virKeepAlivePtr ka) +{ + virMutexUnlock(&ka->lock); +} + + +static virNetMessagePtr +virKeepAliveMessage(int proc) +{ + virNetMessagePtr msg; + + if (!(msg = virNetMessageNew(false))) + return NULL; + + msg->header.prog = KEEPALIVE_PROGRAM; + msg->header.vers = KEEPALIVE_PROTOCOL_VERSION; + msg->header.type = VIR_NET_MESSAGE; + msg->header.proc = proc; + + if (virNetMessageEncodeHeader(msg) < 0 || + virNetMessageEncodePayloadEmpty(msg) < 0) { + virNetMessageFree(msg); + return NULL; + } + + return msg; +} + + +static void +virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg) +{ + const char *proc = NULL; + void *client = ka->client; + virKeepAliveSendFunc sendCB = ka->sendCB; + + switch (msg->header.proc) { + case KEEPALIVE_PROC_PING: + proc = "request"; + break; + case KEEPALIVE_PROC_PONG: + proc = "response"; + break; + } + + if (!proc) { + VIR_WARN("Refusing to send unknown keepalive message: %d", + msg->header.proc); + virNetMessageFree(msg); + return; + } + + VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); + PROBE(RPC_KEEPALIVE_SEND, + "ka=%p client=%p prog=%d vers=%d proc=%d", + ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc); + + ka->refs++; + virKeepAliveUnlock(ka); + + if (sendCB(client, msg) < 0) { + VIR_WARN("Failed to send keepalive %s to client %p", proc, client); + virNetMessageFree(msg); + } + + virKeepAliveLock(ka); + ka->refs--; +} + + +static void +virKeepAliveScheduleResponse(virKeepAlivePtr ka) +{ + if (ka->responseTimer == -1) + return; + + VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); + + if (!ka->response && + !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { + VIR_WARN("Failed to generate keepalive response"); + return; + } + + virEventUpdateTimeout(ka->responseTimer, 0); +} + + +static void +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + time_t now = time(NULL); + + virKeepAliveLock(ka); + + PROBE(RPC_KEEPALIVE_TIMEOUT, + "ka=%p client=%p countToDeath=%d idle=%d", + ka, ka->client, ka->countToDeath, + (int) (now - ka->lastPacketReceived)); + + if (now - ka->lastPacketReceived < ka->interval - 1) { + int timeout = ka->interval - (now - ka->lastPacketReceived); + virEventUpdateTimeout(ka->timer, timeout * 1000); + goto cleanup; + } + + if (ka->countToDeath == 0) { + virKeepAliveDeadFunc deadCB = ka->deadCB; 
+ void *client = ka->client; + + VIR_WARN("No response from client %p after %d keepalive messages in" + " %d seconds", + ka->client, + ka->count, + (int) (now - ka->lastPacketReceived)); + ka->refs++; + virKeepAliveUnlock(ka); + deadCB(client); + virKeepAliveLock(ka); + ka->refs--; + } else { + virNetMessagePtr msg; + + ka->countToDeath--; + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING))) + VIR_WARN("Failed to generate keepalive request"); + else + virKeepAliveSend(ka, msg); + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + } + +cleanup: + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virKeepAlivePtr ka = opaque; + virNetMessagePtr msg; + + virKeepAliveLock(ka); + + VIR_DEBUG("ka=%p, client=%p, response=%p", + ka, ka->client, ka->response); + + if (ka->response) { + msg = ka->response; + ka->response = NULL; + virKeepAliveSend(ka, msg); + } + + virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1); + + virKeepAliveUnlock(ka); +} + + +static void +virKeepAliveTimerFree(void *opaque) +{ + virKeepAliveFree(opaque); +} + + +virKeepAlivePtr +virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) +{ + virKeepAlivePtr ka; + + VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count); + + if (VIR_ALLOC(ka) < 0) { + virReportOOMError(); + return NULL; + } + + if (virMutexInit(&ka->lock) < 0) { + VIR_FREE(ka); + return NULL; + } + + ka->refs = 1; + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + ka->timer = -1; + ka->client = client; + ka->sendCB = sendCB; + ka->deadCB = deadCB; + ka->freeCB = freeCB; + + ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, + ka, virKeepAliveTimerFree); + if (ka->responseTimer < 0) { + virKeepAliveFree(ka); + return NULL; + } + /* the timer now has a reference to ka */ + ka->refs++; + + PROBE(RPC_KEEPALIVE_NEW, + "ka=%p client=%p refs=%d", + ka, ka->client, ka->refs); + + return ka; +} + + +void +virKeepAliveRef(virKeepAlivePtr ka) +{ + virKeepAliveLock(ka); + ka->refs++; + PROBE(RPC_KEEPALIVE_REF, + "ka=%p client=%p refs=%d", + ka, ka->client, ka->refs); + virKeepAliveUnlock(ka); +} + + +void +virKeepAliveFree(virKeepAlivePtr ka) +{ + if (!ka) + return; + + virKeepAliveLock(ka); + PROBE(RPC_KEEPALIVE_FREE, + "ka=%p client=%p refs=%d", + ka, ka->client, ka->refs); + + if (--ka->refs > 0) { + virKeepAliveUnlock(ka); + return; + } + + virMutexDestroy(&ka->lock); + ka->freeCB(ka->client); + VIR_FREE(ka); +} + + +int +virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count) +{ + int ret = -1; + time_t delay; + int timeout; + + virKeepAliveLock(ka); + + if (ka->timer >= 0) { + VIR_DEBUG("Keepalive messages already enabled"); + ret = 0; + goto cleanup; + } + + if (interval > 0) { + if (ka->interval > 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("keepalive interval already set")); + goto cleanup; + } + ka->interval = interval; + ka->count = count; + ka->countToDeath = count; + } + + if (ka->interval <= 0) { + VIR_DEBUG("Keepalive messages disabled by configuration"); + ret = 0; + goto cleanup; + } + + PROBE(RPC_KEEPALIVE_START, + "ka=%p client=%p interval=%d count=%u", + ka, ka->client, interval, count); + + delay = time(NULL) - ka->lastPacketReceived; + if (delay > ka->interval) + timeout = 0; + else + timeout = ka->interval - delay; + ka->timer = virEventAddTimeout(timeout * 1000, 
virKeepAliveTimer, + ka, virKeepAliveTimerFree); + if (ka->timer < 0) + goto cleanup; + + /* the timer now has another reference to this object */ + ka->refs++; + ret = 0; + +cleanup: + virKeepAliveUnlock(ka); + return ret; +} + + +void +virKeepAliveStop(virKeepAlivePtr ka) +{ + virKeepAliveLock(ka); + + PROBE(RPC_KEEPALIVE_STOP, + "ka=%p client=%p", + ka, ka->client); + + if (ka->timer > 0) { + virEventRemoveTimeout(ka->timer); + ka->timer = -1; + } + + if (ka->responseTimer > 0) { + virEventRemoveTimeout(ka->responseTimer); + ka->responseTimer = -1; + } + + virNetMessageFree(ka->response); + ka->response = NULL; + + virKeepAliveUnlock(ka); +} + + +bool +virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg) +{ + bool ret = false; + + VIR_DEBUG("ka=%p, client=%p, msg=%p", + ka, ka ? ka->client : "(null)", msg); + + if (!ka) + return false; + + virKeepAliveLock(ka); + + ka->countToDeath = ka->count; + ka->lastPacketReceived = time(NULL); + + if (msg->header.prog == KEEPALIVE_PROGRAM && + msg->header.vers == KEEPALIVE_PROTOCOL_VERSION && + msg->header.type == VIR_NET_MESSAGE) { + PROBE(RPC_KEEPALIVE_RECEIVED, + "ka=%p client=%p prog=%d vers=%d proc=%d", + ka, ka->client, msg->header.prog, + msg->header.vers, msg->header.proc); + ret = true; + switch (msg->header.proc) { + case KEEPALIVE_PROC_PING: + VIR_DEBUG("Got keepalive request from client %p", ka->client); + virKeepAliveScheduleResponse(ka); + break; + + case KEEPALIVE_PROC_PONG: + VIR_DEBUG("Got keepalive response from client %p", ka->client); + break; + + default: + VIR_DEBUG("Ignoring unknown keepalive message %d from client %p", + msg->header.proc, ka->client); + } + } + + if (ka->timer >= 0) + virEventUpdateTimeout(ka->timer, ka->interval * 1000); + + virKeepAliveUnlock(ka); + + return ret; +} diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h new file mode 100644 index 0000000..f1654eb --- /dev/null +++ b/src/rpc/virkeepalive.h @@ -0,0 +1,56 @@ +/* + * virkeepalive.h: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Jiri Denemark <jdenemar@redhat.com> + */ + +#ifndef __VIR_KEEPALIVE_H__ +# define __VIR_KEEPALIVE_H__ + +# include "virnetmessage.h" + +typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg); +typedef void (*virKeepAliveDeadFunc)(void *client); +typedef void (*virKeepAliveFreeFunc)(void *client); + +typedef struct _virKeepAlive virKeepAlive; +typedef virKeepAlive *virKeepAlivePtr; + + +virKeepAlivePtr virKeepAliveNew(int interval, + unsigned int count, + void *client, + virKeepAliveSendFunc sendCB, + virKeepAliveDeadFunc deadCB, + virKeepAliveFreeFunc freeCB) + ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4) + ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6); + +void virKeepAliveRef(virKeepAlivePtr ka); +void virKeepAliveFree(virKeepAlivePtr ka); + +int virKeepAliveStart(virKeepAlivePtr ka, + int interval, + unsigned int count); +void virKeepAliveStop(virKeepAlivePtr ka); + +bool virKeepAliveCheckMessage(virKeepAlivePtr ka, + virNetMessagePtr msg); + +#endif /* __VIR_KEEPALIVE_H__ */ -- 1.7.7.1
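To give a feel for the intended call pattern, here is a minimal consumer
sketch based purely on the virkeepalive.h declarations above; the myClient*
helpers are hypothetical placeholders, and the real wiring into the RPC
client and server objects comes later in the series.

#include "virkeepalive.h"

/* Hypothetical consumer of the virKeepAlive API; myClient* helpers
 * stand in for a real RPC client object. */
static int mySendCB(void *opaque, virNetMessagePtr msg)
{
    return myClientSendMessage(opaque, msg);  /* transmit on the socket */
}

static void myDeadCB(void *opaque)
{
    myClientClose(opaque);      /* connection is considered broken */
}

static void myFreeCB(void *opaque)
{
    myClientUnref(opaque);      /* drop the reference held by ka */
}

static void myClientEnableKeepAlive(void *client)
{
    /* interval 5s, count 5: peer declared dead after ~30s of silence */
    virKeepAlivePtr ka = virKeepAliveNew(5, 5, client,
                                         mySendCB, myDeadCB, myFreeCB);
    if (!ka)
        return;

    /* 0/0 keeps the interval/count passed to virKeepAliveNew */
    if (virKeepAliveStart(ka, 0, 0) < 0) {
        virKeepAliveFree(ka);
        return;
    }

    /* Every incoming message should get the keepalive layer's first
     * look; PING/PONG packets are consumed there:
     *
     *     if (virKeepAliveCheckMessage(ka, msg))
     *         virNetMessageFree(msg);
     *
     * and teardown is virKeepAliveStop(ka) + virKeepAliveFree(ka). */
}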

On Thu, Oct 27, 2011 at 06:05:38PM +0200, Jiri Denemark wrote:
> These APIs are used by both the client and server RPC layers to handle
> processing of keepalive messages.
> ---
> Notes:
>     Version 3 was ACKed provided I clear some things up (which I believe
>     I did in v4). I also tried to get the bonus points :-)
>
>     Version 4:
>     - s/KEEPALIVE_VERSION/KEEPALIVE_PROTOCOL_VERSION/
>     - always free message in virKeepAliveSend
>     - free ka->response in virKeepAliveStop
>     - systemtap probes
>
>     Version 3:
>     - remove ADVERTISE message handling
>
>     Version 2:
>     - no change
>
>  po/POTFILES.in         |    1 +
>  src/Makefile.am        |    3 +-
>  src/probes.d           |   12 ++
>  src/rpc/virkeepalive.c |  448 ++++++++++++++++++++++++++++++++++++++++++++
>  src/rpc/virkeepalive.h |   56 ++++++
>  5 files changed, 519 insertions(+), 1 deletions(-)
>  create mode 100644 src/rpc/virkeepalive.c
>  create mode 100644 src/rpc/virkeepalive.h

ACK

Daniel

virConnectSetKeepAlive public API can be used by a client connecting to remote server to start using keepalive protocol. The API is handled directly by remote driver and not transmitted over the wire to the server. --- Notes: Version 3 ACKed. Version 4: - explicitly document the semantics of interval <= 0 and count = 0 Version 3: - remove virConnectAllowKeepAlive - rename virConnectStartKeepAlive as virConnectSetKeepAlive - add a note to virEventRegisterImpl that running the event loop is mandatory once registered Version 2: - no change include/libvirt/libvirt.h.in | 4 +++ src/driver.h | 5 +++ src/libvirt.c | 57 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 10 ++++++- src/libvirt_public.syms | 1 + src/util/event.c | 6 ++-- 6 files changed, 78 insertions(+), 5 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 7102bce..8ce4335 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -3275,6 +3275,10 @@ typedef struct _virTypedParameter virMemoryParameter; */ typedef virMemoryParameter *virMemoryParameterPtr; +int virConnectSetKeepAlive(virConnectPtr conn, + int interval, + unsigned int count); + #ifdef __cplusplus } #endif diff --git a/src/driver.h b/src/driver.h index b899d0e..c1223c1 100644 --- a/src/driver.h +++ b/src/driver.h @@ -735,6 +735,10 @@ typedef int (*virDrvDomainBlockPull)(virDomainPtr dom, const char *path, unsigned long bandwidth, unsigned int flags); +typedef int + (*virDrvSetKeepAlive)(virConnectPtr conn, + int interval, + unsigned int count); /** * _virDriver: @@ -893,6 +897,7 @@ struct _virDriver { virDrvDomainGetBlockJobInfo domainGetBlockJobInfo; virDrvDomainBlockJobSetSpeed domainBlockJobSetSpeed; virDrvDomainBlockPull domainBlockPull; + virDrvSetKeepAlive setKeepAlive; }; typedef int diff --git a/src/libvirt.c b/src/libvirt.c index a6bcee6..0c33da1 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -16964,3 +16964,60 @@ error: virDispatchError(dom->conn); return -1; } + +/** + * virConnectSetKeepAlive: + * @conn: pointer to a hypervisor connection + * @interval: number of seconds of inactivity before a keepalive message is sent + * @count: number of messages that can be sent in a row + * + * Start sending keepalive messages after interval second of inactivity and + * consider the connection to be broken when no response is received after + * count keepalive messages sent in a row. In other words, sending count + 1 + * keepalive message results in closing the connection. When interval is <= 0, + * no keepalive messages will be sent. When count is 0, the connection will be + * automatically closed after interval seconds of inactivity without sending + * any keepalive messages. + * + * Note: client has to implement and run event loop to be able to use keepalive + * messages. Failure to do so may result in connections being closed + * unexpectedly. + * + * Returns -1 on error, 0 on success, 1 when remote party doesn't support + * keepalive messages. 
+ */ +int virConnectSetKeepAlive(virConnectPtr conn, + int interval, + unsigned int count) +{ + int ret = -1; + + VIR_DEBUG("conn=%p, interval=%d, count=%u", conn, interval, count); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + if (interval <= 0) { + virLibConnError(VIR_ERR_INVALID_ARG, + _("negative or zero interval make no sense")); + goto error; + } + + if (conn->driver->setKeepAlive) { + ret = conn->driver->setKeepAlive(conn, interval, count); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h index 6e44341..dbbf7e0 100644 --- a/src/libvirt_internal.h +++ b/src/libvirt_internal.h @@ -39,8 +39,8 @@ int virStateActive(void); * * The remote driver passes features through to the real driver at the * remote end unmodified, except if you query a VIR_DRV_FEATURE_REMOTE* - * feature. - * + * feature. Queries for VIR_DRV_FEATURE_PROGRAM* features are answered + * directly by the RPC layer and not by the real driver. */ enum { /* Driver supports V1-style virDomainMigrate, ie. domainMigratePrepare/ @@ -79,6 +79,12 @@ enum { * to domain configuration, i.e., starting from Begin3 and not Perform3. */ VIR_DRV_FEATURE_MIGRATE_CHANGE_PROTECTION = 7, + + /* + * Remote party supports keepalive program (i.e., sending keepalive + * messages). + */ + VIR_DRV_FEATURE_PROGRAM_KEEPALIVE = 8, }; diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 9762fc4..468e28a 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -491,6 +491,7 @@ LIBVIRT_0.9.5 { LIBVIRT_0.9.7 { global: + virConnectSetKeepAlive; virDomainReset; virDomainSnapshotGetParent; virDomainSnapshotListChildrenNames; diff --git a/src/util/event.c b/src/util/event.c index bd781ec..495a1f3 100644 --- a/src/util/event.c +++ b/src/util/event.c @@ -193,9 +193,9 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle, * not have a need to integrate with an external event * loop impl. * - * Once registered, the application can invoke - * virEventRunDefaultImpl in a loop to process - * events + * Once registered, the application has to invoke virEventRunDefaultImpl in + * a loop to process events. Failure to do so may result in connections being + * closed unexpectedly as a result of keepalive timeout. * * Returns 0 on success, -1 on failure. */ -- 1.7.7.1
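As a usage illustration for API consumers (not part of the patch), a
minimal client could look like the following; the qemu:///system URI and
the interval/count values are arbitrary examples:

#include <stdio.h>
#include <libvirt/libvirt.h>

int main(void)
{
    virConnectPtr conn;
    int rc;

    /* keepalive responses are processed by the event loop, which the
     * client must register and keep running */
    if (virEventRegisterDefaultImpl() < 0)
        return 1;

    if (!(conn = virConnectOpen("qemu:///system")))
        return 1;

    /* PING after 5 idle seconds; consider the connection broken after
     * 3 unanswered PINGs, i.e. after roughly 5 * (3 + 1) = 20 seconds */
    rc = virConnectSetKeepAlive(conn, 5, 3);
    if (rc < 0)
        return 1;
    if (rc == 1)
        fprintf(stderr, "remote party doesn't support keepalive\n");

    for (;;) {
        if (virEventRunDefaultImpl() < 0)
            break;
    }

    virConnectClose(conn);
    return 0;
}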

On Thu, Oct 27, 2011 at 06:05:39PM +0200, Jiri Denemark wrote:
> The virConnectSetKeepAlive public API can be used by a client connecting
> to a remote server to start using the keepalive protocol. The API is
> handled directly by the remote driver and not transmitted over the wire
> to the server.
> ---
> Notes:
>     Version 3 ACKed.
>
>     Version 4:
>     - explicitly document the semantics of interval <= 0 and count = 0
>
>     Version 3:
>     - remove virConnectAllowKeepAlive
>     - rename virConnectStartKeepAlive as virConnectSetKeepAlive
>     - add a note to virEventRegisterImpl that running the event loop is
>       mandatory once registered
>
>     Version 2:
>     - no change
>
>  include/libvirt/libvirt.h.in |    4 +++
>  src/driver.h                 |    5 +++
>  src/libvirt.c                |   57 ++++++++++++++++++++++++++++++++++++
>  src/libvirt_internal.h       |   10 ++++++-
>  src/libvirt_public.syms      |    1 +
>  src/util/event.c             |    6 ++--
>  6 files changed, 78 insertions(+), 5 deletions(-)

ACK

Daniel

Since virsh already implements event loop, it has to also run it. So far the event loop was only running during virsh console command. --- Notes: ACKed Version 4: - no changes Version 3: - new patch tools/console.c | 17 ++++++++++++++--- tools/virsh.c | 31 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/tools/console.c b/tools/console.c index 0f85bc7..e9e01a4 100644 --- a/tools/console.c +++ b/tools/console.c @@ -41,6 +41,7 @@ # include "util.h" # include "virfile.h" # include "memory.h" +# include "threads.h" # include "virterror_internal.h" @@ -60,6 +61,8 @@ typedef virConsole *virConsolePtr; struct virConsole { virStreamPtr st; bool quit; + virMutex lock; + virCond cond; int stdinWatch; int stdoutWatch; @@ -89,7 +92,6 @@ cfmakeraw (struct termios *attr) static void virConsoleShutdown(virConsolePtr con) { - con->quit = true; if (con->st) { virStreamEventRemoveCallback(con->st); virStreamAbort(con->st); @@ -101,6 +103,8 @@ virConsoleShutdown(virConsolePtr con) virEventRemoveHandle(con->stdoutWatch); con->stdinWatch = -1; con->stdoutWatch = -1; + con->quit = true; + virCondSignal(&con->cond); } static void @@ -334,6 +338,9 @@ int vshRunConsole(virDomainPtr dom, const char *dev_name) if (virDomainOpenConsole(dom, dev_name, con->st, 0) < 0) goto cleanup; + if (virCondInit(&con->cond) < 0 || virMutexInit(&con->lock) < 0) + goto cleanup; + con->stdinWatch = virEventAddHandle(STDIN_FILENO, VIR_EVENT_HANDLE_READABLE, virConsoleEventOnStdin, @@ -352,8 +359,10 @@ int vshRunConsole(virDomainPtr dom, const char *dev_name) NULL); while (!con->quit) { - if (virEventRunDefaultImpl() < 0) - break; + if (virCondWait(&con->cond, &con->lock) < 0) { + VIR_ERROR(_("unable to wait on console condition")); + goto cleanup; + } } ret = 0; @@ -363,6 +372,8 @@ int vshRunConsole(virDomainPtr dom, const char *dev_name) if (con) { if (con->st) virStreamFree(con->st); + virMutexDestroy(&con->lock); + ignore_value(virCondDestroy(&con->cond)); VIR_FREE(con); } diff --git a/tools/virsh.c b/tools/virsh.c index 72344f0..9ed9a04 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -249,6 +249,9 @@ typedef struct __vshControl { virDomainGetState is not supported */ bool useSnapshotOld; /* cannot use virDomainSnapshotGetParent or virDomainSnapshotNumChildren */ + virThread eventLoop; + bool eventLoopStarted; + bool quit; } __vshControl; typedef struct vshCmdGrp { @@ -15954,6 +15957,19 @@ vshError(vshControl *ctl, const char *format, ...) } +static void +vshEventLoop(void *opaque) +{ + vshControl *ctl = opaque; + + while (!ctl->quit) { + if (virEventRunDefaultImpl() < 0) { + virshReportError(ctl); + } + } +} + + /* * Initialize connection. 
*/ @@ -15999,6 +16015,10 @@ vshInit(vshControl *ctl) if (virEventRegisterDefaultImpl() < 0) return false; + if (virThreadCreate(&ctl->eventLoop, true, vshEventLoop, ctl) < 0) + return false; + ctl->eventLoopStarted = true; + if (ctl->name) { ctl->conn = virConnectOpenAuth(ctl->name, virConnectAuthPtrDefault, @@ -16387,6 +16407,7 @@ vshReadline (vshControl *ctl, const char *prompt) static bool vshDeinit(vshControl *ctl) { + ctl->quit = true; vshReadlineDeinit(ctl); vshCloseLogFile(ctl); VIR_FREE(ctl->name); @@ -16398,6 +16419,16 @@ vshDeinit(vshControl *ctl) } virResetLastError(); + if (ctl->eventLoopStarted) { + /* HACK: Add a dummy timeout to break event loop */ + int timer = virEventAddTimeout(-1, NULL, NULL, NULL); + if (timer != -1) + virEventRemoveTimeout(timer); + + virThreadJoin(&ctl->eventLoop); + ctl->eventLoopStarted = false; + } + return true; } -- 1.7.7.1
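The same pattern virsh adopts here can be reused by any libvirt client that
must keep an event loop running; a minimal standalone sketch using plain
pthreads (rather than the internal threads.h wrappers, which are not
available outside the tree) follows, including the dummy-timeout trick used
by vshDeinit() above to break the loop on shutdown:

#include <stdbool.h>
#include <pthread.h>
#include <libvirt/libvirt.h>

static bool quit;

static void *
eventLoop(void *opaque)
{
    (void) opaque;
    while (!quit) {
        if (virEventRunDefaultImpl() < 0)
            break;
    }
    return NULL;
}

int main(void)
{
    pthread_t thread;
    int timer;

    if (virEventRegisterDefaultImpl() < 0)
        return 1;
    if (pthread_create(&thread, NULL, eventLoop, NULL) != 0)
        return 1;

    /* ... open a connection and do real work here ... */

    /* shutdown: adding/removing a dummy timeout interrupts poll() so
     * the loop thread notices 'quit', exactly as vshDeinit() does */
    quit = true;
    if ((timer = virEventAddTimeout(-1, NULL, NULL, NULL)) != -1)
        virEventRemoveTimeout(timer);
    pthread_join(thread, NULL);
    return 0;
}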

On Thu, Oct 27, 2011 at 06:05:40PM +0200, Jiri Denemark wrote:
> Since virsh already implements an event loop, it also has to run it. So
> far, the event loop was only running during the virsh console command.
> ---
> Notes:
>     ACKed
>
>     Version 4:
>     - no changes
>
>     Version 3:
>     - new patch
>
>  tools/console.c |   17 ++++++++++++++---
>  tools/virsh.c   |   31 +++++++++++++++++++++++++++++++
>  2 files changed, 45 insertions(+), 3 deletions(-)

ACK

Daniel

--- Notes: ACKed Version 4: - explicitly document the semantics of keepalive_count = 0 Version 3: - keepalive_supported configuration option can be used to refuse connections from clients that do not support keepalive protocol - explain what keepalive_interval = -1 means - start up the keepalive protocol when a client asks if we support it (clients without keepalive support do not ask for it) - add filters to the end of the list so that they are processed in the same order they were added (and not in reverse order); as a result of that the keepalive filter will always be the first one and libvirtd will not send keepalive requests while client is sending stream packets Version 2: - no change daemon/libvirtd.aug | 5 ++ daemon/libvirtd.c | 15 +++++ daemon/libvirtd.conf | 25 +++++++ daemon/libvirtd.h | 1 + daemon/remote.c | 48 ++++++++++++++- src/libvirt_private.syms | 2 + src/remote/remote_protocol.x | 2 +- src/rpc/virnetserver.c | 22 +++++++ src/rpc/virnetserver.h | 5 ++ src/rpc/virnetserverclient.c | 143 ++++++++++++++++++++++++++++++++++++++--- src/rpc/virnetserverclient.h | 7 ++ 11 files changed, 262 insertions(+), 13 deletions(-) diff --git a/daemon/libvirtd.aug b/daemon/libvirtd.aug index ce00db5..9d78bd7 100644 --- a/daemon/libvirtd.aug +++ b/daemon/libvirtd.aug @@ -66,6 +66,10 @@ module Libvirtd = let auditing_entry = int_entry "audit_level" | bool_entry "audit_logging" + let keepalive_entry = int_entry "keepalive_interval" + | int_entry "keepalive_count" + | bool_entry "keepalive_required" + (* Each enty in the config is one of the following three ... *) let entry = network_entry | sock_acl_entry @@ -75,6 +79,7 @@ module Libvirtd = | processing_entry | logging_entry | auditing_entry + | keepalive_entry let comment = [ label "#comment" . del /#[ \t]*/ "# " . store /([^ \t\n][^\n]*)?/ . del /\n/ "\n" ] let empty = [ label "#empty" . eol ] diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index 5e1fc96..d7a03d7 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -146,6 +146,10 @@ struct daemonConfig { int audit_level; int audit_logging; + + int keepalive_interval; + unsigned int keepalive_count; + int keepalive_required; }; enum { @@ -899,6 +903,10 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED) data->audit_level = 1; data->audit_logging = 0; + data->keepalive_interval = 5; + data->keepalive_count = 5; + data->keepalive_required = 0; + localhost = virGetHostname(NULL); if (localhost == NULL) { /* we couldn't resolve the hostname; assume that we are @@ -1062,6 +1070,10 @@ daemonConfigLoad(struct daemonConfig *data, GET_CONF_STR (conf, filename, log_outputs); GET_CONF_INT (conf, filename, log_buffer_size); + GET_CONF_INT (conf, filename, keepalive_interval); + GET_CONF_INT (conf, filename, keepalive_count); + GET_CONF_INT (conf, filename, keepalive_required); + virConfFree (conf); return 0; @@ -1452,6 +1464,9 @@ int main(int argc, char **argv) { config->max_workers, config->prio_workers, config->max_clients, + config->keepalive_interval, + config->keepalive_count, + !!config->keepalive_required, config->mdns_adv ? 
config->mdns_name : NULL, use_polkit_dbus, remoteClientInitHook))) { diff --git a/daemon/libvirtd.conf b/daemon/libvirtd.conf index da3983e..f218454 100644 --- a/daemon/libvirtd.conf +++ b/daemon/libvirtd.conf @@ -366,3 +366,28 @@ # it with the output of the 'uuidgen' command and then # uncomment this entry #host_uuid = "00000000-0000-0000-0000-000000000000" + +################################################################### +# Keepalive protocol: +# This allows libvirtd to detect broken client connections or even +# dead client. A keepalive message is sent to a client after +# keepalive_interval seconds of inactivity to check if the client is +# still responding; keepalive_count is a maximum number of keepalive +# messages that are allowed to be sent to the client without getting +# any response before the connection is considered broken. In other +# words, the connection is automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the client. If keepalive_interval is set to +# -1, libvirtd will never send keepalive requests; however clients +# can still send them and the deamon will send responses. When +# keepalive_count is set to 0, connections will be automatically +# closed after keepalive_interval seconds of inactivity without +# sending any keepalive messages. +# +#keepalive_interval = 5 +#keepalive_count = 5 +# +# If set to 1, libvirtd will refuse to talk to clients that do not +# support keepalive protocol. Defaults to 0. +# +#keepalive_required = 1 diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index ecb7374..e02d7b8 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -62,6 +62,7 @@ struct daemonClientPrivate { virConnectPtr conn; daemonClientStreamPtr streams; + bool keepalive_supported; }; # if HAVE_SASL diff --git a/daemon/remote.c b/daemon/remote.c index 9d70163..c35fe07 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -584,7 +584,7 @@ int remoteClientInitHook(virNetServerPtr srv ATTRIBUTE_UNUSED, /*----- Functions. -----*/ static int -remoteDispatchOpen(virNetServerPtr server ATTRIBUTE_UNUSED, +remoteDispatchOpen(virNetServerPtr server, virNetServerClientPtr client, virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED, virNetMessageErrorPtr rerr, @@ -603,6 +603,12 @@ remoteDispatchOpen(virNetServerPtr server ATTRIBUTE_UNUSED, goto cleanup; } + if (virNetServerKeepAliveRequired(server) && !priv->keepalive_supported) { + virNetError(VIR_ERR_OPERATION_FAILED, "%s", + _("keepalive support is required to connect")); + goto cleanup; + } + name = args->name ? 
*args->name : NULL; /* If this connection arrived on a readonly socket, force @@ -3175,6 +3181,46 @@ cleanup: } +static int +remoteDispatchSupportsFeature(virNetServerPtr server ATTRIBUTE_UNUSED, + virNetServerClientPtr client, + virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED, + virNetMessageErrorPtr rerr, + remote_supports_feature_args *args, + remote_supports_feature_ret *ret) +{ + int rv = -1; + int supported; + struct daemonClientPrivate *priv = + virNetServerClientGetPrivateData(client); + + if (args->feature == VIR_DRV_FEATURE_PROGRAM_KEEPALIVE) { + if (virNetServerClientStartKeepAlive(client) < 0) + goto cleanup; + supported = 1; + goto done; + } + + if (!priv->conn) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("connection not open")); + goto cleanup; + } + + if ((supported = virDrvSupportsFeature(priv->conn, args->feature)) < 0) + goto cleanup; + +done: + ret->supported = supported; + rv = 0; + +cleanup: + if (rv < 0) + virNetMessageSaveError(rerr); + return rv; +} + + + /*----- Helpers. -----*/ /* get_nonnull_domain and get_nonnull_network turn an on-wire diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index a81966d..d5c5c0f 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1202,6 +1202,7 @@ virNetServerAutoShutdown; virNetServerClose; virNetServerFree; virNetServerIsPrivileged; +virNetServerKeepAliveRequired; virNetServerNew; virNetServerQuit; virNetServerRef; @@ -1234,6 +1235,7 @@ virNetServerClientSendMessage; virNetServerClientSetCloseHook; virNetServerClientSetIdentity; virNetServerClientSetPrivateData; +virNetServerClientStartKeepAlive; # virnetserverprogram.h diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index d135653..47b8957 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -2348,7 +2348,7 @@ enum remote_procedure { REMOTE_PROC_DOMAIN_GET_SCHEDULER_PARAMETERS = 57, /* skipgen autogen */ REMOTE_PROC_DOMAIN_SET_SCHEDULER_PARAMETERS = 58, /* autogen autogen */ REMOTE_PROC_GET_HOSTNAME = 59, /* autogen autogen priority:high */ - REMOTE_PROC_SUPPORTS_FEATURE = 60, /* autogen autogen priority:high */ + REMOTE_PROC_SUPPORTS_FEATURE = 60, /* skipgen autogen priority:high */ REMOTE_PROC_DOMAIN_MIGRATE_PREPARE = 61, /* skipgen skipgen */ REMOTE_PROC_DOMAIN_MIGRATE_PERFORM = 62, /* autogen autogen */ diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c index f739743..4c8273f8 100644 --- a/src/rpc/virnetserver.c +++ b/src/rpc/virnetserver.c @@ -102,6 +102,10 @@ struct _virNetServer { size_t nclients_max; virNetServerClientPtr *clients; + int keepaliveInterval; + unsigned int keepaliveCount; + bool keepaliveRequired; + unsigned int quit :1; virNetTLSContextPtr tls; @@ -260,6 +264,9 @@ static int virNetServerDispatchNewClient(virNetServerServicePtr svc ATTRIBUTE_UN virNetServerDispatchNewMessage, srv); + virNetServerClientInitKeepAlive(client, srv->keepaliveInterval, + srv->keepaliveCount); + virNetServerUnlock(srv); return 0; @@ -299,6 +306,9 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, + bool keepaliveRequired, const char *mdnsGroupName, bool connectDBus ATTRIBUTE_UNUSED, virNetServerClientInitHook clientInitHook) @@ -320,6 +330,9 @@ virNetServerPtr virNetServerNew(size_t min_workers, goto error; srv->nclients_max = max_clients; + srv->keepaliveInterval = keepaliveInterval; + srv->keepaliveCount = keepaliveCount; + srv->keepaliveRequired = 
keepaliveRequired; srv->sigwrite = srv->sigread = -1; srv->clientInitHook = clientInitHook; srv->privileged = geteuid() == 0 ? true : false; @@ -839,3 +852,12 @@ void virNetServerClose(virNetServerPtr srv) virNetServerUnlock(srv); } + +bool virNetServerKeepAliveRequired(virNetServerPtr srv) +{ + bool required; + virNetServerLock(srv); + required = srv->keepaliveRequired; + virNetServerUnlock(srv); + return required; +} diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h index cc9d039..a04ffdd 100644 --- a/src/rpc/virnetserver.h +++ b/src/rpc/virnetserver.h @@ -41,6 +41,9 @@ virNetServerPtr virNetServerNew(size_t min_workers, size_t max_workers, size_t priority_workers, size_t max_clients, + int keepaliveInterval, + unsigned int keepaliveCount, + bool keepaliveRequired, const char *mdnsGroupName, bool connectDBus, virNetServerClientInitHook clientInitHook); @@ -88,4 +91,6 @@ void virNetServerFree(virNetServerPtr srv); void virNetServerClose(virNetServerPtr srv); +bool virNetServerKeepAliveRequired(virNetServerPtr srv); + #endif diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 05077d6..c5641ef 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -33,6 +33,7 @@ #include "virterror_internal.h" #include "memory.h" #include "threads.h" +#include "virkeepalive.h" #define VIR_FROM_THIS VIR_FROM_RPC #define virNetError(code, ...) \ @@ -98,6 +99,9 @@ struct _virNetServerClient void *privateData; virNetServerClientFreeFunc privateDataFreeFunc; virNetServerClientCloseFunc privateDataCloseFunc; + + virKeepAlivePtr keepalive; + int keepaliveFilter; }; @@ -207,15 +211,15 @@ static void virNetServerClientUpdateEvent(virNetServerClientPtr client) } -int virNetServerClientAddFilter(virNetServerClientPtr client, - virNetServerClientFilterFunc func, - void *opaque) +static int +virNetServerClientAddFilterLocked(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) { virNetServerClientFilterPtr filter; + virNetServerClientFilterPtr *place; int ret = -1; - virNetServerClientLock(client); - if (VIR_ALLOC(filter) < 0) { virReportOOMError(); goto cleanup; @@ -225,22 +229,34 @@ int virNetServerClientAddFilter(virNetServerClientPtr client, filter->func = func; filter->opaque = opaque; - filter->next = client->filters; - client->filters = filter; + place = &client->filters; + while (*place) + place = &(*place)->next; + *place = filter; ret = filter->id; cleanup: - virNetServerClientUnlock(client); return ret; } +int virNetServerClientAddFilter(virNetServerClientPtr client, + virNetServerClientFilterFunc func, + void *opaque) +{ + int ret; -void virNetServerClientRemoveFilter(virNetServerClientPtr client, - int filterID) + virNetServerClientLock(client); + ret = virNetServerClientAddFilterLocked(client, func, opaque); + virNetServerClientUnlock(client); + return ret; +} + +static void +virNetServerClientRemoveFilterLocked(virNetServerClientPtr client, + int filterID) { virNetServerClientFilterPtr tmp, prev; - virNetServerClientLock(client); prev = NULL; tmp = client->filters; @@ -257,7 +273,13 @@ void virNetServerClientRemoveFilter(virNetServerClientPtr client, prev = tmp; tmp = tmp->next; } +} +void virNetServerClientRemoveFilter(virNetServerClientPtr client, + int filterID) +{ + virNetServerClientLock(client); + virNetServerClientRemoveFilterLocked(client, filterID); virNetServerClientUnlock(client); } @@ -318,6 +340,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->readonly = readonly; 
client->tlsCtxt = tls; client->nrequests_max = nrequests_max; + client->keepaliveFilter = -1; if (tls) virNetTLSContextRef(tls); @@ -577,6 +600,7 @@ void virNetServerClientFree(virNetServerClientPtr client) void virNetServerClientClose(virNetServerClientPtr client) { virNetServerClientCloseFunc cf; + virKeepAlivePtr ka; virNetServerClientLock(client); VIR_DEBUG("client=%p refs=%d", client, client->refs); @@ -585,6 +609,20 @@ void virNetServerClientClose(virNetServerClientPtr client) return; } + if (client->keepaliveFilter >= 0) + virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter); + + if (client->keepalive) { + virKeepAliveStop(client->keepalive); + ka = client->keepalive; + client->keepalive = NULL; + client->refs++; + virNetServerClientUnlock(client); + virKeepAliveFree(ka); + virNetServerClientLock(client); + client->refs--; + } + if (client->privateDataCloseFunc) { cf = client->privateDataCloseFunc; client->refs++; @@ -988,6 +1026,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu", msg, msg->header.proc, msg->bufferLength, msg->bufferOffset); + virNetServerClientLock(client); if (client->sock && !client->wantClose) { @@ -1003,6 +1042,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client, } virNetServerClientUnlock(client); + return ret; } @@ -1016,3 +1056,84 @@ bool virNetServerClientNeedAuth(virNetServerClientPtr client) virNetServerClientUnlock(client); return need; } + + +static void +virNetServerClientKeepAliveDeadCB(void *opaque) +{ + virNetServerClientImmediateClose(opaque); +} + +static int +virNetServerClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetServerClientSendMessage(opaque, msg); +} + +static void +virNetServerClientFreeCB(void *opaque) +{ + virNetServerClientFree(opaque); +} + +static int +virNetServerClientKeepAliveFilter(virNetServerClientPtr client, + virNetMessagePtr msg, + void *opaque ATTRIBUTE_UNUSED) +{ + if (virKeepAliveCheckMessage(client->keepalive, msg)) { + virNetMessageFree(msg); + client->nrequests--; + return 1; + } + + return 0; +} + +int +virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count) +{ + virKeepAlivePtr ka; + int ret = -1; + + virNetServerClientLock(client); + + if (!(ka = virKeepAliveNew(interval, count, client, + virNetServerClientKeepAliveSendCB, + virNetServerClientKeepAliveDeadCB, + virNetServerClientFreeCB))) + goto cleanup; + /* keepalive object has a reference to client */ + client->refs++; + + client->keepaliveFilter = + virNetServerClientAddFilterLocked(client, + virNetServerClientKeepAliveFilter, + NULL); + if (client->keepaliveFilter < 0) + goto cleanup; + + client->keepalive = ka; + ka = NULL; + +cleanup: + virNetServerClientUnlock(client); + if (ka) + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + return ret; +} + +int +virNetServerClientStartKeepAlive(virNetServerClientPtr client) +{ + int ret; + virNetServerClientLock(client); + ret = virKeepAliveStart(client->keepalive, 0, 0); + virNetServerClientUnlock(client); + return ret; +} diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h index bedb179..a201dca 100644 --- a/src/rpc/virnetserverclient.h +++ b/src/rpc/virnetserverclient.h @@ -99,6 +99,13 @@ bool virNetServerClientWantClose(virNetServerClientPtr client); int virNetServerClientInit(virNetServerClientPtr client); +int virNetServerClientInitKeepAlive(virNetServerClientPtr client, + int interval, + unsigned int count); +bool 
virNetServerClientCheckKeepAlive(virNetServerClientPtr client, + virNetMessagePtr msg); +int virNetServerClientStartKeepAlive(virNetServerClientPtr client); + const char *virNetServerClientLocalAddrString(virNetServerClientPtr client); const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client); -- 1.7.7.1
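For illustration only (the values are arbitrary examples, not
recommendations), a stricter daemon-side setup based on the options added
above could look like this in libvirtd.conf:

# probe idle clients after 10 seconds; allow at most 3 unanswered
# keepalive requests, i.e. drop a client after roughly
# 10 * (3 + 1) = 40 seconds of silence
keepalive_interval = 10
keepalive_count = 3

# refuse clients that do not support the keepalive protocol
keepalive_required = 1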

On Thu, Oct 27, 2011 at 06:05:41PM +0200, Jiri Denemark wrote:
> ---
> Notes:
>     ACKed
>
>     Version 4:
>     - explicitly document the semantics of keepalive_count = 0
>
>     Version 3:
>     - keepalive_required configuration option can be used to refuse
>       connections from clients that do not support the keepalive protocol
>     - explain what keepalive_interval = -1 means
>     - start up the keepalive protocol when a client asks if we support it
>       (clients without keepalive support do not ask for it)
>     - add filters to the end of the list so that they are processed in
>       the same order they were added (and not in reverse order); as a
>       result of that, the keepalive filter will always be the first one
>       and libvirtd will not send keepalive requests while a client is
>       sending stream packets
>
>     Version 2:
>     - no change
>
>  daemon/libvirtd.aug          |    5 ++
>  daemon/libvirtd.c            |   15 +++++
>  daemon/libvirtd.conf         |   25 +++++++
>  daemon/libvirtd.h            |    1 +
>  daemon/remote.c              |   48 ++++++++++++++-
>  src/libvirt_private.syms     |    2 +
>  src/remote/remote_protocol.x |    2 +-
>  src/rpc/virnetserver.c       |   22 +++++++
>  src/rpc/virnetserver.h       |    5 ++
>  src/rpc/virnetserverclient.c |  143 ++++++++++++++++++++++++++++++++++---
>  src/rpc/virnetserverclient.h |    7 ++
>  11 files changed, 262 insertions(+), 13 deletions(-)

ACK

Daniel

When a client wants to send a keepalive message it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means, that sending such call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. In case a non-blocking call is partially sent but sending the rest of it would block, it is moved into client's unfinishedCall and left for future delivery. Every sending attempt first sends the rest of unfinishedCall and than continues with other queued calls. --- Notes: Version 4: - correctly handle partially sent non-blocking calls that would block Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 261 ++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 210 insertions(+), 51 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 085dc8d..58ba66d 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,7 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool dontBlock; virCond cond; @@ -86,6 +87,9 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; + /* Unfinished call that needs to be finished before any of the calls in + * the queue can be processed */ + virNetClientCallPtr unfinishedCall; /* List of threads currently waiting for dispatch */ virNetClientCallPtr waitDispatch; @@ -94,6 +98,11 @@ struct _virNetClient { }; +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -743,26 +752,42 @@ static ssize_t virNetClientIOHandleOutput(virNetClientPtr client) { virNetClientCallPtr thecall = client->waitDispatch; + ssize_t ret = -1; while (thecall && thecall->mode != VIR_NET_CLIENT_MODE_WAIT_TX) thecall = thecall->next; + /* If there is an unfinished non-blocking call, process it first */ + if (client->unfinishedCall) { + client->unfinishedCall->next = thecall; + thecall = client->unfinishedCall; + } + if (!thecall) - return -1; /* Shouldn't happen, but you never know... */ + goto cleanup; /* Shouldn't happen, but you never know... 
*/ while (thecall) { - ssize_t ret = virNetClientIOWriteMessage(client, thecall); + ret = virNetClientIOWriteMessage(client, thecall); if (ret < 0) - return ret; + goto cleanup; - if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - return 0; /* Blocking write, to back to event loop */ + if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) { + /* Blocking write, go back to event loop */ + ret = 0; + goto cleanup; + } thecall = thecall->next; } - return 0; /* No more calls to send, all done */ + ret = 0; /* No more calls to send, all done */ + +cleanup: + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; + + return ret; } static ssize_t @@ -845,6 +870,91 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static void +virNetClientDiscardNonBlocking(virNetClientPtr client, + virNetClientCallPtr thiscall, + bool error) +{ + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallPtr prev = NULL; + + if (client->unfinishedCall) { + client->unfinishedCall->next = call; + call = client->unfinishedCall; + } + + while (call) { + virNetClientCallPtr next = call->next; + + if (!call->dontBlock) { + prev = call; + goto skip; + } + + /* We can't remove nonblocking call which was already partially sent + * to the remote party (unless there was an error in which case we + * won't be able to send anything anymore anyway); we store it in + * unfinishedCall and when someone needs to send something in the + * future, it will first send the rest of the unfinishedCall. + */ + if (!error && + call->mode != VIR_NET_CLIENT_MODE_COMPLETE && + call->msg->bufferOffset > 0) { + VIR_DEBUG("Can't finish nonblocking call %p without blocking", + call); + if (call == client->unfinishedCall) + goto skip; + + client->unfinishedCall = call; + goto next; + } + + /* We should never free thiscall since it will be freed by the caller. + * We shouldn't remove thiscall from the queue either since that is + * handled elsewhere. + */ + if (call == thiscall) { + prev = call; + goto skip; + } + + /* Remove and free completed calls or calls that we didn't even get to + * without blocking or error. + */ + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) { + const char *action; + if (call->msg->bufferOffset > 0) + action = "finish"; + else + action = "send"; + + VIR_DEBUG("Can't %s nonblocking call %p without %s", + action, call, error ? "error" : "blocking"); + } + + if (call == client->unfinishedCall) { + client->unfinishedCall = NULL; + virNetMessageFree(call->msg); + VIR_FREE(call); + goto skip; + } + + virNetMessageFree(call->msg); + VIR_FREE(call); + +next: + if (prev) + prev->next = next; + else + client->waitDispatch = next; +skip: + call = next; + } + + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. 
Then quit and pass the buck @@ -854,7 +964,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) { struct pollfd fds[2]; - int ret; + int pollret; + bool error; + int ret = -1; fds[0].fd = virNetSocketGetFD(client->sock); fds[1].fd = client->wakeupReadFD; @@ -877,11 +989,19 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0; fds[1].events = POLLIN; + if (client->unfinishedCall) + fds[0].events = POLLOUT; + while (tmp) { if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) fds[0].events |= POLLIN; if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) fds[0].events |= POLLOUT; + /* We don't want to sleep in poll if any of the calls is + * non-blocking + */ + if (tmp->dontBlock) + timeout = 0; tmp = tmp->next; } @@ -913,8 +1033,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); repoll: - ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); - if (ret < 0 && errno == EAGAIN) + pollret = poll(fds, ARRAY_CARDINALITY(fds), timeout); + if (pollret < 0 && errno == EAGAIN) goto repoll; ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -936,7 +1056,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - if (ret < 0) { + if (pollret < 0) { if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -954,8 +1074,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* All calls in the queue have been sent or sending would block, remove + * nonblocking calls since we did all we could for them. + */ + error = !!(fds[0].revents & (POLLHUP | POLLERR)); + virNetClientDiscardNonBlocking(client, thiscall, error); + + /* Iterate through waiting calls and if any are complete, tell + * their threads to wake up. */ tmp = client->waitDispatch; prev = NULL; @@ -983,39 +1109,39 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* We're at head of the list already, so - * remove us - */ - client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } - return 0; + VIR_DEBUG("Giving up the buck %p %p", thiscall, thiscall->next); + ret = 0; + goto pass; } - if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup / error event on socket")); goto error; } - } + if (thiscall->dontBlock && thiscall != client->unfinishedCall) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Can't send nonblocking call without blocking")); + VIR_DEBUG("Giving up the buck %p %p", thiscall, thiscall->next); + goto pass; + } + } error: + virNetClientDiscardNonBlocking(client, thiscall, true); + VIR_DEBUG("Giving up the buck due to I/O error %p %p", + thiscall, thiscall->next); + +pass: client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); /* See if someone else is still waiting * and if so, then pass the buck ! 
*/ if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); } - return -1; + return ret; } @@ -1057,38 +1183,41 @@ static int virNetClientIO(virNetClientPtr client, { int rv = -1; - VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", + VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d" + " length=%zu dontBlock=%d dispatch=%p", thiscall->msg->header.prog, thiscall->msg->header.vers, thiscall->msg->header.serial, thiscall->msg->header.proc, thiscall->msg->header.type, thiscall->msg->bufferLength, + thiscall->dontBlock, client->waitDispatch); /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + virNetClientCallPtr tmp; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = NULL; virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; } + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp->next) + tmp = tmp->next; + tmp->next = thiscall; + + if (thiscall->dontBlock) { + VIR_DEBUG("Sending non-blocking call while another thread is" + " dispatching; it will send the call for us"); + return 0; + } + VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { @@ -1108,7 +1237,7 @@ static int virNetClientIO(virNetClientPtr client, return -1; } - VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall); /* Two reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. 
Other thread is all done, and it is our turn to @@ -1198,9 +1327,11 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int +virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock) { virNetClientCallPtr call; int ret = -1; @@ -1213,7 +1344,7 @@ int virNetClientSend(virNetClientPtr client, if (expectReply && (msg->bufferLength != 0) && - (msg->header.status == VIR_NET_CONTINUE)) { + (msg->header.status == VIR_NET_CONTINUE || dontBlock)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("Attempt to send an asynchronous message with a synchronous reply")); return -1; @@ -1226,10 +1357,15 @@ int virNetClientSend(virNetClientPtr client, virNetClientLock(client); - if (virCondInit(&call->cond) < 0) { - virNetError(VIR_ERR_INTERNAL_ERROR, "%s", - _("cannot initialize condition variable")); - goto cleanup; + /* We don't need call->cond for non-blocking calls since there's no + * thread to be woken up anyway + */ + if (!dontBlock) { + if (virCondInit(&call->cond) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize condition variable")); + goto cleanup; + } } if (msg->bufferLength) @@ -1238,12 +1374,35 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->dontBlock = dontBlock; ret = virNetClientIO(client, call); cleanup: ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + if (ret != 0) { + VIR_FREE(call); + } else if (dontBlock) { + /* Only free the call if it was completed since otherwise it was just + * queued up and will be processed later. + */ + if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* We need to free the message as well since no-one is waiting for + * it. + */ + virNetMessageFree(msg); + VIR_FREE(call); + } + } else { + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) +{ + return virNetClientSendInternal(client, msg, expectReply, false); +} -- 1.7.7.1
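To make the queue discipline described in the commit message above easier to follow, here is a minimal, self-contained sketch of the best-effort discard pass. All names (nb_call, nb_client, discard_nonblocking) are hypothetical; this models the idea only, not the actual virnetclient code.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

enum call_mode { MODE_WAIT_TX, MODE_COMPLETE };

typedef struct nb_call nb_call;
struct nb_call {
    bool dont_block;      /* no thread is waiting for this call */
    bool partially_sent;  /* some bytes already went out on the wire */
    enum call_mode mode;
    nb_call *next;
};

typedef struct {
    nb_call *queue;       /* calls waiting for dispatch */
    nb_call *unfinished;  /* partially sent non-blocking call kept for later */
} nb_client;

/* After a send attempt that would now block, unlink every non-blocking
 * call: a partially sent one must not be truncated mid-message, so it is
 * parked in 'unfinished' for future delivery; the rest are discarded. */
static void discard_nonblocking(nb_client *c, bool io_error)
{
    nb_call **prev = &c->queue;
    nb_call *call = c->queue;

    while (call) {
        nb_call *next = call->next;
        if (call->dont_block) {
            *prev = next;   /* unlink from the dispatch queue */
            if (!io_error && call->mode != MODE_COMPLETE &&
                call->partially_sent && !c->unfinished)
                c->unfinished = call;
            else
                free(call); /* completed, untouched, or errored: drop it */
        } else {
            prev = &call->next;
        }
        call = next;
    }
}

int main(void)
{
    nb_client c = { NULL, NULL };
    nb_call *ping = calloc(1, sizeof(*ping));
    if (!ping)
        return 1;
    ping->dont_block = true;
    ping->partially_sent = true;  /* pretend half the message went out */
    ping->mode = MODE_WAIT_TX;
    c.queue = ping;

    discard_nonblocking(&c, false);
    printf("unfinished=%p queue=%p\n", (void *)c.unfinished, (void *)c.queue);
    free(c.unfinished);
    return 0;
}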

On Thu, Oct 27, 2011 at 18:05:42 +0200, Jiri Denemark wrote:
When a client wants to send a keepalive message, it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds a dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means that sending such a call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending a non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best-effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. In case a non-blocking call is partially sent but sending the rest of it would block, it is moved into the client's unfinishedCall and left for future delivery. Every sending attempt first sends the rest of unfinishedCall and then continues with other queued calls. --- Notes: Version 4: - correctly handle partially sent non-blocking calls that would block
Version 3: - no changes
Version 2: - no changes
src/rpc/virnetclient.c | 261 ++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 210 insertions(+), 51 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 085dc8d..58ba66d 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c ... +static void +virNetClientDiscardNonBlocking(virNetClientPtr client, + virNetClientCallPtr thiscall, + bool error) +{ + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallPtr prev = NULL; + + if (client->unfinishedCall) { + client->unfinishedCall->next = call; + call = client->unfinishedCall; + } + + while (call) { + virNetClientCallPtr next = call->next; + + if (!call->dontBlock) { + prev = call; + goto skip; + } + + /* We can't remove nonblocking call which was already partially sent + * to the remote party (unless there was an error in which case we + * won't be able to send anything anymore anyway); we store it in + * unfinishedCall and when someone needs to send something in the + * future, it will first send the rest of the unfinishedCall. + */ + if (!error && + call->mode != VIR_NET_CLIENT_MODE_COMPLETE && + call->msg->bufferOffset > 0) { + VIR_DEBUG("Can't finish nonblocking call %p without blocking", + call); + if (call == client->unfinishedCall) + goto skip; + + client->unfinishedCall = call; + goto next; + }
I just realized that this won't work when virNetClientIOWriteMessage lies about the number of bytes sent, which it unfortunately does at least for SASL: there it returns zero whenever only part of the buffer (anywhere in [0, bufferLength)) was sent, and bufferLength once the last byte of the buffer has gone out. It also buffers the data internally, so any time virNetClientIOWriteMessage is called we need to consider the message partially sent, even though zero is returned (and bufferOffset is still 0). Jirka
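A tiny standalone model of the behaviour Jirka describes, assuming a transport that encodes and caches data internally the way the SASL layer does; caching_sock, caching_write, and the 8-byte wire_capacity are made up for illustration. The write consumes the caller's buffer immediately but reports nothing until the cache drains, so the caller's bufferOffset never reflects a partial send.

#include <stdio.h>
#include <string.h>
#include <sys/types.h>

#define CACHE_MAX 4096

typedef struct {
    char cache[CACHE_MAX];
    size_t cached;   /* bytes accepted but not yet on the wire */
} caching_sock;

static const size_t wire_capacity = 8;  /* pretend the kernel takes 8 bytes per flush */

static ssize_t caching_write(caching_sock *s, const char *buf, size_t len)
{
    if (s->cached + len > CACHE_MAX)
        return 0;                            /* would block, nothing accepted */

    memcpy(s->cache + s->cached, buf, len);  /* data is consumed here... */
    s->cached += len;

    size_t sent = s->cached < wire_capacity ? s->cached : wire_capacity;
    memmove(s->cache, s->cache + sent, s->cached - sent);
    s->cached -= sent;

    /* ...but the caller learns nothing until the cache fully drains */
    return s->cached == 0 ? (ssize_t)len : 0;
}

int main(void)
{
    caching_sock s = { .cached = 0 };
    /* 12 bytes are cached, only 8 hit the wire: reports 0 */
    printf("first write: %zd\n", caching_write(&s, "0123456789AB", 12));
    /* the cache drains now: reports 2, silently covering the earlier 12 */
    printf("second write: %zd\n", caching_write(&s, "CD", 2));
    return 0;
}

Hence the v5 approach below: any call that has entered such a write at least once must be treated as partially sent, whatever the return value and bufferOffset say.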

When a client wants to send a keepalive message, it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds a dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means that sending such a call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending a non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best-effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. In case a non-blocking call is partially sent but sending the rest of it would block, it is moved into the client's unfinishedCall and left for future delivery. Every sending attempt first sends the rest of unfinishedCall and then continues with other queued calls. --- Notes: Version 5: - partially sent non-blocking calls now work even for SASL (or other transports that cache data internally) - fixed several other bugs in that area Version 4: - correctly handle partially sent non-blocking calls that would block Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 285 ++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 229 insertions(+), 56 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 2b5f67c..66d86e0 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,8 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool dontBlock; + bool sending; virCond cond; @@ -86,6 +88,9 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; + /* Unfinished call that needs to be finished before any of the calls in + * the queue can be processed */ + virNetClientCallPtr unfinishedCall; /* List of threads currently waiting for dispatch */ virNetClientCallPtr waitDispatch; @@ -94,6 +99,11 @@ struct _virNetClient { }; +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -739,6 +749,7 @@ virNetClientIOWriteMessage(virNetClientPtr client, { ssize_t ret; + thecall->sending = true; ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); @@ -754,6 +765,7 @@ return -1; } thecall->msg->bufferOffset = thecall->msg->bufferLength = 0; + thecall->sending = false; if (thecall->expectReply) thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX; else @@ -768,26 +780,45 @@ static ssize_t virNetClientIOHandleOutput(virNetClientPtr client) { virNetClientCallPtr thecall = client->waitDispatch; + ssize_t ret = -1; while (thecall && thecall->mode != VIR_NET_CLIENT_MODE_WAIT_TX) thecall = thecall->next; + /* If there is an unfinished non-blocking call, process it first */ + if (client->unfinishedCall) { + client->unfinishedCall->next = thecall; + thecall = client->unfinishedCall; + } + if (!thecall) - return -1; /* Shouldn't happen, but you never know... 
*/ while (thecall) { - ssize_t ret = virNetClientIOWriteMessage(client, thecall); + ret = virNetClientIOWriteMessage(client, thecall); if (ret < 0) - return ret; + goto cleanup; + + if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) { + /* Blocking write, go back to event loop */ + ret = 0; + goto cleanup; + } - if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - return 0; /* Blocking write, to back to event loop */ + if (thecall == client->unfinishedCall) + VIR_DEBUG("Nonblocking call %p finished", thecall); thecall = thecall->next; } - return 0; /* No more calls to send, all done */ + ret = 0; /* No more calls to send, all done */ + +cleanup: + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; + + return ret; } static ssize_t @@ -870,6 +901,89 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static void +virNetClientDiscardNonBlocking(virNetClientPtr client, + virNetClientCallPtr thiscall, + bool error) +{ + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallPtr prev = NULL; + + if (client->unfinishedCall) { + client->unfinishedCall->next = call; + call = client->unfinishedCall; + } + + while (call) { + virNetClientCallPtr next = call->next; + + if (!call->dontBlock) { + prev = call; + goto skip; + } + + /* We can't remove nonblocking call which was already partially sent + * to the remote party (unless there was an error in which case we + * won't be able to send anything anymore anyway); we store it in + * unfinishedCall and when someone needs to send something in the + * future, it will first send the rest of the unfinishedCall. + */ + if (!error && call->sending) { + VIR_DEBUG("Can't finish nonblocking call %p without blocking", + call); + if (call == client->unfinishedCall) + goto skip; + + client->unfinishedCall = call; + goto next; + } + + /* We should never free thiscall since it will be freed by the caller. + * We shouldn't remove thiscall from the queue either since that is + * handled elsewhere. + */ + if (call == thiscall) { + prev = call; + goto skip; + } + + /* Remove and free completed calls or calls that we didn't even get to + * without blocking or error. + */ + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) { + const char *action; + if (call->sending > 0) + action = "finish"; + else + action = "send"; + + VIR_DEBUG("Can't %s nonblocking call %p without %s", + action, call, error ? "error" : "blocking"); + } + + if (call == client->unfinishedCall) { + client->unfinishedCall = NULL; + virNetMessageFree(call->msg); + VIR_FREE(call); + goto skip; + } + + virNetMessageFree(call->msg); + VIR_FREE(call); + +next: + if (prev) + prev->next = next; + else + client->waitDispatch = next; +skip: + call = next; + } + + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. 
Then quit and pass the buck @@ -879,7 +993,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) { struct pollfd fds[2]; - int ret; + int pollret; + bool error; + int ret = -1; fds[0].fd = virNetSocketGetFD(client->sock); fds[1].fd = client->wakeupReadFD; @@ -902,11 +1018,19 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0; fds[1].events = POLLIN; + if (client->unfinishedCall) + fds[0].events = POLLOUT; + while (tmp) { if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) fds[0].events |= POLLIN; if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) fds[0].events |= POLLOUT; + /* We don't want to sleep in poll if any of the calls is + * non-blocking + */ + if (tmp->dontBlock) + timeout = 0; tmp = tmp->next; } @@ -938,8 +1062,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); repoll: - ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); - if (ret < 0 && errno == EAGAIN) + pollret = poll(fds, ARRAY_CARDINALITY(fds), timeout); + if (pollret < 0 && errno == EAGAIN) goto repoll; ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -961,7 +1085,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - if (ret < 0) { + if (pollret < 0) { if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -979,8 +1103,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* All calls in the queue have been sent or sending would block, remove + * nonblocking calls since we did all we could for them. + */ + error = !!(fds[0].revents & (POLLHUP | POLLERR)); + virNetClientDiscardNonBlocking(client, thiscall, error); + + /* Iterate through waiting calls and if any are complete, tell + * their threads to wake up. */ tmp = client->waitDispatch; prev = NULL; @@ -1006,41 +1136,48 @@ static int virNetClientIOEventLoop(virNetClientPtr client, tmp = tmp->next; } - /* Now see if *we* are done */ + /* Now see if *we* are done or deferred */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* We're at head of the list already, so - * remove us - */ - client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! 
*/ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } - return 0; + ret = 0; + goto pass; } - if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup / error event on socket")); goto error; } - } + if (thiscall->dontBlock) { + if (thiscall == client->unfinishedCall) { + ret = 0; + } else { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Can't send nonblocking call without blocking")); + } + goto pass; + } + } error: - client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); + virNetClientDiscardNonBlocking(client, thiscall, true); + VIR_DEBUG("Giving up the buck due to I/O error"); + +pass: + if (thiscall != client->unfinishedCall) + client->waitDispatch = thiscall->next; + else if (ret != 0) + client->unfinishedCall = NULL; + + VIR_DEBUG("Giving up the buck call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* See if someone else is still waiting * and if so, then pass the buck ! */ if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); } - return -1; + return ret; } @@ -1082,39 +1219,43 @@ static int virNetClientIO(virNetClientPtr client, { int rv = -1; - VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", + VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d" + " length=%zu dontBlock=%d dispatch=%p", thiscall->msg->header.prog, thiscall->msg->header.vers, thiscall->msg->header.serial, thiscall->msg->header.proc, thiscall->msg->header.type, thiscall->msg->bufferLength, + thiscall->dontBlock, client->waitDispatch); /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + virNetClientCallPtr tmp; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = NULL; virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; } - VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp->next) + tmp = tmp->next; + tmp->next = thiscall; + + if (thiscall->dontBlock) { + VIR_DEBUG("Sending non-blocking call while another thread is" + " dispatching; it will send the call for us"); + return 0; + } + + VIR_DEBUG("Going to sleep call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { if (client->waitDispatch == thiscall) { @@ -1133,7 +1274,7 @@ static int virNetClientIO(virNetClientPtr client, return -1; } - VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall); /* Two reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. 
Other thread is all done, and it is our turn to @@ -1157,7 +1298,8 @@ static int virNetClientIO(virNetClientPtr client, client->waitDispatch = thiscall; } - VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("We have the buck call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* * The buck stops here! * @@ -1184,7 +1326,8 @@ static int virNetClientIO(virNetClientPtr client, rv = -1; cleanup: - VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv); + VIR_DEBUG("All done with our call %p %d unfinishedCall=%p waitDispatch=%p", + thiscall, rv, client->unfinishedCall, client->waitDispatch); return rv; } @@ -1223,9 +1366,11 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int +virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock) { virNetClientCallPtr call; int ret = -1; @@ -1238,7 +1383,7 @@ int virNetClientSend(virNetClientPtr client, if (expectReply && (msg->bufferLength != 0) && - (msg->header.status == VIR_NET_CONTINUE)) { + (msg->header.status == VIR_NET_CONTINUE || dontBlock)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("Attempt to send an asynchronous message with a synchronous reply")); return -1; @@ -1251,10 +1396,15 @@ int virNetClientSend(virNetClientPtr client, virNetClientLock(client); - if (virCondInit(&call->cond) < 0) { - virNetError(VIR_ERR_INTERNAL_ERROR, "%s", - _("cannot initialize condition variable")); - goto cleanup; + /* We don't need call->cond for non-blocking calls since there's no + * thread to be woken up anyway + */ + if (!dontBlock) { + if (virCondInit(&call->cond) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize condition variable")); + goto cleanup; + } } if (msg->bufferLength) @@ -1263,12 +1413,35 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->dontBlock = dontBlock; ret = virNetClientIO(client, call); cleanup: ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + if (ret != 0) { + VIR_FREE(call); + } else if (dontBlock) { + /* Only free the call if it was completed since otherwise it was just + * queued up and will be processed later. + */ + if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* We need to free the message as well since no-one is waiting for + * it. + */ + virNetMessageFree(msg); + VIR_FREE(call); + } + } else { + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) +{ + return virNetClientSendInternal(client, msg, expectReply, false); +} -- 1.7.7.2
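Restating the core of the v5 fix above in isolation: a minimal sketch, with hypothetical names and a stubbed transport, where the keep-or-discard decision is keyed off a per-call sending flag set before the first write attempt, instead of off bufferOffset, which a caching transport leaves at zero.

#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>

typedef struct {
    bool sending;           /* a write attempt has been made for this call */
    size_t offset, length;  /* progress as seen by the caller */
    const char *buffer;
} out_call;

/* stub transport: a real one may cache internally and report fewer bytes
 * than it actually consumed */
static ssize_t transport_write(const char *buf, size_t len)
{
    (void)buf;
    return (ssize_t)len;
}

static ssize_t call_write(out_call *c)
{
    /* From this point on, parts of the message may be on the wire (or in a
     * transport cache), even if the transport reports zero bytes. */
    c->sending = true;
    ssize_t n = transport_write(c->buffer + c->offset, c->length - c->offset);
    if (n > 0)
        c->offset += (size_t)n;
    return n;
}

/* A non-blocking call is safe to drop only if no send was ever attempted. */
static bool can_discard(const out_call *c)
{
    return !c->sending;
}

int main(void)
{
    out_call c = { false, 0, 5, "PING\n" };
    call_write(&c);
    return can_discard(&c) ? 1 : 0;  /* exits 0: the call must be kept */
}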

On Thu, Nov 03, 2011 at 11:03:13AM +0100, Jiri Denemark wrote:
When a client wants to send a keepalive message, it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds a dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means that sending such a call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending a non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best-effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. In case a non-blocking call is partially sent but sending the rest of it would block, it is moved into the client's unfinishedCall and left for future delivery. Every sending attempt first sends the rest of unfinishedCall and then continues with other queued calls. --- Notes: Version 5: - partially sent non-blocking calls now work even for SASL (or other transports that cache data internally) - fixed several other bugs in that area
Version 4: - correctly handle partially sent non-blocking calls that would block
Version 3: - no changes
Version 2: - no changes
src/rpc/virnetclient.c | 285 ++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 229 insertions(+), 56 deletions(-)
I started reviewing this patch, but I'm finding the changes somewhat confusing. I think there might be a different way to go about it, so I'm hacking up a proof-of-concept counter-proposal for non-blocking I/O. We sort of needed this already for the streams code, but we have rather ignored it because it was rarely important in practice. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

--- Notes: ACKed Version 4: - no changes Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 76 ++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 58ba66d..aaf072a 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -95,9 +95,13 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; }; +void virNetClientRequestClose(virNetClientPtr client); + static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, bool expectReply, @@ -307,12 +311,14 @@ void virNetClientFree(virNetClientPtr client) } -void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return; - virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -322,6 +328,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. + */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore); + + client->wantClose = true; + if (safewrite(client->wakeupSendFD, &ignore, len) != len) + VIR_ERROR(_("failed to wake up polling thread")); + } else { + virNetClientCloseLocked(client); + } + virNetClientUnlock(client); } @@ -978,11 +1019,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, sigset_t oldmask, blockedsigs; int timeout = -1; - /* If we have existing SASL decoded data we - * don't want to sleep in the poll(), just - * check if any other FDs are also ready + /* If we have existing SASL decoded data we don't want to sleep in + * the poll(), just check if any other FDs are also ready. + * If the connection is going to be closed, we don't want to sleep in + * poll() either. 
*/ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; fds[0].events = fds[0].revents = 0; @@ -1047,6 +1089,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) fds[0].revents |= POLLIN; + /* If wantClose flag is set, pretend there was an error on the socket + */ + if (client->wantClose) + fds[0].revents = POLLERR; + if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1140,6 +1187,8 @@ pass: if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); + } else if (client->wantClose) { + virNetClientCloseLocked(client); } return ret; } @@ -1281,7 +1330,8 @@ static int virNetClientIO(virNetClientPtr client, virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + if (client->sock) + virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); if (rv == 0 && virGetLastError()) @@ -1305,7 +1355,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->waitDispatch || client->wantClose) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1357,6 +1407,12 @@ virNetClientSendInternal(virNetClientPtr client, virNetClientLock(client); + if (!client->sock || client->wantClose) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Client socket is closed")); + goto unlock; + } + /* We don't need call->cond for non-blocking calls since there's no * thread to be woken up anyway */ @@ -1396,6 +1452,8 @@ cleanup: } else { VIR_FREE(call); } + +unlock: virNetClientUnlock(client); return ret; } -- 1.7.7.1

On Thu, Oct 27, 2011 at 06:05:43PM +0200, Jiri Denemark wrote:
--- Notes: ACKed
Version 4: - no changes
Version 3: - no changes
Version 2: - no changes
src/rpc/virnetclient.c | 76 ++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 67 insertions(+), 9 deletions(-)
ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

--- Notes: Version 5: - rebased on top of DanB's non-blocking patches; this is the only part that required non-trivial rebase so I'm posting it for additional review Version 4: - no changes Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 99 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 025d270..b4b2fe7 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -101,9 +101,13 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; }; +void virNetClientRequestClose(virNetClientPtr client); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client) } -void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return; - virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. 
+ */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore); + + client->wantClose = true; + if (safewrite(client->wakeupSendFD, &ignore, len) != len) + VIR_ERROR(_("failed to wake up polling thread")); + } else { + virNetClientCloseLocked(client); + } + virNetClientUnlock(client); } @@ -1096,6 +1137,26 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, } +static void +virNetClientIOEventLoopRemoveAll(virNetClientPtr client, + virNetClientCallPtr thiscall) +{ + if (!client->waitDispatch) + return; + + if (client->waitDispatch == thiscall) { + /* just pretend nothing was sent and the caller will free the call */ + thiscall->sentSomeData = false; + } else { + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallRemove(&client->waitDispatch, call); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call->msg); + VIR_FREE(call); + } +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); + if (client->wantClose) { + virNetClientCloseLocked(client); + virNetClientIOEventLoopRemoveAll(client, thiscall); + } } @@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, sigset_t oldmask, blockedsigs; int timeout = -1; - /* If we have existing SASL decoded data we - * don't want to sleep in the poll(), just - * check if any other FDs are also ready + /* If we have existing SASL decoded data we don't want to sleep in + * the poll(), just check if any other FDs are also ready. + * If the connection is going to be closed, we don't want to sleep in + * poll() either. */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; /* If there are any non-blocking calls in the queue, @@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[0].revents |= POLLIN; } + /* If wantClose flag is set, pretend there was an error on the socket + */ + if (client->wantClose) + fds[0].revents = POLLERR; + if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1441,7 +1513,8 @@ static int virNetClientIO(virNetClientPtr client, virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetClientIOUpdateCallback(client, true); + if (client->sock) + virNetClientIOUpdateCallback(client, true); if (rv == 0 && virGetLastError()) @@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->haveTheBuck) + if (client->haveTheBuck || client->wantClose) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client, virNetClientLock(client); + if (!client->sock || client->wantClose) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("client socket is closed")); + goto unlock; + } + if (virCondInit(&call->cond) < 0) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot initialize condition variable")); @@ -1554,6 +1633,8 @@ cleanup: ignore_value(virCondDestroy(&call->cond)); VIR_FREE(call); } + +unlock: virNetClientUnlock(client); return ret; } -- 1.7.8.rc3

On Tue, Nov 22, 2011 at 04:45:27PM +0100, Jiri Denemark wrote:
--- Notes: Version 5: - rebased on top of DanB's non-blocking patches; this is the only part that required non-trivial rebase so I'm posting it for additional review
Version 4: - no changes
Version 3: - no changes
Version 2: - no changes
src/rpc/virnetclient.c | 99 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 025d270..b4b2fe7 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -101,9 +101,13 @@ struct _virNetClient {
size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; };
+void virNetClientRequestClose(virNetClientPtr client); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client) }
-void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return;
- virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. + */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore);
Minor nitpick s/int/size_t/ ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

--- Notes: ACKed Version 4: - no changes Version 3: - remoteStartKeepAlive renamed as remoteSetKeepAlive - clients that implement event loop are required to run it, thus keepalive is enabled if event loop implementation is found without the need to call remoteAllowKeepAlive (which was dropped) - keepalive support is advertised to a server implicitly by asking for keepalive support between authentication and virConnectOpen Version 2: - no changes src/remote/remote_driver.c | 52 +++++++++++++++++++++++++++ src/rpc/virnetclient.c | 83 +++++++++++++++++++++++++++++++++++++++++-- src/rpc/virnetclient.h | 5 +++ 3 files changed, 136 insertions(+), 4 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index e98ebd7..f99c32d 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -68,6 +68,7 @@ #endif static int inside_daemon = 0; +static virDriverPtr remoteDriver = NULL; struct private_data { virMutex lock; @@ -84,6 +85,7 @@ struct private_data { char *type; /* Cached return from remoteType. */ int localUses; /* Ref count for private data */ char *hostname; /* Original hostname */ + bool serverKeepAlive; /* Does server support keepalive protocol? */ virDomainEventStatePtr domainEventState; }; @@ -663,6 +665,26 @@ doRemoteOpen (virConnectPtr conn, if (remoteAuthenticate(conn, priv, auth, authtype) == -1) goto failed; + if (virNetClientKeepAliveIsSupported(priv->client)) { + remote_supports_feature_args args = + { VIR_DRV_FEATURE_PROGRAM_KEEPALIVE }; + remote_supports_feature_ret ret = { 0 }; + int rc; + + rc = call(conn, priv, 0, REMOTE_PROC_SUPPORTS_FEATURE, + (xdrproc_t)xdr_remote_supports_feature_args, (char *) &args, + (xdrproc_t)xdr_remote_supports_feature_ret, (char *) &ret); + if (rc == -1) + goto failed; + + if (ret.supported) { + priv->serverKeepAlive = true; + } else { + VIR_WARN("Disabling keepalive protocol since it is not supported" + " by the server"); + } + } + /* Finally we can call the remote side's open function. 
*/ { remote_open_args args = { &name, flags }; @@ -4122,6 +4144,33 @@ done: } +static int +remoteSetKeepAlive(virConnectPtr conn, int interval, unsigned int count) +{ + struct private_data *priv = conn->privateData; + int ret = -1; + + remoteDriverLock(priv); + if (!virNetClientKeepAliveIsSupported(priv->client)) { + remoteError(VIR_ERR_INTERNAL_ERROR, "%s", + _("the caller doesn't support keepalive protocol;" + " perhaps it's missing event loop implementation")); + goto cleanup; + } + + if (!priv->serverKeepAlive) { + ret = 1; + goto cleanup; + } + + ret = virNetClientKeepAliveStart(priv->client, interval, count); + +cleanup: + remoteDriverUnlock(priv); + return ret; +} + + #include "remote_client_bodies.h" #include "qemu_client_bodies.h" @@ -4474,6 +4523,7 @@ static virDriver remote_driver = { .domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */ .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ + .setKeepAlive = remoteSetKeepAlive, /* 0.9.7 */ }; static virNetworkDriver network_driver = { @@ -4624,6 +4674,8 @@ static virStateDriver state_driver = { int remoteRegister (void) { + remoteDriver = &remote_driver; + if (virRegisterDriver (&remote_driver) == -1) return -1; if (virRegisterNetworkDriver (&network_driver) == -1) return -1; if (virRegisterInterfaceDriver (&interface_driver) == -1) return -1; diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index aaf072a..37e9cc8 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -29,6 +29,7 @@ #include "virnetclient.h" #include "virnetsocket.h" +#include "virkeepalive.h" #include "memory.h" #include "threads.h" #include "virfile.h" @@ -96,11 +97,12 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + virKeepAlivePtr keepalive; bool wantClose; }; -void virNetClientRequestClose(virNetClientPtr client); +static void virNetClientRequestClose(virNetClientPtr client); static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, @@ -130,11 +132,51 @@ static void virNetClientEventFree(void *opaque) virNetClientFree(client); } +bool +virNetClientKeepAliveIsSupported(virNetClientPtr client) +{ + bool supported; + + virNetClientLock(client); + supported = !!client->keepalive; + virNetClientUnlock(client); + + return supported; +} + +int +virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count) +{ + int ret; + + virNetClientLock(client); + ret = virKeepAliveStart(client->keepalive, interval, count); + virNetClientUnlock(client); + + return ret; +} + +static void +virNetClientKeepAliveDeadCB(void *opaque) +{ + virNetClientRequestClose(opaque); +} + +static int +virNetClientKeepAliveSendCB(void *opaque, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(opaque, msg, false, true); +} + static virNetClientPtr virNetClientNew(virNetSocketPtr sock, const char *hostname) { virNetClientPtr client = NULL; int wakeupFD[2] = { -1, -1 }; + virKeepAlivePtr ka = NULL; if (pipe2(wakeupFD, O_CLOEXEC) < 0) { virReportSystemError(errno, "%s", @@ -167,13 +209,24 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, client, virNetClientEventFree) < 0) { client->refs--; - VIR_DEBUG("Failed to add event watch, disabling events"); + VIR_DEBUG("Failed to add event watch, disabling events and support for" + " keepalive messages"); + } else { + /* Keepalive protocol consists of async messages so it can only be used + * if the client supports them */ + if (!(ka = 
virKeepAliveNew(-1, 0, client, + virNetClientKeepAliveSendCB, + virNetClientKeepAliveDeadCB, + virNetClientEventFree))) + goto error; + /* keepalive object has a reference to client */ + client->refs++; } + client->keepalive = ka; PROBE(RPC_CLIENT_NEW, "client=%p refs=%d sock=%p", client, client->refs, client->sock); - return client; no_memory: @@ -181,6 +234,10 @@ no_memory: error: VIR_FORCE_CLOSE(wakeupFD[0]); VIR_FORCE_CLOSE(wakeupFD[1]); + if (ka) { + virKeepAliveStop(ka); + virKeepAliveFree(ka); + } virNetClientFree(client); return NULL; } @@ -314,6 +371,8 @@ void virNetClientFree(virNetClientPtr client) static void virNetClientCloseLocked(virNetClientPtr client) { + virKeepAlivePtr ka; + VIR_DEBUG("client=%p, sock=%p", client, client->sock); if (!client->sock) @@ -328,7 +387,20 @@ virNetClientCloseLocked(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + ka = client->keepalive; + client->keepalive = NULL; client->wantClose = false; + + if (ka) { + client->refs++; + virNetClientUnlock(client); + + virKeepAliveStop(ka); + virKeepAliveFree(ka); + + virNetClientLock(client); + client->refs--; + } } void virNetClientClose(virNetClientPtr client) @@ -341,7 +413,7 @@ void virNetClientClose(virNetClientPtr client) virNetClientUnlock(client); } -void +static void virNetClientRequestClose(virNetClientPtr client) { VIR_DEBUG("client=%p", client); @@ -743,6 +815,9 @@ virNetClientCallDispatch(virNetClientPtr client) client->msg.header.prog, client->msg.header.vers, client->msg.header.proc, client->msg.header.type, client->msg.header.status, client->msg.header.serial); + if (virKeepAliveCheckMessage(client->keepalive, &client->msg)) + return 0; + switch (client->msg.header.type) { case VIR_NET_REPLY: /* Normal RPC replies */ return virNetClientCallDispatchReply(client); diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 1fabcfd..3227a4e 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -87,4 +87,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr client); void virNetClientFree(virNetClientPtr client); void virNetClientClose(virNetClientPtr client); +bool virNetClientKeepAliveIsSupported(virNetClientPtr client); +int virNetClientKeepAliveStart(virNetClientPtr client, + int interval, + unsigned int count); + #endif /* __VIR_NET_CLIENT_H__ */ -- 1.7.7.1
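For reference, a client would turn this on roughly as follows. This is a hedged sketch: the URI is made up, and per the remote driver code above a positive return value from virConnectSetKeepAlive is assumed to mean the server lacks keepalive support, while only -1 is a hard failure.

#include <stdio.h>
#include <libvirt/libvirt.h>

int main(void)
{
    if (virEventRegisterDefaultImpl() < 0)
        return 1;

    virConnectPtr conn = virConnectOpen("qemu+tcp://host.example.com/system");
    if (!conn)
        return 1;

    /* Ask the daemon for a PING every 5 seconds; consider the connection
     * broken after 3 unanswered PINGs. */
    int rc = virConnectSetKeepAlive(conn, 5, 3);
    if (rc < 0)
        fprintf(stderr, "failed to enable keepalive\n");
    else if (rc > 0)
        fprintf(stderr, "server does not support keepalive\n");

    /* The event loop must keep running: PINGs are sent and PONGs handled
     * from within it. */
    for (;;) {
        if (virEventRunDefaultImpl() < 0)
            break;
    }

    virConnectClose(conn);
    return 0;
}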

On Thu, Oct 27, 2011 at 06:05:44PM +0200, Jiri Denemark wrote:
--- Notes: ACKed
Version 4: - no changes
Version 3: - remoteStartKeepAlive renamed as remoteSetKeepAlive - clients that implement event loop are required to run it, thus keepalive is enabled if event loop implementation is found without the need to call remoteAllowKeepAlive (which was dropped) - keepalive support is advertised to a server implicitly by asking for keepalive support between authentication and virConnectOpen
Version 2: - no changes
src/remote/remote_driver.c | 52 +++++++++++++++++++++++++++ src/rpc/virnetclient.c | 83 +++++++++++++++++++++++++++++++++++++++++-- src/rpc/virnetclient.h | 5 +++ 3 files changed, 136 insertions(+), 4 deletions(-)
ACK
@@ -663,6 +665,26 @@ doRemoteOpen (virConnectPtr conn, if (remoteAuthenticate(conn, priv, auth, authtype) == -1) goto failed;
+ if (virNetClientKeepAliveIsSupported(priv->client)) { + remote_supports_feature_args args = + { VIR_DRV_FEATURE_PROGRAM_KEEPALIVE }; + remote_supports_feature_ret ret = { 0 }; + int rc; + + rc = call(conn, priv, 0, REMOTE_PROC_SUPPORTS_FEATURE, + (xdrproc_t)xdr_remote_supports_feature_args, (char *) &args, + (xdrproc_t)xdr_remote_supports_feature_ret, (char *) &ret); + if (rc == -1) + goto failed; + + if (ret.supported) { + priv->serverKeepAlive = true; + } else { + VIR_WARN("Disabling keepalive protocol since it is not supported" + " by the server");
Hmm, won't this cause new clients to always issue a warning when talking to old servers? It can probably be dropped to VIR_INFO. ACK Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

This API can be used to check whether the socket associated with a virConnectPtr is still open or has been closed (probably because the keepalive protocol timed out). If the connection is local (i.e., no socket is associated with the connection), it is trivially always alive. --- Notes: ACKed Version 4: - no changes Version 3: - no changes Version 2: - new patch include/libvirt/libvirt.h.in | 1 + src/driver.h | 3 +++ src/libvirt.c | 36 ++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 41 insertions(+), 0 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 8ce4335..50e5b89 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2553,6 +2553,7 @@ int virInterfaceIsActive(virInterfacePtr iface); int virConnectIsEncrypted(virConnectPtr conn); int virConnectIsSecure(virConnectPtr conn); +int virConnectIsAlive(virConnectPtr conn); /* * CPU specification API diff --git a/src/driver.h b/src/driver.h index c1223c1..a23f05b 100644 --- a/src/driver.h +++ b/src/driver.h @@ -509,6 +509,8 @@ typedef int typedef int (*virDrvConnectIsSecure)(virConnectPtr conn); typedef int + (*virDrvConnectIsAlive)(virConnectPtr conn); +typedef int (*virDrvDomainIsActive)(virDomainPtr dom); typedef int (*virDrvDomainIsPersistent)(virDomainPtr dom); @@ -898,6 +900,7 @@ struct _virDriver { virDrvDomainBlockJobSetSpeed domainBlockJobSetSpeed; virDrvDomainBlockPull domainBlockPull; virDrvSetKeepAlive setKeepAlive; + virDrvConnectIsAlive isAlive; }; typedef int diff --git a/src/libvirt.c b/src/libvirt.c index 0c33da1..580b29a 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -17021,3 +17021,39 @@ error: virDispatchError(conn); return -1; } + +/** + * virConnectIsAlive: + * @conn: pointer to the connection object + * + * Determine if the connection to the hypervisor is still alive + * + * A connection will be classed as alive if it is either local, or running + * over a channel (TCP or UNIX socket) which is not closed. + * + * Returns 1 if alive, 0 if dead, -1 on error + */ +int virConnectIsAlive(virConnectPtr conn) +{ + VIR_DEBUG("conn=%p", conn); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + if (conn->driver->isAlive) { + int ret; + ret = conn->driver->isAlive(conn); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 468e28a..f1de89d 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -491,6 +491,7 @@ LIBVIRT_0.9.5 { LIBVIRT_0.9.7 { global: + virConnectIsAlive; virConnectSetKeepAlive; virDomainReset; virDomainSnapshotGetParent; -- 1.7.7.1
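A short usage sketch for the new API, following the return-value contract documented in the patch above (1 alive, 0 dead, -1 error); the reconnect logic is left as a stub and the default-URI open is just for illustration.

#include <stdio.h>
#include <libvirt/libvirt.h>

static int connection_usable(virConnectPtr conn)
{
    int alive = virConnectIsAlive(conn);

    if (alive == 1)
        return 1;               /* socket still open, or local connection */
    if (alive == 0)
        fprintf(stderr, "connection is dead, reconnect needed\n");
    else
        fprintf(stderr, "could not determine connection liveness\n");
    return 0;
}

int main(void)
{
    virConnectPtr conn = virConnectOpen(NULL);   /* default URI */
    if (!conn)
        return 1;

    printf("connection usable: %d\n", connection_usable(conn));
    virConnectClose(conn);
    return 0;
}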

On Thu, Oct 27, 2011 at 06:05:45PM +0200, Jiri Denemark wrote:
This API can be used to check whether the socket associated with a virConnectPtr is still open or has been closed (probably because the keepalive protocol timed out). If the connection is local (i.e., no socket is associated with the connection), it is trivially always alive. --- Notes: ACKed
Version 4: - no changes
Version 3: - no changes
Version 2: - new patch
include/libvirt/libvirt.h.in | 1 + src/driver.h | 3 +++ src/libvirt.c | 36 ++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 4 files changed, 41 insertions(+), 0 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 8ce4335..50e5b89 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2553,6 +2553,7 @@ int virInterfaceIsActive(virInterfacePtr iface);
int virConnectIsEncrypted(virConnectPtr conn); int virConnectIsSecure(virConnectPtr conn); +int virConnectIsAlive(virConnectPtr conn);
/* * CPU specification API diff --git a/src/driver.h b/src/driver.h index c1223c1..a23f05b 100644 --- a/src/driver.h +++ b/src/driver.h @@ -509,6 +509,8 @@ typedef int typedef int (*virDrvConnectIsSecure)(virConnectPtr conn); typedef int + (*virDrvConnectIsAlive)(virConnectPtr conn); +typedef int (*virDrvDomainIsActive)(virDomainPtr dom); typedef int (*virDrvDomainIsPersistent)(virDomainPtr dom); @@ -898,6 +900,7 @@ struct _virDriver { virDrvDomainBlockJobSetSpeed domainBlockJobSetSpeed; virDrvDomainBlockPull domainBlockPull; virDrvSetKeepAlive setKeepAlive; + virDrvConnectIsAlive isAlive; };
typedef int diff --git a/src/libvirt.c b/src/libvirt.c index 0c33da1..580b29a 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -17021,3 +17021,39 @@ error: virDispatchError(conn); return -1; } + +/** + * virConnectIsAlive: + * @conn: pointer to the connection object + * + * Determine if the connection to the hypervisor is still alive + * + * A connection will be classed as alive if it is either local, or running + * over a channel (TCP or UNIX socket) which is not closed. + * + * Returns 1 if alive, 0 if dead, -1 on error + */ +int virConnectIsAlive(virConnectPtr conn) +{ + VIR_DEBUG("conn=%p", conn); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(VIR_ERR_INVALID_CONN, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + if (conn->driver->isAlive) { + int ret; + ret = conn->driver->isAlive(conn); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + virDispatchError(conn); + return -1; +} diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 468e28a..f1de89d 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -491,6 +491,7 @@ LIBVIRT_0.9.5 {
LIBVIRT_0.9.7 { global: + virConnectIsAlive; virConnectSetKeepAlive; virDomainReset; virDomainSnapshotGetParent;
ACK, with update of syms file to 0.9.8 of course Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

--- Notes: ACKed except for esx driver Version 4: - fix esx implementation Version 3: - no changes Version 2: - new patch src/esx/esx_driver.c | 18 ++++++++++++++++++ src/hyperv/hyperv_driver.c | 18 ++++++++++++++++++ src/libxl/libxl_driver.c | 8 ++++++++ src/lxc/lxc_driver.c | 7 +++++++ src/openvz/openvz_driver.c | 7 +++++++ src/phyp/phyp_driver.c | 18 ++++++++++++++++++ src/qemu/qemu_driver.c | 6 ++++++ src/remote/remote_driver.c | 18 ++++++++++++++++++ src/rpc/virnetclient.c | 14 ++++++++++++++ src/rpc/virnetclient.h | 1 + src/test/test_driver.c | 6 ++++++ src/uml/uml_driver.c | 7 +++++++ src/vbox/vbox_tmpl.c | 6 ++++++ src/vmware/vmware_driver.c | 7 +++++++ src/xen/xen_driver.c | 8 ++++++++ src/xenapi/xenapi_driver.c | 12 ++++++++++++ 16 files changed, 161 insertions(+), 0 deletions(-) diff --git a/src/esx/esx_driver.c b/src/esx/esx_driver.c index 41086ef..14ab5f5 100644 --- a/src/esx/esx_driver.c +++ b/src/esx/esx_driver.c @@ -4166,6 +4166,23 @@ esxIsSecure(virConnectPtr conn) static int +esxIsAlive(virConnectPtr conn) +{ + esxPrivate *priv = conn->privateData; + + /* XXX we should be able to do something better than this but this is + * simple, safe, and good enough for now. In worst case, the function will + * return true even though the connection is not alive. + */ + if (priv->primary) + return 1; + else + return 0; +} + + + +static int esxDomainIsActive(virDomainPtr domain) { int result = -1; @@ -4992,6 +5009,7 @@ static virDriver esxDriver = { .domainSnapshotCurrent = esxDomainSnapshotCurrent, /* 0.8.0 */ .domainRevertToSnapshot = esxDomainRevertToSnapshot, /* 0.8.0 */ .domainSnapshotDelete = esxDomainSnapshotDelete, /* 0.8.0 */ + .isAlive = esxIsAlive, /* 0.9.7 */ }; diff --git a/src/hyperv/hyperv_driver.c b/src/hyperv/hyperv_driver.c index 39b55f8..7665a76 100644 --- a/src/hyperv/hyperv_driver.c +++ b/src/hyperv/hyperv_driver.c @@ -1119,6 +1119,23 @@ hypervIsSecure(virConnectPtr conn) static int +hypervIsAlive(virConnectPtr conn) +{ + hypervPrivate *priv = conn->privateData; + + /* XXX we should be able to do something better than this is simple, safe, + * and good enough for now. In worst case, the function will return true + * even though the connection is not alive. 
+ */ + if (priv->client) + return 1; + else + return 0; +} + + + +static int hypervDomainIsActive(virDomainPtr domain) { int result = -1; @@ -1276,6 +1293,7 @@ static virDriver hypervDriver = { .domainManagedSave = hypervDomainManagedSave, /* 0.9.5 */ .domainHasManagedSaveImage = hypervDomainHasManagedSaveImage, /* 0.9.5 */ .domainManagedSaveRemove = hypervDomainManagedSaveRemove, /* 0.9.5 */ + .isAlive = hypervIsAlive, /* 0.9.7 */ }; diff --git a/src/libxl/libxl_driver.c b/src/libxl/libxl_driver.c index d324632..42503d6 100644 --- a/src/libxl/libxl_driver.c +++ b/src/libxl/libxl_driver.c @@ -3875,6 +3875,13 @@ libxlDomainEventDeregisterAny(virConnectPtr conn, int callbackID) } +static int +libxlIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static virDriver libxlDriver = { .no = VIR_DRV_LIBXL, .name = "xenlight", @@ -3948,6 +3955,7 @@ static virDriver libxlDriver = { .domainIsUpdated = libxlDomainIsUpdated, /* 0.9.0 */ .domainEventRegisterAny = libxlDomainEventRegisterAny, /* 0.9.0 */ .domainEventDeregisterAny = libxlDomainEventDeregisterAny, /* 0.9.0 */ + .isAlive = libxlIsAlive, /* 0.9.7 */ }; static virStateDriver libxlStateDriver = { diff --git a/src/lxc/lxc_driver.c b/src/lxc/lxc_driver.c index 06bfa85..701bd8d 100644 --- a/src/lxc/lxc_driver.c +++ b/src/lxc/lxc_driver.c @@ -200,6 +200,12 @@ static int lxcIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) } +static int lxcIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static char *lxcGetCapabilities(virConnectPtr conn) { lxc_driver_t *driver = conn->privateData; char *xml; @@ -3139,6 +3145,7 @@ static virDriver lxcDriver = { .domainEventRegisterAny = lxcDomainEventRegisterAny, /* 0.8.0 */ .domainEventDeregisterAny = lxcDomainEventDeregisterAny, /* 0.8.0 */ .domainOpenConsole = lxcDomainOpenConsole, /* 0.8.6 */ + .isAlive = lxcIsAlive, /* 0.9.7 */ }; static virStateDriver lxcStateDriver = { diff --git a/src/openvz/openvz_driver.c b/src/openvz/openvz_driver.c index 69ff444..8a4e6cf 100644 --- a/src/openvz/openvz_driver.c +++ b/src/openvz/openvz_driver.c @@ -1426,6 +1426,12 @@ static int openvzIsSecure(virConnectPtr conn ATTRIBUTE_UNUSED) { return 1; } +static int +openvzIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static char *openvzGetCapabilities(virConnectPtr conn) { struct openvz_driver *driver = conn->privateData; char *ret; @@ -1714,6 +1720,7 @@ static virDriver openvzDriver = { .domainIsActive = openvzDomainIsActive, /* 0.7.3 */ .domainIsPersistent = openvzDomainIsPersistent, /* 0.7.3 */ .domainIsUpdated = openvzDomainIsUpdated, /* 0.8.6 */ + .isAlive = openvzIsAlive, /* 0.9.7 */ }; int openvzRegister(void) { diff --git a/src/phyp/phyp_driver.c b/src/phyp/phyp_driver.c index ff16aae..458c9c9 100644 --- a/src/phyp/phyp_driver.c +++ b/src/phyp/phyp_driver.c @@ -1284,6 +1284,23 @@ phypIsSecure(virConnectPtr conn ATTRIBUTE_UNUSED) return 1; } + +static int +phypIsAlive(virConnectPtr conn) +{ + ConnectionData *connection_data = conn->networkPrivateData; + + /* XXX we should be able to do something better but this is simple, safe, + * and good enough for now. In worst case, the function will return true + * even though the connection is not alive. 
+ */ + if (connection_data && connection_data->session) + return 1; + else + return 0; +} + + static int phypIsUpdated(virDomainPtr conn ATTRIBUTE_UNUSED) { @@ -3786,6 +3803,7 @@ static virDriver phypDriver = { .isEncrypted = phypIsEncrypted, /* 0.7.3 */ .isSecure = phypIsSecure, /* 0.7.3 */ .domainIsUpdated = phypIsUpdated, /* 0.8.6 */ + .isAlive = phypIsAlive, /* 0.9.7 */ }; static virStorageDriver phypStorageDriver = { diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index e053a97..e375fd2 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -942,6 +942,11 @@ static int qemuIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) return 0; } +static int qemuIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int kvmGetMaxVCPUs(void) { int maxvcpus = 1; @@ -10802,6 +10807,7 @@ static virDriver qemuDriver = { .domainGetBlockJobInfo = qemuDomainGetBlockJobInfo, /* 0.9.4 */ .domainBlockJobSetSpeed = qemuDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = qemuDomainBlockPull, /* 0.9.4 */ + .isAlive = qemuIsAlive, /* 0.9.7 */ }; diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index f99c32d..09139aa 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -4171,6 +4171,23 @@ cleanup: } +static int +remoteIsAlive(virConnectPtr conn) +{ + struct private_data *priv = conn->privateData; + bool ret; + + remoteDriverLock(priv); + ret = virNetClientIsOpen(priv->client); + remoteDriverUnlock(priv); + + if (ret) + return 1; + else + return 0; +} + + #include "remote_client_bodies.h" #include "qemu_client_bodies.h" @@ -4524,6 +4541,7 @@ static virDriver remote_driver = { .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ .setKeepAlive = remoteSetKeepAlive, /* 0.9.7 */ + .isAlive = remoteIsAlive, /* 0.9.7 */ }; static virNetworkDriver network_driver = { diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 37e9cc8..d546253 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -570,6 +570,20 @@ bool virNetClientIsEncrypted(virNetClientPtr client) } +bool virNetClientIsOpen(virNetClientPtr client) +{ + bool ret; + + if (!client) + return false; + + virNetClientLock(client); + ret = client->sock && !client->wantClose; + virNetClientUnlock(client); + return ret; +} + + int virNetClientAddProgram(virNetClientPtr client, virNetClientProgramPtr prog) { diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 3227a4e..7d7a8c0 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -78,6 +78,7 @@ int virNetClientSetTLSSession(virNetClientPtr client, virNetTLSContextPtr tls); bool virNetClientIsEncrypted(virNetClientPtr client); +bool virNetClientIsOpen(virNetClientPtr client); const char *virNetClientLocalAddrString(virNetClientPtr client); const char *virNetClientRemoteAddrString(virNetClientPtr client); diff --git a/src/test/test_driver.c b/src/test/test_driver.c index 326409d..bc76640 100644 --- a/src/test/test_driver.c +++ b/src/test/test_driver.c @@ -1184,6 +1184,11 @@ static int testIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) return 0; } +static int testIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int testGetMaxVCPUs(virConnectPtr conn ATTRIBUTE_UNUSED, const char *type ATTRIBUTE_UNUSED) { @@ -5623,6 +5628,7 @@ static virDriver testDriver = { .domainIsUpdated = testDomainIsUpdated, /* 0.8.6 */ .domainEventRegisterAny = testDomainEventRegisterAny, /* 0.8.0 */ 
.domainEventDeregisterAny = testDomainEventDeregisterAny, /* 0.8.0 */ + .isAlive = testIsAlive, /* 0.9.7 */ }; static virNetworkDriver testNetworkDriver = { diff --git a/src/uml/uml_driver.c b/src/uml/uml_driver.c index 16ab73a..c2880fe 100644 --- a/src/uml/uml_driver.c +++ b/src/uml/uml_driver.c @@ -1218,6 +1218,12 @@ static int umlIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) } +static int umlIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + + static char *umlGetCapabilities(virConnectPtr conn) { struct uml_driver *driver = (struct uml_driver *)conn->privateData; char *xml; @@ -2584,6 +2590,7 @@ static virDriver umlDriver = { .domainEventRegisterAny = umlDomainEventRegisterAny, /* 0.9.4 */ .domainEventDeregisterAny = umlDomainEventDeregisterAny, /* 0.9.4 */ .domainOpenConsole = umlDomainOpenConsole, /* 0.8.6 */ + .isAlive = umlIsAlive, /* 0.9.7 */ }; static int diff --git a/src/vbox/vbox_tmpl.c b/src/vbox/vbox_tmpl.c index bc19b63..9f54c18 100644 --- a/src/vbox/vbox_tmpl.c +++ b/src/vbox/vbox_tmpl.c @@ -1070,6 +1070,11 @@ static int vboxIsEncrypted(virConnectPtr conn ATTRIBUTE_UNUSED) { return 0; } +static int vboxIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static int vboxGetMaxVcpus(virConnectPtr conn, const char *type ATTRIBUTE_UNUSED) { VBOX_OBJECT_CHECK(conn, int, -1); PRUint32 maxCPUCount = 0; @@ -8976,6 +8981,7 @@ virDriver NAME(Driver) = { .domainSnapshotCurrent = vboxDomainSnapshotCurrent, /* 0.8.0 */ .domainRevertToSnapshot = vboxDomainRevertToSnapshot, /* 0.8.0 */ .domainSnapshotDelete = vboxDomainSnapshotDelete, /* 0.8.0 */ + .isAlive = vboxIsAlive, /* 0.9.7 */ }; virNetworkDriver NAME(NetworkDriver) = { diff --git a/src/vmware/vmware_driver.c b/src/vmware/vmware_driver.c index b2cfdce..987a7a8 100644 --- a/src/vmware/vmware_driver.c +++ b/src/vmware/vmware_driver.c @@ -958,6 +958,12 @@ vmwareDomainGetState(virDomainPtr dom, return ret; } +static int +vmwareIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + return 1; +} + static virDriver vmwareDriver = { .no = VIR_DRV_VMWARE, .name = "VMWARE", @@ -990,6 +996,7 @@ static virDriver vmwareDriver = { .domainUndefineFlags = vmwareDomainUndefineFlags, /* 0.9.4 */ .domainIsActive = vmwareDomainIsActive, /* 0.8.7 */ .domainIsPersistent = vmwareDomainIsPersistent, /* 0.8.7 */ + .isAlive = vmwareIsAlive, /* 0.9.7 */ }; int diff --git a/src/xen/xen_driver.c b/src/xen/xen_driver.c index b3e7782..6f8b981 100644 --- a/src/xen/xen_driver.c +++ b/src/xen/xen_driver.c @@ -506,6 +506,13 @@ xenUnifiedIsSecure(virConnectPtr conn) return ret; } +static int +xenUnifiedIsAlive(virConnectPtr conn ATTRIBUTE_UNUSED) +{ + /* XenD reconnects for each request */ + return 1; +} + int xenUnifiedGetMaxVcpus (virConnectPtr conn, const char *type) { @@ -2249,6 +2256,7 @@ static virDriver xenUnifiedDriver = { .domainEventRegisterAny = xenUnifiedDomainEventRegisterAny, /* 0.8.0 */ .domainEventDeregisterAny = xenUnifiedDomainEventDeregisterAny, /* 0.8.0 */ .domainOpenConsole = xenUnifiedDomainOpenConsole, /* 0.8.6 */ + .isAlive = xenUnifiedIsAlive, /* 0.9.7 */ }; /** diff --git a/src/xenapi/xenapi_driver.c b/src/xenapi/xenapi_driver.c index 3946455..b2888d2 100644 --- a/src/xenapi/xenapi_driver.c +++ b/src/xenapi/xenapi_driver.c @@ -1888,6 +1888,17 @@ xenapiNodeGetCellsFreeMemory (virConnectPtr conn, unsigned long long *freeMems, } } +static int +xenapiIsAlive(virConnectPtr conn) +{ + struct _xenapiPrivate *priv = conn->privateData; + + if (priv->session && priv->session->ok) + return 1; + else + return 0; +} + /* The 
interface which we export upwards to libvirt.c. */ static virDriver xenapiDriver = { .no = VIR_DRV_XENAPI, @@ -1938,6 +1949,7 @@ static virDriver xenapiDriver = { .nodeGetCellsFreeMemory = xenapiNodeGetCellsFreeMemory, /* 0.8.0 */ .nodeGetFreeMemory = xenapiNodeGetFreeMemory, /* 0.8.0 */ .domainIsUpdated = xenapiDomainIsUpdated, /* 0.8.6 */ + .isAlive = xenapiIsAlive, /* 0.9.7 */ }; /** -- 1.7.7.1

On Thu, Oct 27, 2011 at 06:05:46PM +0200, Jiri Denemark wrote:
--- Notes: ACKed except for esx driver
Version 4: - fix esx implementation
Version 3: - no changes
Version 2: - new patch
src/esx/esx_driver.c | 18 ++++++++++++++++++ src/hyperv/hyperv_driver.c | 18 ++++++++++++++++++ src/libxl/libxl_driver.c | 8 ++++++++ src/lxc/lxc_driver.c | 7 +++++++ src/openvz/openvz_driver.c | 7 +++++++ src/phyp/phyp_driver.c | 18 ++++++++++++++++++ src/qemu/qemu_driver.c | 6 ++++++ src/remote/remote_driver.c | 18 ++++++++++++++++++ src/rpc/virnetclient.c | 14 ++++++++++++++ src/rpc/virnetclient.h | 1 + src/test/test_driver.c | 6 ++++++ src/uml/uml_driver.c | 7 +++++++ src/vbox/vbox_tmpl.c | 6 ++++++ src/vmware/vmware_driver.c | 7 +++++++ src/xen/xen_driver.c | 8 ++++++++ src/xenapi/xenapi_driver.c | 12 ++++++++++++ 16 files changed, 161 insertions(+), 0 deletions(-)
ACK, if changing 0.9.7 to 0.9.8 throughout

Daniel

--- Notes: ACKed Version 4: - no changes Version 3: - update to client API changes (virConnectAllowKeepAlive dropped and virConnectStartKeepAlive renamed as virConnectSetKeepAlive) Version 2: - automatically exit when a connection is closed because of keepalive timeout examples/domain-events/events-c/event-test.c | 9 ++++++++- examples/domain-events/events-python/event-test.py | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/examples/domain-events/events-c/event-test.c b/examples/domain-events/events-c/event-test.c index 7c99222..af1f47d 100644 --- a/examples/domain-events/events-c/event-test.c +++ b/examples/domain-events/events-c/event-test.c @@ -416,7 +416,14 @@ int main(int argc, char **argv) (callback6ret != -1) && (callback7ret != -1) && (callback9ret != -1)) { - while (run) { + if (virConnectSetKeepAlive(dconn, 5, 3) < 0) { + virErrorPtr err = virGetLastError(); + fprintf(stderr, "Failed to start keepalive protocol: %s\n", + err && err->message ? err->message : "Unknown error"); + run = 0; + } + + while (run && virConnectIsAlive(dconn) == 1) { if (virEventRunDefaultImpl() < 0) { virErrorPtr err = virGetLastError(); fprintf(stderr, "Failed to run event loop: %s\n", diff --git a/examples/domain-events/events-python/event-test.py b/examples/domain-events/events-python/event-test.py index 9628f6e..60a9c34 100644 --- a/examples/domain-events/events-python/event-test.py +++ b/examples/domain-events/events-python/event-test.py @@ -531,11 +531,13 @@ def main(): vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, myDomainEventGraphicsCallback, None) vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE, myDomainEventDiskChangeCallback, None) + vc.setKeepAlive(5, 3) + # The rest of your app would go here normally, but for sake # of demo we'll just go to sleep. The other option is to # run the event loop in your main thread if your app is # totally event based. - while 1: + while vc.isAlive() == 1: time.sleep(1) -- 1.7.7.1
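Distilled from the two examples, a self-contained C sketch of the client-side pattern; the qemu:///system URI and the (5, 3) parameters are arbitrary illustration values, and an event loop must be running for keepalive messages to be sent and answered, which is why the event-test programs run one:

#include <stdio.h>
#include <libvirt/libvirt.h>

int main(void)
{
    /* keepalive PINGs are driven from the event loop, so register
     * one before opening the connection */
    if (virEventRegisterDefaultImpl() < 0)
        return 1;

    virConnectPtr conn = virConnectOpen("qemu:///system");
    if (!conn)
        return 1;

    /* PING after 5 idle seconds, tolerate 3 unanswered PINGs: a dead
     * peer closes the connection after roughly 5 * (3 + 1) = 20
     * seconds of silence */
    if (virConnectSetKeepAlive(conn, 5, 3) < 0)
        fprintf(stderr, "failed to enable keepalive\n");

    /* the loop ends once keepalive declares the connection dead */
    while (virConnectIsAlive(conn) == 1) {
        if (virEventRunDefaultImpl() < 0)
            break;
    }

    virConnectClose(conn);
    return 0;
}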

On Thu, Oct 27, 2011 at 06:05:47PM +0200, Jiri Denemark wrote:
--- Notes: ACKed
Version 4: - no changes
Version 3: - update to client API changes (virConnectAllowKeepAlive dropped and virConnectStartKeepAlive renamed as virConnectSetKeepAlive)
Version 2: - automatically exit when a connection is closed because of keepalive timeout
examples/domain-events/events-c/event-test.c | 9 ++++++++- examples/domain-events/events-python/event-test.py | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-)
ACK

Daniel

--- Notes: ACKed Version 4: - document the semantics of keepalive_count = 0 Version 3: - explain what keepalive_interval = -1 means - update to client API changes (virConnectAllowKeepAlive dropped and virConnectStartKeepAlive renamed as virConnectSetKeepAlive) Version 2: - no changes src/qemu/libvirtd_qemu.aug | 2 ++ src/qemu/qemu.conf | 22 ++++++++++++++++++++++ src/qemu/qemu_conf.c | 11 +++++++++++ src/qemu/qemu_conf.h | 3 +++ src/qemu/qemu_migration.c | 4 ++++ src/qemu/test_libvirtd_qemu.aug | 6 ++++++ 6 files changed, 48 insertions(+), 0 deletions(-) diff --git a/src/qemu/libvirtd_qemu.aug b/src/qemu/libvirtd_qemu.aug index 6c145c7..ad34e42 100644 --- a/src/qemu/libvirtd_qemu.aug +++ b/src/qemu/libvirtd_qemu.aug @@ -52,6 +52,8 @@ module Libvirtd_qemu = | int_entry "max_processes" | str_entry "lock_manager" | int_entry "max_queued" + | int_entry "keepalive_interval" + | int_entry "keepalive_count" (* Each enty in the config is one of the following three ... *) let entry = vnc_entry diff --git a/src/qemu/qemu.conf b/src/qemu/qemu.conf index 4da5d5a..87ce15c 100644 --- a/src/qemu/qemu.conf +++ b/src/qemu/qemu.conf @@ -316,3 +316,25 @@ # Note, that job lock is per domain. # # max_queued = 0 + +################################################################### +# Keepalive protocol: +# This allows the qemu driver to detect broken connections to remote +# libvirtd during peer-to-peer migration. A keepalive message is +# sent to the daemon after keepalive_interval seconds of inactivity +# to check if the daemon is still responding; keepalive_count is the +# maximum number of keepalive messages that are allowed to be sent +# to the daemon without getting any response before the connection +# is considered broken. In other words, the connection is +# automatically closed approximately after +# keepalive_interval * (keepalive_count + 1) seconds since the last +# message received from the daemon. If keepalive_interval is set to +# -1, the qemu driver will not send keepalive requests during +# peer-to-peer migration; however, the remote libvirtd can still +# send them and the source libvirtd will send responses. When +# keepalive_count is set to 0, connections will be automatically +# closed after keepalive_interval seconds of inactivity without +# sending any keepalive messages. +# +#keepalive_interval = 5 +#keepalive_count = 5 diff --git a/src/qemu/qemu_conf.c b/src/qemu/qemu_conf.c index d1bf075..19f24b1 100644 --- a/src/qemu/qemu_conf.c +++ b/src/qemu/qemu_conf.c @@ -118,6 +118,9 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, virLockManagerPluginNew("nop", NULL, 0))) return -1; + driver->keepAliveInterval = 5; + driver->keepAliveCount = 5; + /* Just check the file is readable before opening it, otherwise * libvirt emits an error.
*/ @@ -462,6 +465,14 @@ int qemudLoadDriverConfig(struct qemud_driver *driver, CHECK_TYPE("max_queued", VIR_CONF_LONG); if (p) driver->max_queued = p->l; + p = virConfGetValue(conf, "keepalive_interval"); + CHECK_TYPE("keepalive_interval", VIR_CONF_LONG); + if (p) driver->keepAliveInterval = p->l; + + p = virConfGetValue(conf, "keepalive_count"); + CHECK_TYPE("keepalive_count", VIR_CONF_LONG); + if (p) driver->keepAliveCount = p->l; + virConfFree (conf); return 0; } diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h index ff5cf23..48bc67a 100644 --- a/src/qemu/qemu_conf.h +++ b/src/qemu/qemu_conf.h @@ -138,6 +138,9 @@ struct qemud_driver { * of guests which will be automatically killed * when the virConnectPtr is closed*/ virHashTablePtr autodestroy; + + int keepAliveInterval; + unsigned int keepAliveCount; }; typedef struct _qemuDomainCmdlineDef qemuDomainCmdlineDef; diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index decb0f2..1cebefd 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -2222,6 +2222,10 @@ static int doPeer2PeerMigrate(struct qemud_driver *driver, return -1; } + if (virConnectSetKeepAlive(dconn, driver->keepAliveInterval, + driver->keepAliveCount) < 0) + goto cleanup; + qemuDomainObjEnterRemoteWithDriver(driver, vm); p2p = VIR_DRV_SUPPORTS_FEATURE(dconn->driver, dconn, VIR_DRV_FEATURE_MIGRATION_P2P); diff --git a/src/qemu/test_libvirtd_qemu.aug b/src/qemu/test_libvirtd_qemu.aug index b1f9114..f7476ae 100644 --- a/src/qemu/test_libvirtd_qemu.aug +++ b/src/qemu/test_libvirtd_qemu.aug @@ -115,6 +115,9 @@ vnc_auto_unix_socket = 1 max_processes = 12345 lock_manager = \"fcntl\" + +keepalive_interval = 1 +keepalive_count = 42 " test Libvirtd_qemu.lns get conf = @@ -240,3 +243,6 @@ lock_manager = \"fcntl\" { "max_processes" = "12345" } { "#empty" } { "lock_manager" = "fcntl" } +{ "#empty" } +{ "keepalive_interval" = "1" } +{ "keepalive_count" = "42" } -- 1.7.7.1
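Plugging the shipped defaults into the formula above, a silent peer is declared dead after about 5 * (5 + 1) = 30 seconds. A hypothetical tighter tuning in qemu.conf (values chosen purely for illustration) would look like:

# detect a dead destination after about 2 * (3 + 1) = 8 seconds
keepalive_interval = 2
keepalive_count = 3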

On Thu, Oct 27, 2011 at 06:05:48PM +0200, Jiri Denemark wrote:
--- Notes: ACKed
Version 4: - document the semantics of keepalive_count = 0
Version 3: - explain what keepalive_interval = -1 means - update to client API changes (virConnectAllowKeepAlive dropped and virConnectStartKeepAlive renamed as virConnectSetKeepAlive)
Version 2: - no changes
src/qemu/libvirtd_qemu.aug | 2 ++ src/qemu/qemu.conf | 22 ++++++++++++++++++++++ src/qemu/qemu_conf.c | 11 +++++++++++ src/qemu/qemu_conf.h | 3 +++ src/qemu/qemu_migration.c | 4 ++++ src/qemu/test_libvirtd_qemu.aug | 6 ++++++ 6 files changed, 48 insertions(+), 0 deletions(-)
ACK

Daniel

If a connection to destination host is lost during peer-to-peer migration (because keepalive protocol timed out), we won't be able to finish the migration and it doesn't make sense to wait for qemu to transmit all data. This patch automatically cancels such migration without waiting for virDomainAbortJob to be called. --- Notes: ACKed Version 4: - no changes Version 3: - no changes Version 2: - new patch src/qemu/qemu_migration.c | 39 +++++++++++++++++++++++++-------------- 1 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 1cebefd..8463263 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -915,7 +915,8 @@ qemuMigrationUpdateJobStatus(struct qemud_driver *driver, static int qemuMigrationWaitForCompletion(struct qemud_driver *driver, virDomainObjPtr vm, - enum qemuDomainAsyncJob asyncJob) + enum qemuDomainAsyncJob asyncJob, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; const char *job; @@ -943,6 +944,12 @@ qemuMigrationWaitForCompletion(struct qemud_driver *driver, virDomainObjPtr vm, if (qemuMigrationUpdateJobStatus(driver, vm, job, asyncJob) < 0) goto cleanup; + if (dconn && virConnectIsAlive(dconn) <= 0) { + qemuReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("Lost connection to destination host")); + goto cleanup; + } + virDomainObjUnlock(vm); qemuDriverUnlock(driver); @@ -1521,7 +1528,8 @@ qemuMigrationRun(struct qemud_driver *driver, int *cookieoutlen, unsigned long flags, unsigned long resource, - qemuMigrationSpecPtr spec) + qemuMigrationSpecPtr spec, + virConnectPtr dconn) { int ret = -1; unsigned int migrate_flags = QEMU_MONITOR_MIGRATE_BACKGROUND; @@ -1640,7 +1648,8 @@ qemuMigrationRun(struct qemud_driver *driver, goto cancel; if (qemuMigrationWaitForCompletion(driver, vm, - QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) + QEMU_ASYNC_JOB_MIGRATION_OUT, + dconn) < 0) goto cleanup; /* When migration completed, QEMU will have paused the @@ -1697,7 +1706,8 @@ static int doNativeMigrate(struct qemud_driver *driver, char **cookieout, int *cookieoutlen, unsigned long flags, - unsigned long resource) + unsigned long resource, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; xmlURIPtr uribits = NULL; @@ -1755,7 +1765,7 @@ static int doNativeMigrate(struct qemud_driver *driver, } ret = qemuMigrationRun(driver, vm, cookiein, cookieinlen, cookieout, - cookieoutlen, flags, resource, &spec); + cookieoutlen, flags, resource, &spec, dconn); cleanup: if (spec.destType == MIGRATION_DEST_FD) @@ -1776,7 +1786,8 @@ static int doTunnelMigrate(struct qemud_driver *driver, char **cookieout, int *cookieoutlen, unsigned long flags, - unsigned long resource) + unsigned long resource, + virConnectPtr dconn) { qemuDomainObjPrivatePtr priv = vm->privateData; virNetSocketPtr sock = NULL; @@ -1839,7 +1850,7 @@ static int doTunnelMigrate(struct qemud_driver *driver, } ret = qemuMigrationRun(driver, vm, cookiein, cookieinlen, cookieout, - cookieoutlen, flags, resource, &spec); + cookieoutlen, flags, resource, &spec, dconn); cleanup: if (spec.destType == MIGRATION_DEST_FD) { @@ -1942,12 +1953,12 @@ static int doPeer2PeerMigrate2(struct qemud_driver *driver, if (flags & VIR_MIGRATE_TUNNELLED) ret = doTunnelMigrate(driver, vm, st, NULL, 0, NULL, NULL, - flags, resource); + flags, resource, dconn); else ret = doNativeMigrate(driver, vm, uri_out, cookie, cookielen, NULL, NULL, /* No out cookie with v2 migration */ - flags, resource); + flags, resource, dconn); /* Perform failed. 
Make sure Finish doesn't overwrite the error */ if (ret < 0) @@ -2088,12 +2099,12 @@ static int doPeer2PeerMigrate3(struct qemud_driver *driver, ret = doTunnelMigrate(driver, vm, st, cookiein, cookieinlen, &cookieout, &cookieoutlen, - flags, resource); + flags, resource, dconn); else ret = doNativeMigrate(driver, vm, uri_out, cookiein, cookieinlen, &cookieout, &cookieoutlen, - flags, resource); + flags, resource, dconn); /* Perform failed. Make sure Finish doesn't overwrite the error */ if (ret < 0) { @@ -2325,7 +2336,7 @@ qemuMigrationPerformJob(struct qemud_driver *driver, qemuMigrationJobSetPhase(driver, vm, QEMU_MIGRATION_PHASE_PERFORM2); ret = doNativeMigrate(driver, vm, uri, cookiein, cookieinlen, cookieout, cookieoutlen, - flags, resource); + flags, resource, NULL); } if (ret < 0) goto endjob; @@ -2414,7 +2425,7 @@ qemuMigrationPerformPhase(struct qemud_driver *driver, resume = virDomainObjGetState(vm, NULL) == VIR_DOMAIN_RUNNING; ret = doNativeMigrate(driver, vm, uri, cookiein, cookieinlen, cookieout, cookieoutlen, - flags, resource); + flags, resource, NULL); if (ret < 0 && resume && virDomainObjGetState(vm, NULL) == VIR_DOMAIN_PAUSED) { @@ -2939,7 +2950,7 @@ qemuMigrationToFile(struct qemud_driver *driver, virDomainObjPtr vm, if (rc < 0) goto cleanup; - rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob); + rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob, NULL); if (rc < 0) goto cleanup; -- 1.7.7.1
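The heart of the change is the virConnectIsAlive(dconn) <= 0 test inside the wait loop. A rough client-side analogue of that pattern, using only public API; the helper name and 50ms poll interval are illustrative, while the real check sits in qemuMigrationWaitForCompletion with the driver's internal job tracking:

#include <stdio.h>
#include <unistd.h>
#include <libvirt/libvirt.h>

/* Poll a running job and bail out early once the connection to the
 * destination daemon (dconn) goes away, e.g. closed by keepalive. */
static int wait_for_job(virDomainPtr dom, virConnectPtr dconn)
{
    virDomainJobInfo info;

    while (virDomainGetJobInfo(dom, &info) == 0 &&
           info.type == VIR_DOMAIN_JOB_UNBOUNDED) {
        /* <= 0 covers both "dead" (0) and "check failed" (-1) */
        if (dconn && virConnectIsAlive(dconn) <= 0) {
            fprintf(stderr, "Lost connection to destination host\n");
            return -1;
        }
        usleep(50 * 1000);
    }
    return 0;
}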

On Thu, Oct 27, 2011 at 06:05:49PM +0200, Jiri Denemark wrote:
If a connection to destination host is lost during peer-to-peer migration (because keepalive protocol timed out), we won't be able to finish the migration and it doesn't make sense to wait for qemu to transmit all data. This patch automatically cancels such migration without waiting for virDomainAbortJob to be called. --- Notes: ACKed
Version 4: - no changes
Version 3: - no changes
Version 2: - new patch
src/qemu/qemu_migration.c | 39 +++++++++++++++++++++++++-------------- 1 files changed, 25 insertions(+), 14 deletions(-)
ACK

Daniel