These APIs are used by both client and server RPC layer to handle
processing of keepalive messages.
---
Notes:
Version 3 was ACKed provided I cleared some things up (which I believe I did
in v4). I also tried to get the bonus points :-)
Version 4:
- s/KEEPALIVE_VERSION/KEEPALIVE_PROTOCOL_VERSION/
- always free message in virKeepAliveSend
- free ka->response in virKeepAliveStop
- systemtap probes
Version 3:
- remove ADVERTISE message handling
Version 2:
- no change
po/POTFILES.in | 1 +
src/Makefile.am | 3 +-
src/probes.d | 12 ++
src/rpc/virkeepalive.c | 448 ++++++++++++++++++++++++++++++++++++++++++++++++
src/rpc/virkeepalive.h | 56 ++++++
5 files changed, 519 insertions(+), 1 deletions(-)
create mode 100644 src/rpc/virkeepalive.c
create mode 100644 src/rpc/virkeepalive.h
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 5ce35ae..71254dd 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c
src/qemu/qemu_process.c
src/remote/remote_client_bodies.h
src/remote/remote_driver.c
+src/rpc/virkeepalive.c
src/rpc/virnetclient.c
src/rpc/virnetclientprogram.c
src/rpc/virnetclientstream.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 872ac37..33cc5f8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1382,7 +1382,8 @@ libvirt_net_rpc_la_SOURCES = \
rpc/virnetprotocol.h rpc/virnetprotocol.c \
rpc/virnetsocket.h rpc/virnetsocket.c \
rpc/virnettlscontext.h rpc/virnettlscontext.c \
- rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
+ rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \
+ rpc/virkeepalive.h rpc/virkeepalive.c
if HAVE_SASL
libvirt_net_rpc_la_SOURCES += \
rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
diff --git a/src/probes.d b/src/probes.d
index 7f66ac0..d84ebf0 100644
--- a/src/probes.d
+++ b/src/probes.d
@@ -70,6 +70,18 @@ provider libvirt {
probe rpc_tls_session_handshake_fail(void *sess);
+ # file: src/rpc/virkeepalive.c
+ # prefix: rpc
+ probe rpc_keepalive_new(void *ka, void *client, int refs);
+ probe rpc_keepalive_ref(void *ka, void *client, int refs);
+ probe rpc_keepalive_free(void *ka, void *client, int refs);
+ probe rpc_keepalive_start(void *ka, void *client, int interval, int count);
+ probe rpc_keepalive_stop(void *ka, void *client);
+ probe rpc_keepalive_send(void *ka, void *client, int prog, int vers, int proc);
+ probe rpc_keepalive_received(void *ka, void *client, int prog, int vers, int proc);
+ probe rpc_keepalive_timeout(void *ka, void *client, int countToDeath, int idle);
+
+
# file: src/qemu/qemu_monitor.c
# prefix: qemu
# binary: libvirtd
diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
new file mode 100644
index 0000000..06b8e63
--- /dev/null
+++ b/src/rpc/virkeepalive.c
@@ -0,0 +1,448 @@
+/*
+ * virkeepalive.c: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Jiri Denemark <jdenemar(a)redhat.com>
+ */
+
+#include <config.h>
+
+#include "memory.h"
+#include "threads.h"
+#include "virfile.h"
+#include "logging.h"
+#include "util.h"
+#include "virterror_internal.h"
+#include "virnetsocket.h"
+#include "virkeepaliveprotocol.h"
+#include "virkeepalive.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...) \
+ virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
+ __FUNCTION__, __LINE__, __VA_ARGS__)
+/*
+ * Keepalive state attached to one client/connection object.  The object
+ * is reference counted and its state is manipulated under 'lock' by the
+ * functions in this file.
+ */
+struct _virKeepAlive {
+ int refs; /* reference count; each registered timer holds one */
+ virMutex lock; /* taken via virKeepAliveLock()/virKeepAliveUnlock() */
+
+ int interval; /* idle seconds before a request is sent; <= 0 disables */
+ unsigned int count; /* value countToDeath is reset to on any received message */
+ unsigned int countToDeath; /* decremented per request sent; deadCB fires when the timer finds it at 0 */
+ time_t lastPacketReceived; /* time(NULL) of the last virKeepAliveCheckMessage() call */
+ int timer; /* event timer ID for requests, -1 when not registered */
+
+ virNetMessagePtr response; /* queued keepalive response, or NULL */
+ int responseTimer; /* event timer ID used to send 'response', -1 once stopped */
+
+ virKeepAliveSendFunc sendCB; /* sends a message; negative return means failure */
+ virKeepAliveDeadFunc deadCB; /* called when the peer is considered dead */
+ virKeepAliveFreeFunc freeCB; /* called with 'client' when the last reference is dropped */
+ void *client; /* opaque pointer handed to the callbacks */
+};
+
+/* Acquire the mutex embedded in @ka; paired with virKeepAliveUnlock(). */
+static void
+virKeepAliveLock(virKeepAlivePtr ka)
+{
+ virMutexLock(&ka->lock);
+}
+
+static void
+virKeepAliveUnlock(virKeepAlivePtr ka) /* release the mutex taken by virKeepAliveLock() */
+{
+ virMutexUnlock(&ka->lock);
+}
+
+/*
+ * virKeepAliveMessage:
+ * @proc: procedure number to store in the message header
+ *
+ * Builds a keepalive protocol message (KEEPALIVE_PROGRAM,
+ * KEEPALIVE_PROTOCOL_VERSION, type VIR_NET_MESSAGE) with an encoded
+ * header and an empty payload.
+ *
+ * Returns the new message, or NULL if allocation or encoding failed.
+ */
+static virNetMessagePtr
+virKeepAliveMessage(int proc)
+{
+ virNetMessagePtr msg;
+
+ if (!(msg = virNetMessageNew(false)))
+ return NULL;
+
+ msg->header.prog = KEEPALIVE_PROGRAM;
+ msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
+ msg->header.type = VIR_NET_MESSAGE;
+ msg->header.proc = proc;
+ /* on encoding failure the half-built message is released here */
+ if (virNetMessageEncodeHeader(msg) < 0 ||
+ virNetMessageEncodePayloadEmpty(msg) < 0) {
+ virNetMessageFree(msg);
+ return NULL;
+ }
+
+ return msg;
+}
+
+
+/**
+ * virKeepAliveSend:
+ * @ka: keepalive object; the caller must hold its lock
+ * @msg: encoded keepalive message to send
+ *
+ * Passes @msg to the send callback.  Only the PING and PONG procedures
+ * are accepted; anything else is logged and dropped.  The lock is
+ * released while the callback runs (a temporary reference keeps @ka
+ * alive) and reacquired before returning, so callers must not assume
+ * the keepalive state is unchanged afterwards.
+ *
+ * @msg is freed here when it is rejected or when the callback returns a
+ * negative value; otherwise it is left to the callback (presumably the
+ * callback takes ownership on success -- confirm against implementations).
+ */
+static void
+virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
+{
+    const char *proc = NULL;
+    void *client = ka->client;
+    virKeepAliveSendFunc sendCB = ka->sendCB;
+
+    switch (msg->header.proc) {
+    case KEEPALIVE_PROC_PING:
+        proc = "request";
+        break;
+    case KEEPALIVE_PROC_PONG:
+        proc = "response";
+        break;
+    }
+
+    if (!proc) {
+        VIR_WARN("Refusing to send unknown keepalive message: %d",
+                 msg->header.proc);
+        virNetMessageFree(msg);
+        return;
+    }
+
+    VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+    PROBE(RPC_KEEPALIVE_SEND,
+          "ka=%p client=%p prog=%d vers=%d proc=%d",
+          ka, ka->client, msg->header.prog, msg->header.vers,
+          msg->header.proc);
+
+    /* Keep ka alive while the lock is dropped around the callback. */
+    ka->refs++;
+    virKeepAliveUnlock(ka);
+
+    if (sendCB(client, msg) < 0) {
+        VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
+        virNetMessageFree(msg);
+    }
+
+    virKeepAliveLock(ka);
+    ka->refs--;
+}
+
+/*
+ * virKeepAliveScheduleResponse:
+ * @ka: keepalive object; called with its lock held from
+ *      virKeepAliveCheckMessage()
+ *
+ * Queues a keepalive response (PONG) and arms the response timer so the
+ * message is sent from the timer callback.  Nothing is done when the
+ * response timer has been removed (ID -1).  A response that is already
+ * pending is reused, so at most one is queued at a time.
+ */
+static void
+virKeepAliveScheduleResponse(virKeepAlivePtr ka)
+{
+ if (ka->responseTimer == -1)
+ return;
+
+ VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
+
+ if (!ka->response &&
+ !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
+ VIR_WARN("Failed to generate keepalive response");
+ return;
+ }
+ /* timeout 0: presumably fires on the next event loop iteration */
+ virEventUpdateTimeout(ka->responseTimer, 0);
+}
+
+
+/**
+ * virKeepAliveTimer:
+ * @timer: timer ID (unused)
+ * @opaque: the keepalive object the timer was registered for
+ *
+ * Event loop callback of the keepalive request timer.
+ *
+ * If a message was received more recently than (interval - 1) seconds
+ * ago, the timer is simply pushed back to the end of the current idle
+ * interval.  Otherwise, when countToDeath has reached zero the dead
+ * callback is invoked (with the lock dropped and a temporary reference
+ * held); if it has not, countToDeath is decremented, a keepalive
+ * request is sent and the timer is re-armed for another interval.
+ */
+static void
+virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    time_t now = time(NULL);
+
+    virKeepAliveLock(ka);
+
+    /* countToDeath and count are unsigned, hence %u */
+    PROBE(RPC_KEEPALIVE_TIMEOUT,
+          "ka=%p client=%p countToDeath=%u idle=%d",
+          ka, ka->client, ka->countToDeath,
+          (int) (now - ka->lastPacketReceived));
+
+    if (now - ka->lastPacketReceived < ka->interval - 1) {
+        int timeout = ka->interval - (now - ka->lastPacketReceived);
+        virEventUpdateTimeout(ka->timer, timeout * 1000);
+        goto cleanup;
+    }
+
+    if (ka->countToDeath == 0) {
+        virKeepAliveDeadFunc deadCB = ka->deadCB;
+        void *client = ka->client;
+
+        VIR_WARN("No response from client %p after %u keepalive messages in"
+                 " %d seconds",
+                 ka->client,
+                 ka->count,
+                 (int) (now - ka->lastPacketReceived));
+        /* Keep ka alive while the lock is dropped around the callback. */
+        ka->refs++;
+        virKeepAliveUnlock(ka);
+        deadCB(client);
+        virKeepAliveLock(ka);
+        ka->refs--;
+    } else {
+        virNetMessagePtr msg;
+
+        ka->countToDeath--;
+        if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
+            VIR_WARN("Failed to generate keepalive request");
+        else
+            virKeepAliveSend(ka, msg);
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+    }
+
+cleanup:
+    virKeepAliveUnlock(ka);
+}
+
+/*
+ * virKeepAliveResponseTimer:
+ * @timer: timer ID (unused)
+ * @opaque: the keepalive object the timer was registered for
+ *
+ * Event loop callback of the response timer: sends the queued keepalive
+ * response, if any, and then updates the timer depending on whether a
+ * new response is pending.
+ */
+static void
+virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virKeepAlivePtr ka = opaque;
+ virNetMessagePtr msg;
+
+ virKeepAliveLock(ka);
+
+ VIR_DEBUG("ka=%p, client=%p, response=%p",
+ ka, ka->client, ka->response);
+
+ if (ka->response) {
+ msg = ka->response;
+ ka->response = NULL; /* detach first: virKeepAliveSend() drops the lock */
+ virKeepAliveSend(ka, msg);
+ }
+ /* a new response may have been queued while the lock was dropped; otherwise set -1 (presumably disables the timer) */
+ virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+
+ virKeepAliveUnlock(ka);
+}
+
+/*
+ * Free callback passed to virEventAddTimeout() for both timers; drops
+ * the reference the timer held on the keepalive object (@opaque).
+ */
+static void
+virKeepAliveTimerFree(void *opaque)
+{
+ virKeepAliveFree(opaque);
+}
+
+/*
+ * virKeepAliveNew:
+ * @interval: idle seconds before a keepalive request is sent
+ * @count: initial value of the countToDeath counter
+ * @client: opaque pointer passed to the callbacks
+ * @sendCB: callback used to send keepalive messages
+ * @deadCB: callback invoked when the peer is considered dead
+ * @freeCB: callback invoked with @client when the object is freed
+ *
+ * Allocates a keepalive object and registers its response timer.  The
+ * request timer is not registered here (ka->timer stays -1); that is
+ * done by virKeepAliveStart().  On success the object carries two
+ * references: one for the caller and one for the response timer.
+ *
+ * Returns the new object, or NULL on failure.
+ */
+virKeepAlivePtr
+virKeepAliveNew(int interval,
+ unsigned int count,
+ void *client,
+ virKeepAliveSendFunc sendCB,
+ virKeepAliveDeadFunc deadCB,
+ virKeepAliveFreeFunc freeCB)
+{
+ virKeepAlivePtr ka;
+
+ VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
+
+ if (VIR_ALLOC(ka) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ if (virMutexInit(&ka->lock) < 0) { /* NOTE(review): no error is reported on this path */
+ VIR_FREE(ka);
+ return NULL;
+ }
+
+ ka->refs = 1;
+ ka->interval = interval;
+ ka->count = count;
+ ka->countToDeath = count;
+ ka->timer = -1;
+ ka->client = client;
+ ka->sendCB = sendCB;
+ ka->deadCB = deadCB;
+ ka->freeCB = freeCB;
+ /* registered with timeout -1; re-armed with 0 by virKeepAliveScheduleResponse() */
+ ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
+ ka, virKeepAliveTimerFree);
+ if (ka->responseTimer < 0) {
+ virKeepAliveFree(ka); /* NOTE(review): this also runs freeCB(client); confirm callers expect that */
+ return NULL;
+ }
+ /* the timer now has a reference to ka */
+ ka->refs++;
+
+ PROBE(RPC_KEEPALIVE_NEW,
+ "ka=%p client=%p refs=%d",
+ ka, ka->client, ka->refs);
+
+ return ka;
+}
+
+/*
+ * virKeepAliveRef:
+ * @ka: keepalive object (must not be NULL; it is dereferenced unchecked)
+ *
+ * Takes an additional reference on @ka under its lock.  Each reference
+ * is released with virKeepAliveFree().
+ */
+void
+virKeepAliveRef(virKeepAlivePtr ka)
+{
+ virKeepAliveLock(ka);
+ ka->refs++;
+ PROBE(RPC_KEEPALIVE_REF,
+ "ka=%p client=%p refs=%d",
+ ka, ka->client, ka->refs);
+ virKeepAliveUnlock(ka);
+}
+
+
+/**
+ * virKeepAliveFree:
+ * @ka: keepalive object, may be NULL
+ *
+ * Drops one reference on @ka.  When the last reference goes away the
+ * mutex is destroyed, any still-queued response message is released,
+ * the free callback is invoked with the client pointer and the object
+ * itself is freed.  Note that the FREE probe reports the reference
+ * count as it was before this call decremented it.
+ */
+void
+virKeepAliveFree(virKeepAlivePtr ka)
+{
+    if (!ka)
+        return;
+
+    virKeepAliveLock(ka);
+    PROBE(RPC_KEEPALIVE_FREE,
+          "ka=%p client=%p refs=%d",
+          ka, ka->client, ka->refs);
+
+    if (--ka->refs > 0) {
+        virKeepAliveUnlock(ka);
+        return;
+    }
+
+    /* This was the last reference, so nobody else can reach ka any more.
+     * Release the lock before destroying it: destroying a locked mutex
+     * is undefined behaviour. */
+    virKeepAliveUnlock(ka);
+    virMutexDestroy(&ka->lock);
+
+    /* Normally already released by virKeepAliveStop(); don't leak it if
+     * the object is torn down without having been stopped. */
+    virNetMessageFree(ka->response);
+    ka->response = NULL;
+
+    ka->freeCB(ka->client);
+    VIR_FREE(ka);
+}
+
+
+/**
+ * virKeepAliveStart:
+ * @ka: keepalive object
+ * @interval: idle seconds before a request is sent, or <= 0 to use the
+ *            interval the object was created with
+ * @count: countToDeath value to use together with @interval
+ *
+ * Registers the keepalive request timer.  A positive @interval is only
+ * accepted when the object has no interval configured yet; otherwise an
+ * error is reported.  If the resulting interval is not positive,
+ * keepalive stays disabled and the call still succeeds.  Calling this
+ * while the timer is already registered is a successful no-op.
+ *
+ * The first timeout is shortened by the time already elapsed since the
+ * last received message.
+ *
+ * Returns 0 on success (including the no-op cases), -1 on error.
+ */
+int
+virKeepAliveStart(virKeepAlivePtr ka,
+                  int interval,
+                  unsigned int count)
+{
+    int ret = -1;
+    time_t delay;
+    int timeout;
+
+    virKeepAliveLock(ka);
+
+    if (ka->timer >= 0) {
+        VIR_DEBUG("Keepalive messages already enabled");
+        ret = 0;
+        goto cleanup;
+    }
+
+    if (interval > 0) {
+        if (ka->interval > 0) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("keepalive interval already set"));
+            goto cleanup;
+        }
+        ka->interval = interval;
+        ka->count = count;
+        ka->countToDeath = count;
+    }
+
+    if (ka->interval <= 0) {
+        VIR_DEBUG("Keepalive messages disabled by configuration");
+        ret = 0;
+        goto cleanup;
+    }
+
+    /* Report the values actually in effect; the arguments may be zero
+     * when the settings given at creation time are being used. */
+    PROBE(RPC_KEEPALIVE_START,
+          "ka=%p client=%p interval=%d count=%u",
+          ka, ka->client, ka->interval, ka->count);
+
+    delay = time(NULL) - ka->lastPacketReceived;
+    if (delay > ka->interval)
+        timeout = 0;
+    else
+        timeout = ka->interval - delay;
+    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
+                                   ka, virKeepAliveTimerFree);
+    if (ka->timer < 0)
+        goto cleanup;
+
+    /* the timer now has another reference to this object */
+    ka->refs++;
+    ret = 0;
+
+cleanup:
+    virKeepAliveUnlock(ka);
+    return ret;
+}
+
+
+/**
+ * virKeepAliveStop:
+ * @ka: keepalive object
+ *
+ * Removes both the request timer and the response timer (if they are
+ * registered) and releases any queued response message.  After this
+ * call virKeepAliveScheduleResponse() no longer queues responses, since
+ * the response timer ID is reset to -1.
+ */
+void
+virKeepAliveStop(virKeepAlivePtr ka)
+{
+    virKeepAliveLock(ka);
+
+    PROBE(RPC_KEEPALIVE_STOP,
+          "ka=%p client=%p",
+          ka, ka->client);
+
+    /* -1 is the "not registered" marker everywhere else in this file and
+     * registration only fails for negative IDs, so 0 is a valid timer. */
+    if (ka->timer >= 0) {
+        virEventRemoveTimeout(ka->timer);
+        ka->timer = -1;
+    }
+
+    if (ka->responseTimer >= 0) {
+        virEventRemoveTimeout(ka->responseTimer);
+        ka->responseTimer = -1;
+    }
+
+    virNetMessageFree(ka->response);
+    ka->response = NULL;
+
+    virKeepAliveUnlock(ka);
+}
+
+
+/**
+ * virKeepAliveCheckMessage:
+ * @ka: keepalive object, may be NULL
+ * @msg: message that was just received
+ *
+ * Records that traffic was seen: resets countToDeath, stores the
+ * current time as the time of the last received message and, when the
+ * request timer is registered, pushes it back by a full interval.
+ *
+ * If @msg belongs to the keepalive protocol (matching program, version
+ * and message type) it is handled here: a request schedules a response,
+ * a response or an unknown procedure is only logged.
+ *
+ * Returns true if @msg was a keepalive protocol message, false
+ * otherwise or when @ka is NULL.
+ */
+bool
+virKeepAliveCheckMessage(virKeepAlivePtr ka,
+                         virNetMessagePtr msg)
+{
+    bool ret = false;
+
+    /* Pass a real null pointer to %p rather than the address of a
+     * string literal, which would be logged as a bogus client. */
+    VIR_DEBUG("ka=%p, client=%p, msg=%p",
+              ka, ka ? ka->client : NULL, msg);
+
+    if (!ka)
+        return false;
+
+    virKeepAliveLock(ka);
+
+    ka->countToDeath = ka->count;
+    ka->lastPacketReceived = time(NULL);
+
+    if (msg->header.prog == KEEPALIVE_PROGRAM &&
+        msg->header.vers == KEEPALIVE_PROTOCOL_VERSION &&
+        msg->header.type == VIR_NET_MESSAGE) {
+        PROBE(RPC_KEEPALIVE_RECEIVED,
+              "ka=%p client=%p prog=%d vers=%d proc=%d",
+              ka, ka->client, msg->header.prog,
+              msg->header.vers, msg->header.proc);
+        ret = true;
+        switch (msg->header.proc) {
+        case KEEPALIVE_PROC_PING:
+            VIR_DEBUG("Got keepalive request from client %p", ka->client);
+            virKeepAliveScheduleResponse(ka);
+            break;
+
+        case KEEPALIVE_PROC_PONG:
+            VIR_DEBUG("Got keepalive response from client %p", ka->client);
+            break;
+
+        default:
+            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
+                      msg->header.proc, ka->client);
+            break;
+        }
+    }
+
+    if (ka->timer >= 0)
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+
+    virKeepAliveUnlock(ka);
+
+    return ret;
+}
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
new file mode 100644
index 0000000..f1654eb
--- /dev/null
+++ b/src/rpc/virkeepalive.h
@@ -0,0 +1,56 @@
+/*
+ * virkeepalive.h: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Jiri Denemark <jdenemar(a)redhat.com>
+ */
+
+#ifndef __VIR_KEEPALIVE_H__
+# define __VIR_KEEPALIVE_H__
+
+# include "virnetmessage.h"
+/*
+ * Callbacks supplied by the user of a keepalive object; each receives
+ * the opaque client pointer given to virKeepAliveNew().  The send
+ * callback reports failure with a negative return value.
+ */
+typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg);
+typedef void (*virKeepAliveDeadFunc)(void *client);
+typedef void (*virKeepAliveFreeFunc)(void *client);
+
+typedef struct _virKeepAlive virKeepAlive; /* opaque; defined in virkeepalive.c */
+typedef virKeepAlive *virKeepAlivePtr;
+
+/* Create a keepalive object; returns NULL on failure. */
+virKeepAlivePtr virKeepAliveNew(int interval,
+ unsigned int count,
+ void *client,
+ virKeepAliveSendFunc sendCB,
+ virKeepAliveDeadFunc deadCB,
+ virKeepAliveFreeFunc freeCB)
+ ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4)
+ ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6);
+/* Reference counting; virKeepAliveFree() accepts NULL and drops one reference. */
+void virKeepAliveRef(virKeepAlivePtr ka);
+void virKeepAliveFree(virKeepAlivePtr ka);
+/* Start returns 0 on success (or when keepalive is disabled/already running), -1 on error. */
+int virKeepAliveStart(virKeepAlivePtr ka,
+ int interval,
+ unsigned int count);
+void virKeepAliveStop(virKeepAlivePtr ka);
+/* Returns true when msg is a keepalive protocol message, false otherwise. */
+bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
+ virNetMessagePtr msg);
+
+#endif /* __VIR_KEEPALIVE_H__ */
--
1.7.7.1