On Wed, Oct 12, 2011 at 07:16:20AM +0200, Jiri Denemark wrote:
These APIs are used by both client and server RPC layer to handle
processing of keepalive messages.
---
Notes:
Version 3:
- remove ADVERTISE message handling
Version 2:
- no change
po/POTFILES.in | 1 +
src/Makefile.am | 3 +-
src/rpc/virkeepalive.c | 426 ++++++++++++++++++++++++++++++++++++++++++++++++
src/rpc/virkeepalive.h | 56 +++++++
4 files changed, 485 insertions(+), 1 deletions(-)
create mode 100644 src/rpc/virkeepalive.c
create mode 100644 src/rpc/virkeepalive.h
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 5ce35ae..71254dd 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c
src/qemu/qemu_process.c
src/remote/remote_client_bodies.h
src/remote/remote_driver.c
+src/rpc/virkeepalive.c
src/rpc/virnetclient.c
src/rpc/virnetclientprogram.c
src/rpc/virnetclientstream.c
diff --git a/src/Makefile.am b/src/Makefile.am
index af07020..944629c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1370,7 +1370,8 @@ libvirt_net_rpc_la_SOURCES = \
rpc/virnetprotocol.h rpc/virnetprotocol.c \
rpc/virnetsocket.h rpc/virnetsocket.c \
rpc/virnettlscontext.h rpc/virnettlscontext.c \
- rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
+ rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \
+ rpc/virkeepalive.h rpc/virkeepalive.c
if HAVE_SASL
libvirt_net_rpc_la_SOURCES += \
rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
new file mode 100644
index 0000000..44cc322
--- /dev/null
+++ b/src/rpc/virkeepalive.c
@@ -0,0 +1,426 @@
+/*
+ * virkeepalive.c: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Jiri Denemark <jdenemar(a)redhat.com>
+ */
+
+#include <config.h>
+
+#include "memory.h"
+#include "threads.h"
+#include "virfile.h"
+#include "logging.h"
+#include "util.h"
+#include "virterror_internal.h"
+#include "virnetsocket.h"
+#include "virkeepaliveprotocol.h"
+#include "virkeepalive.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...) \
+ virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
+ __FUNCTION__, __LINE__, __VA_ARGS__)
+
+struct _virKeepAlive {
+ int refs;
+ virMutex lock;
+
+ int interval;
+ unsigned int count;
+ unsigned int countToDeath;
+ time_t lastPacketReceived;
+ int timer;
+
+ virNetMessagePtr response;
+ int responseTimer;
+
+ virKeepAliveSendFunc sendCB;
+ virKeepAliveDeadFunc deadCB;
+ virKeepAliveFreeFunc freeCB;
+ void *client;
+};
+
+
+static void
+virKeepAliveLock(virKeepAlivePtr ka)
+{
+ virMutexLock(&ka->lock);
+}
+
+static void
+virKeepAliveUnlock(virKeepAlivePtr ka)
+{
+ virMutexUnlock(&ka->lock);
+}
+
+
+static virNetMessagePtr
+virKeepAliveMessage(int proc)
+{
+ virNetMessagePtr msg;
+
+ if (!(msg = virNetMessageNew(false)))
+ return NULL;
+
+ msg->header.prog = KEEPALIVE_PROGRAM;
+ msg->header.vers = KEEPALIVE_VERSION;
+ msg->header.type = VIR_NET_MESSAGE;
+ msg->header.proc = proc;
+
+ if (virNetMessageEncodeHeader(msg) < 0 ||
+ virNetMessageEncodePayloadEmpty(msg) < 0) {
+ virNetMessageFree(msg);
+ return NULL;
+ }
+
+ return msg;
+}
+
+
+static int
+virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
+{
+ int ret;
+ const char *proc = NULL;
+ void *client = ka->client;
+ virKeepAliveSendFunc sendCB = ka->sendCB;
+
+ switch (msg->header.proc) {
+ case KEEPALIVE_PROC_PING:
+ proc = "request";
+ break;
+ case KEEPALIVE_PROC_PONG:
+ proc = "response";
+ break;
+ }
+
+ if (!proc) {
+ VIR_WARN("Refusing to send unknown keepalive message: %d",
+ msg->header.proc);
+ return -1;
This exit path returns without freeing 'msg', so the caller is required to free it.
+ }
+
+ VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+
+ ka->refs++;
+ virKeepAliveUnlock(ka);
+
+ if ((ret = sendCB(client, msg)) < 0) {
+ VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
+ virNetMessageFree(msg);
Whereas this exit path frees 'msg' itself, so ownership of 'msg' on failure is inconsistent between the two error paths.
+ }
+
+ virKeepAliveLock(ka);
+ ka->refs--;
+
+ return ret;
+}
+
+
+static void
+virKeepAliveScheduleResponse(virKeepAlivePtr ka)
+{
+ if (ka->responseTimer == -1)
+ return;
+
+ VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
+
+ if (!ka->response &&
+ !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
+ VIR_WARN("Failed to generate keepalive response");
+ return;
+ }
+
+ virEventUpdateTimeout(ka->responseTimer, 0);
+}
+
+
+static void
+virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virKeepAlivePtr ka = opaque;
+ time_t now = time(NULL);
+
+ virKeepAliveLock(ka);
+
+ VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago",
+ ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived);
+
+ if (now - ka->lastPacketReceived < ka->interval - 1) {
+ int timeout = ka->interval - (now - ka->lastPacketReceived);
+ virEventUpdateTimeout(ka->timer, timeout * 1000);
+ goto cleanup;
+ }
+
+ if (ka->countToDeath == 0) {
+ virKeepAliveDeadFunc deadCB = ka->deadCB;
+ void *client = ka->client;
+
+ VIR_WARN("No response from client %p after %d keepalive messages in"
+ " %d seconds",
+ ka->client,
+ ka->count,
+ (int) (now - ka->lastPacketReceived));
+ ka->refs++;
+ virKeepAliveUnlock(ka);
+ deadCB(client);
+ virKeepAliveLock(ka);
+ ka->refs--;
+ } else {
+ virNetMessagePtr msg;
+
+ ka->countToDeath--;
+ if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
+ VIR_WARN("Failed to generate keepalive request");
+ else
+ ignore_value(virKeepAliveSend(ka, msg));
This might need to change depending on how you fix the return
handling of virKeepAliveSend() with respect to freeing msg.
+ virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+ }
+
+cleanup:
+ virKeepAliveUnlock(ka);
+}
+
+
+static void
+virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virKeepAlivePtr ka = opaque;
+ virNetMessagePtr msg;
+
+ virKeepAliveLock(ka);
+
+ VIR_DEBUG("ka=%p, client=%p, response=%p",
+ ka, ka->client, ka->response);
+
+ if (ka->response) {
+ msg = ka->response;
+ ka->response = NULL;
+ ignore_value(virKeepAliveSend(ka, msg));
Likewise, a change may be needed here.
+ }
+
+ virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+
+ virKeepAliveUnlock(ka);
+}
+
+
+static void
+virKeepAliveTimerFree(void *opaque)
+{
+ virKeepAliveFree(opaque);
+}
+
+
+virKeepAlivePtr
+virKeepAliveNew(int interval,
+ unsigned int count,
+ void *client,
+ virKeepAliveSendFunc sendCB,
+ virKeepAliveDeadFunc deadCB,
+ virKeepAliveFreeFunc freeCB)
+{
+ virKeepAlivePtr ka;
+
+ VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
+
+ if (VIR_ALLOC(ka) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ if (virMutexInit(&ka->lock) < 0) {
+ VIR_FREE(ka);
+ return NULL;
+ }
+
+ ka->refs = 1;
+ ka->interval = interval;
+ ka->count = count;
+ ka->countToDeath = count;
+ ka->timer = -1;
+ ka->client = client;
+ ka->sendCB = sendCB;
+ ka->deadCB = deadCB;
+ ka->freeCB = freeCB;
+
+ ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
+ ka, virKeepAliveTimerFree);
+ if (ka->responseTimer < 0) {
+ virKeepAliveFree(ka);
+ return NULL;
+ }
+ /* the timer now has a reference to ka */
+ ka->refs++;
+
+ return ka;
+}
+
+
+void
+virKeepAliveRef(virKeepAlivePtr ka)
+{
+ virKeepAliveLock(ka);
+ ka->refs++;
+ VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+ virKeepAliveUnlock(ka);
+}
+
+
+void
+virKeepAliveFree(virKeepAlivePtr ka)
+{
+ if (!ka)
+ return;
+
+ virKeepAliveLock(ka);
+ VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+ if (--ka->refs > 0) {
+ virKeepAliveUnlock(ka);
+ return;
+ }
+
+ virMutexDestroy(&ka->lock);
+ ka->freeCB(ka->client);
+ VIR_FREE(ka);
+}
+
+
+int
+virKeepAliveStart(virKeepAlivePtr ka,
+ int interval,
+ unsigned int count)
+{
+ int ret = -1;
+ time_t delay;
+ int timeout;
+
+ VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u",
+ ka, ka->client, interval, count);
+
+ virKeepAliveLock(ka);
+
+ if (ka->timer >= 0) {
+ VIR_DEBUG("Keepalive messages already enabled");
+ ret = 0;
+ goto cleanup;
+ }
+
+ if (interval > 0) {
+ if (ka->interval > 0) {
+ virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("keepalive interval already set"));
+ goto cleanup;
+ }
+ ka->interval = interval;
+ ka->count = count;
+ ka->countToDeath = count;
+ }
+
+ if (ka->interval <= 0) {
+ VIR_DEBUG("Keepalive messages disabled by configuration");
+ ret = 0;
+ goto cleanup;
+ }
+
+ VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u",
+ ka->interval, ka->count);
+
+ delay = time(NULL) - ka->lastPacketReceived;
+ if (delay > ka->interval)
+ timeout = 0;
+ else
+ timeout = ka->interval - delay;
+ ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
+ ka, virKeepAliveTimerFree);
+ if (ka->timer < 0)
+ goto cleanup;
+
+ /* the timer now has another reference to this object */
+ ka->refs++;
+ ret = 0;
+
+cleanup:
+ virKeepAliveUnlock(ka);
+ return ret;
+}
+
+
+void
+virKeepAliveStop(virKeepAlivePtr ka)
+{
+ VIR_DEBUG("ka=%p, client=%p", ka, ka->client);
+
+ virKeepAliveLock(ka);
+ if (ka->timer > 0) {
+ virEventRemoveTimeout(ka->timer);
+ ka->timer = -1;
+ }
+ if (ka->responseTimer > 0) {
+ virEventRemoveTimeout(ka->responseTimer);
+ ka->responseTimer = -1;
+ }
+ virKeepAliveUnlock(ka);
+}
Do we need to clear any dangling 'ka->response' object?
ACK if those questions are cleared up.
Bonus points if you fancy inserting some DTrace/SystemTap probes into the
code as a later patch.
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|