# HG changeset patch
# User Hollis Blanchard <hollisb(a)us.ibm.com>
# Date 1250181138 25200
# Node ID 36aa26ca62cabda72e027f075889c7a0973a4e6b
# Parent c345ed88f03432da53efee161d505bd64c02c32e
[RFC] Implement libvirt event callback management.
Libvirt requires that users implement their own event monitoring and
callback-management infrastructure. Actually, two similar lists are needed: one
for file descriptors to monitor, and one for pending timers.
This patch starts exactly one event-monitoring thread per libvirt-cim instance.
Multiple provider threads in the same process will share this thread.
Signed-off-by: Hollis Blanchard <hollisb(a)us.ibm.com>
---
This builds but isn't tested, and may very well contain embarrassing list
manipulation or locking bugs. It really needs a consumer to test, i.e. for
someone to register a domain event callback.
Comments welcome.
Please CC me on replies.
diff --git a/libxkutil/Makefile.am b/libxkutil/Makefile.am
--- a/libxkutil/Makefile.am
+++ b/libxkutil/Makefile.am
@@ -5,14 +5,14 @@ SUBDIRS = tests
CFLAGS += $(CFLAGS_STRICT)
noinst_HEADERS = cs_util.h misc_util.h device_parsing.h xmlgen.h infostore.h \
- pool_parsing.h
+ pool_parsing.h event.h
lib_LTLIBRARIES = libxkutil.la
AM_LDFLAGS = -lvirt -luuid
libxkutil_la_SOURCES = cs_util_instance.c misc_util.c device_parsing.c \
- xmlgen.c infostore.c pool_parsing.c
+ xmlgen.c infostore.c pool_parsing.c event.c
noinst_PROGRAMS = xml_parse_test
diff --git a/libxkutil/event.c b/libxkutil/event.c
new file mode 100644
--- /dev/null
+++ b/libxkutil/event.c
@@ -0,0 +1,367 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ * Hollis Blanchard <hollisb(a)us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <poll.h>
+#include <values.h>
+
+#include <libvirt/libvirt.h>
+#include <libvirt/virterror.h>
+
+#include "event.h"
+
+struct timer {
+ int id;
+ int timeout;
+ void *opaque;
+ virFreeCallback ff;
+ struct timer *next;
+ bool deleted;
+};
+
+struct watch {
+ int id;
+ int fd;
+ int events;
+ virEventHandleCallback cb;
+ void *opaque;
+ virFreeCallback ff;
+ struct watch *next;
+ bool deleted;
+};
+
+static pthread_t watch_thread_id;
+
+static int next_watch_id;
+static struct watch *watch_list;
+static int watch_count;
+static pthread_mutex_t watch_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static int next_timer_id;
+static struct timer *timer_list;
+static pthread_mutex_t timer_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+static int eventAddHandle(int fd, int events, virEventHandleCallback cb,
+ void *opaque, virFreeCallback ff)
+{
+ struct watch *watch;
+
+ CU_DEBUG("%s", __func__);
+
+ watch = malloc(sizeof(struct watch));
+ if (!watch)
+ return -ENOMEM;
+
+ watch->id = next_watch_id++;
+ watch->fd = fd;
+ watch->events = events;
+ watch->cb = cb;
+ watch->opaque = opaque;
+ watch->ff = ff;
+
+ pthread_mutex_lock(&watch_list_mutex);
+
+ watch->next = watch_list;
+ watch_list = watch;
+ watch_count++;
+
+ pthread_mutex_unlock(&watch_list_mutex);
+
+ return watch->id;
+}
+
+static void eventUpdateHandle(int id, int events)
+{
+ struct watch *cur;
+
+ CU_DEBUG("%s %d", __func__, id);
+
+ for (cur = watch_list; cur != NULL; cur = cur->next) {
+ if (cur->id == id) {
+ cur->events = events;
+ break;
+ }
+ }
+}
+
+/* To avoid locking problems, watches are just flagged here, and the memory is
+ * freed later. */
+static int eventRemoveHandle(int id)
+{
+ struct watch *cur;
+
+ CU_DEBUG("%s %d", __func__, id);
+
+ for (cur = watch_list; cur != NULL; cur = cur->next) {
+ if (cur->id == id) {
+ cur->deleted = 1;
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/* Delete all watches marked for deletion. */
+static void event_watch_free_deleted(void)
+{
+ struct watch *cur;
+ struct watch **link;
+
+ CU_DEBUG("%s", __func__);
+
+ pthread_mutex_lock(&watch_list_mutex);
+
+ cur = watch_list;
+ link = &watch_list;
+ while (cur != NULL) {
+ struct watch *next = cur->next;
+
+ if (cur->deleted) {
+ *link = next;
+
+ cur->ff(cur->opaque);
+ free(cur);
+ watch_count--;
+ } else
+ link = &cur->next;
+
+ cur = next;
+ }
+
+ pthread_mutex_unlock(&watch_list_mutex);
+}
+
+static int eventAddTimeout(int timeout, virEventTimeoutCallback cb,
+ void *opaque, virFreeCallback ff)
+{
+ struct timer *timer;
+
+ CU_DEBUG("%s", __func__);
+
+ timer = malloc(sizeof(struct timer));
+ if (!timer)
+ return -ENOMEM;
+
+ timer->id = next_timer_id++;
+ timer->timeout = timeout;
+ timer->opaque = opaque;
+ timer->ff = ff;
+
+ pthread_mutex_lock(&timer_list_mutex);
+
+ timer->next = timer_list;
+ timer_list = timer;
+
+ pthread_mutex_unlock(&timer_list_mutex);
+
+ return timer->id;
+}
+
+static void eventUpdateTimeout(int id, int timeout)
+{
+ struct timer *cur;
+
+ CU_DEBUG("%s %d", __func__, id);
+
+ for (cur = timer_list; cur != NULL; cur = cur->next) {
+ if (cur->id == id) {
+ cur->timeout = timeout;
+ break;
+ }
+ }
+}
+
+static int eventRemoveTimeout(int id)
+{
+ struct timer *cur;
+
+ CU_DEBUG("%s %d", __func__, id);
+
+ for (cur = timer_list; cur != NULL; cur = cur->next) {
+ if (cur->id == id) {
+ cur->deleted = 1;
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/* Delete all timers marked for deletion. */
+static void event_timer_free_deleted(void)
+{
+ struct timer *cur;
+ struct timer **link;
+
+ CU_DEBUG("%s", __func__);
+
+ pthread_mutex_lock(&timer_list_mutex);
+
+ cur = timer_list;
+ link = &timer_list;
+ while (cur != NULL) {
+ struct timer *next = cur->next;
+
+ if (cur->deleted) {
+ *link = next;
+
+ cur->ff(cur->opaque);
+ free(cur);
+ } else
+ link = &cur->next;
+
+ cur = next;
+ }
+
+ pthread_mutex_unlock(&timer_list_mutex);
+}
+
+
+
+static int poll_to_libvirt_events(int pevents)
+{
+ int vevents = 0;
+
+ if (pevents & POLLIN)
+ vevents |= VIR_EVENT_HANDLE_READABLE;
+
+ if (pevents & POLLOUT)
+ vevents |= VIR_EVENT_HANDLE_WRITABLE;
+
+ if (pevents & POLLERR)
+ vevents |= VIR_EVENT_HANDLE_ERROR;
+
+ if (pevents & POLLHUP)
+ vevents |= VIR_EVENT_HANDLE_HANGUP;
+
+ return vevents;
+}
+
+static int libvirt_to_poll_events(int vevents)
+{
+ int pevents = 0;
+
+ if (vevents & VIR_EVENT_HANDLE_READABLE)
+ pevents |= POLLIN;
+
+ if (vevents & VIR_EVENT_HANDLE_WRITABLE)
+ pevents |= POLLOUT;
+
+ if (vevents & VIR_EVENT_HANDLE_ERROR)
+ pevents |= POLLERR;
+
+ if (vevents & VIR_EVENT_HANDLE_HANGUP)
+ pevents |= POLLHUP;
+
+ return pevents;
+}
+
+static void invoke_callback(struct watch *watch, struct pollfd *pollfd)
+{
+ int vevents = poll_to_libvirt_events(watch->events);
+
+ watch->cb(watch->id, watch->fd, vevents, watch->opaque);
+}
+
+static int event_next_timeout(void)
+{
+ struct timer *cur;
+ int closest = MAXINT;
+
+ for (cur = timer_list; cur != NULL; cur = cur->next)
+ if (cur->timeout < closest)
+ closest = cur->timeout;
+
+ return closest;
+}
+
+/* One thread to watch all fds for all events for all libvirt threads. */
+static void *event_thread(void *ptr)
+{
+ while (1) {
+ struct watch *cur;
+ struct pollfd *pollfds;
+ struct pollfd *pollfd;
+ int timeout;
+ int i;
+
+ pollfds = malloc(sizeof(struct pollfd) * watch_count);
+
+ /* fill in pollfds array from our watch list */
+ for (pollfd = &pollfds[0], cur = watch_list;
+ cur != NULL;
+ pollfd++, cur = cur->next) {
+ pollfd->fd = cur->fd;
+ pollfd->events = libvirt_to_poll_events(cur->events);
+ }
+
+ timeout = event_next_timeout();
+
+ poll(pollfds, watch_count, timeout);
+
+ /* invoke callbacks */
+ for (i = 0; i < watch_count; i++)
+ for (cur = watch_list; cur != NULL; cur = cur->next)
+ if (cur->fd == pollfds[i].fd
+ && !cur->deleted) {
+ invoke_callback(cur, &pollfds[i]);
+ break;
+ }
+
+ free(pollfds);
+
+ event_watch_free_deleted();
+ event_timer_free_deleted();
+ }
+
+ return NULL;
+}
+
+void init_events(void)
+{
+ static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+ CU_DEBUG("%s", __func__);
+
+ pthread_mutex_lock(&thread_mutex);
+
+ if (!watch_thread_id) {
+ virEventRegisterImpl(eventAddHandle,
+ eventUpdateHandle,
+ eventRemoveHandle,
+ eventAddTimeout,
+ eventUpdateTimeout,
+ eventRemoveTimeout);
+
+ pthread_create(&watch_thread_id, NULL, event_thread, NULL);
+ }
+
+ pthread_mutex_unlock(&thread_mutex);
+}
diff --git a/libxkutil/event.h b/libxkutil/event.h
new file mode 100644
--- /dev/null
+++ b/libxkutil/event.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ * Hollis Blanchard <hollisb(a)us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef __EVENT_H
+#define __EVENT_H
+
+#include <libcmpiutil.h>
+
+void init_events(void);
+
+#endif /* __EVENT_H */
diff --git a/libxkutil/misc_util.c b/libxkutil/misc_util.c
--- a/libxkutil/misc_util.c
+++ b/libxkutil/misc_util.c
@@ -38,6 +38,7 @@
#include "misc_util.h"
#include "cs_util.h"
+#include "event.h"
#include <config.h>
@@ -490,6 +491,7 @@ bool parse_instanceid(const CMPIObjectPa
bool libvirt_cim_init(void)
{
+ init_events();
return virInitialize() == 0;
}