This patch makes the event loop in the libvirtd daemon thread safe, and
re-entrant. This allows one thread to add/remove/update timers/file handle
watches while other thread is doing the poll. This sometimes requires that
we wakeup the main thread to make it see changes to the poll FD list. We
use the traditional self-pipe trick for this task.
event.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
event.h | 14 +++++
qemud.c | 10 +++
qemud.h | 3 +
4 files changed, 167 insertions(+), 28 deletions(-)
Daniel
diff --git a/qemud/event.c b/qemud/event.c
--- a/qemud/event.c
+++ b/qemud/event.c
@@ -28,12 +28,16 @@
#include <poll.h>
#include <sys/time.h>
#include <errno.h>
+#include <unistd.h>
#include "qemud.h"
#include "event.h"
#include "memory.h"
+#include "util.h"
#define EVENT_DEBUG(fmt, ...) qemudDebug("EVENT: " fmt, __VA_ARGS__)
+
+static int virEventInterruptLocked(void);
/* State for a single file handle being monitored */
struct virEventHandle {
@@ -63,6 +67,9 @@ struct virEventTimeout {
/* State for the main event loop */
struct virEventLoop {
+ pthread_mutex_t lock;
+ pthread_t leader;
+ int wakeupfd[2];
int handlesCount;
int handlesAlloc;
struct virEventHandle *handles;
@@ -80,6 +87,16 @@ static int nextWatch = 0;
/* Unique ID for the next timer to be registered */
static int nextTimer = 0;
+static void virEventLock(void)
+{
+ pthread_mutex_lock(&eventLoop.lock);
+}
+
+static void virEventUnlock(void)
+{
+ pthread_mutex_unlock(&eventLoop.lock);
+}
+
/*
* Register a callback for monitoring file handle events.
* NB, it *must* be safe to call this from within a callback
@@ -89,17 +106,23 @@ int virEventAddHandleImpl(int fd, int ev
virEventHandleCallback cb,
void *opaque,
virFreeCallback ff) {
+ int watch;
EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque);
+ virEventLock();
if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
EVENT_DEBUG("Used %d handle slots, adding %d more",
eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.handles,
- (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0)
+ (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+ virEventUnlock();
return -1;
+ }
eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
}
- eventLoop.handles[eventLoop.handlesCount].watch = nextWatch++;
+ watch = nextWatch++;
+
+ eventLoop.handles[eventLoop.handlesCount].watch = watch;
eventLoop.handles[eventLoop.handlesCount].fd = fd;
eventLoop.handles[eventLoop.handlesCount].events =
virEventHandleTypeToPollEvent(events);
@@ -110,11 +133,15 @@ int virEventAddHandleImpl(int fd, int ev
eventLoop.handlesCount++;
- return nextWatch-1;
+ virEventInterruptLocked();
+ virEventUnlock();
+
+ return watch;
}
void virEventUpdateHandleImpl(int watch, int events) {
int i;
+ virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].watch == watch) {
eventLoop.handles[i].events =
@@ -122,6 +149,8 @@ void virEventUpdateHandleImpl(int watch,
break;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
}
/*
@@ -133,6 +162,7 @@ int virEventRemoveHandleImpl(int watch)
int virEventRemoveHandleImpl(int watch) {
int i;
EVENT_DEBUG("Remove handle %d", watch);
+ virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].deleted)
continue;
@@ -140,9 +170,12 @@ int virEventRemoveHandleImpl(int watch)
if (eventLoop.handles[i].watch == watch) {
EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd);
eventLoop.handles[i].deleted = 1;
+ virEventUnlock();
return 0;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
return -1;
}
@@ -157,17 +190,21 @@ int virEventAddTimeoutImpl(int frequency
void *opaque,
virFreeCallback ff) {
struct timeval now;
+ int ret;
EVENT_DEBUG("Adding timer %d with %d ms freq", nextTimer, frequency);
if (gettimeofday(&now, NULL) < 0) {
return -1;
}
+ virEventLock();
if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
EVENT_DEBUG("Used %d timeout slots, adding %d more",
eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.timeouts,
- (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0)
+ (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+ virEventUnlock();
return -1;
+ }
eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
}
@@ -183,8 +220,10 @@ int virEventAddTimeoutImpl(int frequency
(((unsigned long long)now.tv_usec)/1000) : 0;
eventLoop.timeoutsCount++;
-
- return nextTimer-1;
+ ret = nextTimer-1;
+ virEventInterruptLocked();
+ virEventUnlock();
+ return ret;
}
void virEventUpdateTimeoutImpl(int timer, int frequency) {
@@ -195,6 +234,7 @@ void virEventUpdateTimeoutImpl(int timer
return;
}
+ virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].frequency = frequency;
@@ -205,6 +245,8 @@ void virEventUpdateTimeoutImpl(int timer
break;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
}
/*
@@ -216,15 +258,19 @@ int virEventRemoveTimeoutImpl(int timer)
int virEventRemoveTimeoutImpl(int timer) {
int i;
EVENT_DEBUG("Remove timer %d", timer);
+ virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].deleted)
continue;
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].deleted = 1;
+ virEventUnlock();
return 0;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
return -1;
}
@@ -336,10 +382,15 @@ static int virEventDispatchTimeouts(void
continue;
if (eventLoop.timeouts[i].expiresAt <= now) {
- (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
- eventLoop.timeouts[i].opaque);
+ virEventTimeoutCallback cb = eventLoop.timeouts[i].cb;
+ int timer = eventLoop.timeouts[i].timer;
+ void *opaque = eventLoop.timeouts[i].opaque;
eventLoop.timeouts[i].expiresAt =
now + eventLoop.timeouts[i].frequency;
+
+ virEventUnlock();
+ (cb)(timer, opaque);
+ virEventLock();
}
}
return 0;
@@ -356,28 +407,25 @@ static int virEventDispatchTimeouts(void
*
* Returns 0 upon success, -1 if an error occurred
*/
-static int virEventDispatchHandles(struct pollfd *fds) {
+static int virEventDispatchHandles(int nfds, struct pollfd *fds) {
int i;
- virEventHandleType hEvents;
- /* Save this now - it may be changed during dispatch */
- int nhandles = eventLoop.handlesCount;
- for (i = 0 ; i < nhandles ; i++) {
+ for (i = 0 ; i < nfds ; i++) {
if (eventLoop.handles[i].deleted) {
EVENT_DEBUG("Skip deleted %d", eventLoop.handles[i].fd);
continue;
}
if (fds[i].revents) {
- hEvents = virPollEventToEventHandleType(fds[i].revents);
- EVENT_DEBUG("Dispatch %d %d %d %p",
- eventLoop.handles[i].watch,
- fds[i].fd, fds[i].revents,
- eventLoop.handles[i].opaque);
- (eventLoop.handles[i].cb)(eventLoop.handles[i].watch,
- fds[i].fd,
- hEvents,
- eventLoop.handles[i].opaque);
+ virEventHandleCallback cb = eventLoop.handles[i].cb;
+ void *opaque = eventLoop.handles[i].opaque;
+ int hEvents = virPollEventToEventHandleType(fds[i].revents);
+ EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd,
+ fds[i].revents, eventLoop.handles[i].opaque);
+ virEventUnlock();
+ (cb)(eventLoop.handles[i].watch,
+ fds[i].fd, hEvents, opaque);
+ virEventLock();
}
}
@@ -472,13 +520,20 @@ int virEventRunOnce(void) {
struct pollfd *fds;
int ret, timeout, nfds;
- if ((nfds = virEventMakePollFDs(&fds)) < 0)
+ virEventLock();
+ eventLoop.leader = pthread_self();
+ if ((nfds = virEventMakePollFDs(&fds)) < 0) {
+ virEventUnlock();
return -1;
+ }
if (virEventCalculateTimeout(&timeout) < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
+
+ virEventUnlock();
retry:
EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout);
@@ -491,25 +546,86 @@ int virEventRunOnce(void) {
VIR_FREE(fds);
return -1;
}
+
+ virEventLock();
if (virEventDispatchTimeouts() < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
if (ret > 0 &&
- virEventDispatchHandles(fds) < 0) {
+ virEventDispatchHandles(nfds, fds) < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
VIR_FREE(fds);
- if (virEventCleanupTimeouts() < 0)
+ if (virEventCleanupTimeouts() < 0) {
+ virEventUnlock();
+ return -1;
+ }
+
+ if (virEventCleanupHandles() < 0) {
+ virEventUnlock();
+ return -1;
+ }
+
+ eventLoop.leader = 0;
+ virEventUnlock();
+ return 0;
+}
+
+static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED,
+ int fd,
+ int events ATTRIBUTE_UNUSED,
+ void *opaque ATTRIBUTE_UNUSED)
+{
+ char c;
+ virEventLock();
+ saferead(fd, &c, sizeof(c));
+ virEventUnlock();
+}
+
+int virEventInit(void)
+{
+ if (pthread_mutex_init(&eventLoop.lock, NULL) != 0)
return -1;
- if (virEventCleanupHandles() < 0)
+ if (pipe(eventLoop.wakeupfd) < 0 ||
+ qemudSetNonBlock(eventLoop.wakeupfd[0]) < 0 ||
+ qemudSetNonBlock(eventLoop.wakeupfd[1]) < 0 ||
+ qemudSetCloseExec(eventLoop.wakeupfd[0]) < 0 ||
+ qemudSetCloseExec(eventLoop.wakeupfd[1]) < 0)
+ return -1;
+
+ if (virEventAddHandleImpl(eventLoop.wakeupfd[0],
+ VIR_EVENT_HANDLE_READABLE,
+ virEventHandleWakeup, NULL, NULL) < 0)
return -1;
return 0;
+}
+
+static int virEventInterruptLocked(void)
+{
+ char c = '\0';
+ if (pthread_self() == eventLoop.leader)
+ return 0;
+
+ if (safewrite(eventLoop.wakeupfd[1], &c, sizeof(c)) != sizeof(c))
+ return -1;
+ return 0;
+}
+
+int virEventInterrupt(void)
+{
+ int ret;
+ virEventLock();
+ ret = virEventInterruptLocked();
+ virEventUnlock();
+ return ret;
}
int
diff --git a/qemud/event.h b/qemud/event.h
--- a/qemud/event.h
+++ b/qemud/event.h
@@ -101,6 +101,13 @@ int virEventRemoveTimeoutImpl(int timer)
int virEventRemoveTimeoutImpl(int timer);
/**
+ * virEventInit: Initialize the event loop
+ *
+ * returns -1 if initialization failed
+ */
+int virEventInit(void);
+
+/**
* virEventRunOnce: run a single iteration of the event loop.
*
* Blocks the caller until at least one file handle has an
@@ -116,5 +123,12 @@ virPollEventToEventHandleType(int events
virPollEventToEventHandleType(int events);
+/**
+ * virEventInterrupt: wakeup any thread waiting in poll()
+ *
+ * return -1 if wakup failed
+ */
+int virEventInterrupt(void);
+
#endif /* __VIRTD_EVENT_H__ */
diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -296,7 +296,7 @@ qemudDispatchSignalEvent(int watch ATTRI
server->shutdown = 1;
}
-static int qemudSetCloseExec(int fd) {
+int qemudSetCloseExec(int fd) {
int flags;
if ((flags = fcntl(fd, F_GETFD)) < 0)
goto error;
@@ -311,7 +311,7 @@ static int qemudSetCloseExec(int fd) {
}
-static int qemudSetNonBlock(int fd) {
+int qemudSetNonBlock(int fd) {
int flags;
if ((flags = fcntl(fd, F_GETFL)) < 0)
goto error;
@@ -752,6 +752,12 @@ static struct qemud_server *qemudInitial
}
server->sigread = sigread;
+
+ if (virEventInit() < 0) {
+ qemudLog(QEMUD_ERR, "%s", _("Failed to initialize event
system"));
+ VIR_FREE(server);
+ return NULL;
+ }
virInitialize();
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -177,6 +177,9 @@ void qemudLog(int priority, const char *
#define qemudDebug(fmt, ...) do {} while(0)
#endif
+int qemudSetCloseExec(int fd);
+int qemudSetNonBlock(int fd);
+
unsigned int
remoteDispatchClientRequest (struct qemud_server *server,
struct qemud_client *client);
--
|: Red Hat, Engineering, London -o-
http://people.redhat.com/berrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org -o-
http://ovirt.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|