
# HG changeset patch # User Hollis Blanchard <hollisb@us.ibm.com> # Date 1250181138 25200 # Node ID 36aa26ca62cabda72e027f075889c7a0973a4e6b # Parent c345ed88f03432da53efee161d505bd64c02c32e [RFC] Implement libvirt event callback management. Libvirt requires that users implement their own event monitoring and callback-management infrastructure. Actually, two similar lists are needed: one for file descriptors to monitor, and one for pending timers. This patch starts exactly one event-monitoring thread per libvirt-cim instance. Multiple provider threads in the same process will share this thread. Signed-off-by: Hollis Blanchard <hollisb@us.ibm.com> --- This builds but isn't tested, and may very well contain embarrassing list manipulation or locking bugs. It really needs a consumer to test, i.e. for someone to register a domain event callback. Comments welcome. Please CC me on replies. diff --git a/libxkutil/Makefile.am b/libxkutil/Makefile.am --- a/libxkutil/Makefile.am +++ b/libxkutil/Makefile.am @@ -5,14 +5,14 @@ SUBDIRS = tests CFLAGS += $(CFLAGS_STRICT) noinst_HEADERS = cs_util.h misc_util.h device_parsing.h xmlgen.h infostore.h \ - pool_parsing.h + pool_parsing.h event.h lib_LTLIBRARIES = libxkutil.la AM_LDFLAGS = -lvirt -luuid libxkutil_la_SOURCES = cs_util_instance.c misc_util.c device_parsing.c \ - xmlgen.c infostore.c pool_parsing.c + xmlgen.c infostore.c pool_parsing.c event.c noinst_PROGRAMS = xml_parse_test diff --git a/libxkutil/event.c b/libxkutil/event.c new file mode 100644 --- /dev/null +++ b/libxkutil/event.c @@ -0,0 +1,367 @@ +/* + * Copyright IBM Corp. 2009 + * + * Authors: + * Hollis Blanchard <hollisb@us.ibm.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <stdbool.h> +#include <stdarg.h> +#include <unistd.h> +#include <pthread.h> +#include <errno.h> +#include <poll.h> +#include <values.h> + +#include <libvirt/libvirt.h> +#include <libvirt/virterror.h> + +#include "event.h" + +struct timer { + int id; + int timeout; + void *opaque; + virFreeCallback ff; + struct timer *next; + bool deleted; +}; + +struct watch { + int id; + int fd; + int events; + virEventHandleCallback cb; + void *opaque; + virFreeCallback ff; + struct watch *next; + bool deleted; +}; + +static pthread_t watch_thread_id; + +static int next_watch_id; +static struct watch *watch_list; +static int watch_count; +static pthread_mutex_t watch_list_mutex = PTHREAD_MUTEX_INITIALIZER; + +static int next_timer_id; +static struct timer *timer_list; +static pthread_mutex_t timer_list_mutex = PTHREAD_MUTEX_INITIALIZER; + + +static int eventAddHandle(int fd, int events, virEventHandleCallback cb, + void *opaque, virFreeCallback ff) +{ + struct watch *watch; + + CU_DEBUG("%s", __func__); + + watch = malloc(sizeof(struct watch)); + if (!watch) + return -ENOMEM; + + watch->id = next_watch_id++; + watch->fd = fd; + watch->events = events; + watch->cb = cb; + watch->opaque = opaque; + watch->ff = ff; + + pthread_mutex_lock(&watch_list_mutex); + + watch->next = watch_list; + watch_list = watch; + watch_count++; + + pthread_mutex_unlock(&watch_list_mutex); + + return watch->id; +} + +static void eventUpdateHandle(int id, int events) +{ + struct watch *cur; + + CU_DEBUG("%s %d", __func__, id); + + for (cur = watch_list; cur != NULL; cur = cur->next) { + if (cur->id == id) { + cur->events = events; + break; + } + } +} + +/* To avoid locking problems, watches are just flagged here, and the memory is + * freed later. */ +static int eventRemoveHandle(int id) +{ + struct watch *cur; + + CU_DEBUG("%s %d", __func__, id); + + for (cur = watch_list; cur != NULL; cur = cur->next) { + if (cur->id == id) { + cur->deleted = 1; + break; + } + } + + return 0; +} + +/* Delete all watches marked for deletion. */ +static void event_watch_free_deleted(void) +{ + struct watch *cur; + struct watch **link; + + CU_DEBUG("%s", __func__); + + pthread_mutex_lock(&watch_list_mutex); + + cur = watch_list; + link = &watch_list; + while (cur != NULL) { + struct watch *next = cur->next; + + if (cur->deleted) { + *link = next; + + cur->ff(cur->opaque); + free(cur); + watch_count--; + } else + link = &cur->next; + + cur = next; + } + + pthread_mutex_unlock(&watch_list_mutex); +} + +static int eventAddTimeout(int timeout, virEventTimeoutCallback cb, + void *opaque, virFreeCallback ff) +{ + struct timer *timer; + + CU_DEBUG("%s", __func__); + + timer = malloc(sizeof(struct timer)); + if (!timer) + return -ENOMEM; + + timer->id = next_timer_id++; + timer->timeout = timeout; + timer->opaque = opaque; + timer->ff = ff; + + pthread_mutex_lock(&timer_list_mutex); + + timer->next = timer_list; + timer_list = timer; + + pthread_mutex_unlock(&timer_list_mutex); + + return timer->id; +} + +static void eventUpdateTimeout(int id, int timeout) +{ + struct timer *cur; + + CU_DEBUG("%s %d", __func__, id); + + for (cur = timer_list; cur != NULL; cur = cur->next) { + if (cur->id == id) { + cur->timeout = timeout; + break; + } + } +} + +static int eventRemoveTimeout(int id) +{ + struct timer *cur; + + CU_DEBUG("%s %d", __func__, id); + + for (cur = timer_list; cur != NULL; cur = cur->next) { + if (cur->id == id) { + cur->deleted = 1; + break; + } + } + + return 0; +} + +/* Delete all timers marked for deletion. */ +static void event_timer_free_deleted(void) +{ + struct timer *cur; + struct timer **link; + + CU_DEBUG("%s", __func__); + + pthread_mutex_lock(&timer_list_mutex); + + cur = timer_list; + link = &timer_list; + while (cur != NULL) { + struct timer *next = cur->next; + + if (cur->deleted) { + *link = next; + + cur->ff(cur->opaque); + free(cur); + } else + link = &cur->next; + + cur = next; + } + + pthread_mutex_unlock(&timer_list_mutex); +} + + + +static int poll_to_libvirt_events(int pevents) +{ + int vevents = 0; + + if (pevents & POLLIN) + vevents |= VIR_EVENT_HANDLE_READABLE; + + if (pevents & POLLOUT) + vevents |= VIR_EVENT_HANDLE_WRITABLE; + + if (pevents & POLLERR) + vevents |= VIR_EVENT_HANDLE_ERROR; + + if (pevents & POLLHUP) + vevents |= VIR_EVENT_HANDLE_HANGUP; + + return vevents; +} + +static int libvirt_to_poll_events(int vevents) +{ + int pevents = 0; + + if (vevents & VIR_EVENT_HANDLE_READABLE) + pevents |= POLLIN; + + if (vevents & VIR_EVENT_HANDLE_WRITABLE) + pevents |= POLLOUT; + + if (vevents & VIR_EVENT_HANDLE_ERROR) + pevents |= POLLERR; + + if (vevents & VIR_EVENT_HANDLE_HANGUP) + pevents |= POLLHUP; + + return pevents; +} + +static void invoke_callback(struct watch *watch, struct pollfd *pollfd) +{ + int vevents = poll_to_libvirt_events(watch->events); + + watch->cb(watch->id, watch->fd, vevents, watch->opaque); +} + +static int event_next_timeout(void) +{ + struct timer *cur; + int closest = MAXINT; + + for (cur = timer_list; cur != NULL; cur = cur->next) + if (cur->timeout < closest) + closest = cur->timeout; + + return closest; +} + +/* One thread to watch all fds for all events for all libvirt threads. */ +static void *event_thread(void *ptr) +{ + while (1) { + struct watch *cur; + struct pollfd *pollfds; + struct pollfd *pollfd; + int timeout; + int i; + + pollfds = malloc(sizeof(struct pollfd) * watch_count); + + /* fill in pollfds array from our watch list */ + for (pollfd = &pollfds[0], cur = watch_list; + cur != NULL; + pollfd++, cur = cur->next) { + pollfd->fd = cur->fd; + pollfd->events = libvirt_to_poll_events(cur->events); + } + + timeout = event_next_timeout(); + + poll(pollfds, watch_count, timeout); + + /* invoke callbacks */ + for (i = 0; i < watch_count; i++) + for (cur = watch_list; cur != NULL; cur = cur->next) + if (cur->fd == pollfds[i].fd + && !cur->deleted) { + invoke_callback(cur, &pollfds[i]); + break; + } + + free(pollfds); + + event_watch_free_deleted(); + event_timer_free_deleted(); + } + + return NULL; +} + +void init_events(void) +{ + static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; + + CU_DEBUG("%s", __func__); + + pthread_mutex_lock(&thread_mutex); + + if (!watch_thread_id) { + virEventRegisterImpl(eventAddHandle, + eventUpdateHandle, + eventRemoveHandle, + eventAddTimeout, + eventUpdateTimeout, + eventRemoveTimeout); + + pthread_create(&watch_thread_id, NULL, event_thread, NULL); + } + + pthread_mutex_unlock(&thread_mutex); +} diff --git a/libxkutil/event.h b/libxkutil/event.h new file mode 100644 --- /dev/null +++ b/libxkutil/event.h @@ -0,0 +1,29 @@ +/* + * Copyright IBM Corp. 2009 + * + * Authors: + * Hollis Blanchard <hollisb@us.ibm.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef __EVENT_H +#define __EVENT_H + +#include <libcmpiutil.h> + +void init_events(void); + +#endif /* __EVENT_H */ diff --git a/libxkutil/misc_util.c b/libxkutil/misc_util.c --- a/libxkutil/misc_util.c +++ b/libxkutil/misc_util.c @@ -38,6 +38,7 @@ #include "misc_util.h" #include "cs_util.h" +#include "event.h" #include <config.h> @@ -490,6 +491,7 @@ bool parse_instanceid(const CMPIObjectPa bool libvirt_cim_init(void) { + init_events(); return virInitialize() == 0; }