This implements very simple thread pool to process dbus messages in
separate threads. We don't need to handle queue for messages because
dbus does that for us.
The default thread count will be currently 4 and it is also
configurable via --threads parameter for the libvirt-dbus daemon.
Signed-off-by: Pavel Hrdina <phrdina(a)redhat.com>
---
src/Makefile.am | 1 +
src/connect.c | 14 +++++++
src/connect.h | 3 ++
src/main.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++-------
src/util.c | 28 ++++++++++++++
src/util.h | 9 +++++
6 files changed, 159 insertions(+), 14 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index 9e23f1b..73bbfd9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -32,6 +32,7 @@ libvirt_dbus_LDFLAGS = \
$(DBUS_LDFLAGS) \
$(RELRO_LDFLAGS) \
$(PID_LDFLAGS) \
+ -lpthread \
$(NULL)
libvirt_dbus_LDADD = \
diff --git a/src/connect.c b/src/connect.c
index 2fe305f..41aba5f 100644
--- a/src/connect.c
+++ b/src/connect.c
@@ -56,10 +56,21 @@ virtDBusConnectClose(virtDBusConnect *connect,
connect->connection = NULL;
}
+static void
+virtDBusConnectUnlock(pthread_mutex_t **lock)
+{
+ if (lock)
+ pthread_mutex_unlock(*lock);
+}
+
int
virtDBusConnectOpen(virtDBusConnect *connect,
virtDBusMessage *msg)
{
+ _cleanup_(virtDBusConnectUnlock) pthread_mutex_t *lock = &connect->lock;
+
+ pthread_mutex_lock(lock);
+
if (connect->connection) {
if (virConnectIsAlive(connect->connection))
return 0;
@@ -212,6 +223,9 @@ virtDBusConnectNew(virtDBusConnect **connectp,
connect->uri = uri;
connect->connectPath = connectPath;
+ if (virtDBusUtilMutexInit(&connect->lock) != 0)
+ return -1;
+
if (virtDBusObjectListRegister(objectList,
connect->connectPath,
&introspectXML,
diff --git a/src/connect.h b/src/connect.h
index e685b41..c6026c9 100644
--- a/src/connect.h
+++ b/src/connect.h
@@ -4,10 +4,13 @@
#include <dbus/dbus.h>
#include <libvirt/libvirt.h>
+#include <pthread.h>
#define VIRT_DBUS_CONNECT_INTERFACE "org.libvirt.Connect"
struct _virtDBusConnect {
+ pthread_mutex_t lock;
+
DBusConnection *bus;
const char *uri;
const char *connectPath;
diff --git a/src/main.c b/src/main.c
index bef5dcc..1808620 100644
--- a/src/main.c
+++ b/src/main.c
@@ -8,6 +8,7 @@
#include <errno.h>
#include <getopt.h>
#include <poll.h>
+#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
@@ -16,21 +17,98 @@
#include <sys/signalfd.h>
#include <unistd.h>
+#define VIRT_DBUS_THREADS 4
+
static int loop_status = 0;
+static pthread_mutex_t loopStatusLock = PTHREAD_MUTEX_INITIALIZER;
+
+static int
+virtDBusLoopStatusGet(void)
+{
+ int ret;
+ pthread_mutex_lock(&loopStatusLock);
+ ret = loop_status;
+ pthread_mutex_unlock(&loopStatusLock);
+ return ret;
+}
+
+static void
+virtDBusLoopStatusSet(int val)
+{
+ pthread_mutex_lock(&loopStatusLock);
+ loop_status = val;
+ pthread_mutex_unlock(&loopStatusLock);
+}
+
+struct _virtDBusDispatchData {
+ DBusConnection *bus;
+ virtDBusObjectList *objectList;
+};
+
+static pthread_cond_t threadLoopCond;
+static pthread_mutex_t threadLoopLock;
static int
virtDBusProcessEvents(DBusConnection *bus,
virtDBusObjectList *objectList)
{
for (;;) {
- int r;
+ int r;
- r = virtDBusDispatchMessage(bus, objectList);
- if (r < 0)
- return r;
+ r = virtDBusDispatchMessage(bus, objectList);
+ if (r < 0)
+ return r;
- if (r == 0)
- break;
+ if (r == 0)
+ break;
+ }
+
+ return 0;
+}
+
+static void *
+virtDBusDispatchThread(void *opaque)
+{
+ struct _virtDBusDispatchData *data = opaque;
+
+ while(true) {
+ if (pthread_cond_wait(&threadLoopCond, &threadLoopLock) != 0) {
+ virtDBusLoopStatusSet(errno);
+ return NULL;
+ }
+ if (virtDBusLoopStatusGet() != 0)
+ return NULL;
+
+ if (virtDBusProcessEvents(data->bus, data->objectList) < 0) {
+ virtDBusLoopStatusSet(-ENOMEM);
+ return NULL;
+ }
+ }
+
+ return NULL;
+}
+
+static void
+virtDBusDispatch(void)
+{
+ pthread_cond_broadcast(&threadLoopCond);
+}
+
+static int
+virtDBusStartThreads(struct _virtDBusDispatchData *data,
+ int threads)
+{
+ if (pthread_cond_init(&threadLoopCond, NULL) != 0)
+ return -1;
+
+ if (virtDBusUtilMutexInit(&threadLoopLock) != 0)
+ return -1;
+
+ for (int i = 0; i < threads; i++) {
+ pthread_t thread;
+
+ if (pthread_create(&thread, NULL, virtDBusDispatchThread, data) != 0)
+ return -1;
}
return 0;
@@ -49,7 +127,7 @@ virtDBusHandleSignal(int watch VIRT_ATTR_UNUSED,
int events VIRT_ATTR_UNUSED,
void *opaque VIRT_ATTR_UNUSED)
{
- loop_status = -ECANCELED;
+ virtDBusLoopStatusSet(-ECANCELED);
}
struct virtDBusDriver {
@@ -91,6 +169,7 @@ main(int argc, char *argv[])
{ "help", no_argument, NULL, 'h' },
{ "system", no_argument, NULL, ARG_SYSTEM },
{ "session", no_argument, NULL, ARG_SESSION },
+ { "threads", required_argument, NULL, 't' },
{}
};
@@ -106,6 +185,7 @@ main(int argc, char *argv[])
sigset_t mask;
int c;
int r;
+ int threads = VIRT_DBUS_THREADS;
if (geteuid() == 0) {
busType = DBUS_BUS_SYSTEM;
@@ -113,7 +193,7 @@ main(int argc, char *argv[])
busType = DBUS_BUS_SESSION;
}
- while ((c = getopt_long(argc, argv, "hc:", options, NULL)) >= 0) {
+ while ((c = getopt_long(argc, argv, "ht:", options, NULL)) >= 0) {
switch (c) {
case 'h':
printf("Usage: %s [OPTIONS]\n",
program_invocation_short_name);
@@ -123,8 +203,16 @@ main(int argc, char *argv[])
printf(" -h, --help Display this help text and
exit\n");
printf(" --session Connect to the session bus\n");
printf(" --system Connect to the system bus\n");
+ printf(" -t, --threads Configure count of worker
threads\n");
return 0;
+ case 't':
+ if (virtDBusUtilStrToInt(optarg, 10, &threads) < 0) {
+ fprintf(stderr, "Failed to parse --threads.\n");
+ return EXIT_FAILURE;
+ }
+ break;
+
case ARG_SYSTEM:
busType = DBUS_BUS_SYSTEM;
break;
@@ -179,11 +267,13 @@ main(int argc, char *argv[])
return EXIT_FAILURE;
}
- r = virtDBusProcessEvents(bus, &objectList);
- if (r < 0)
- return EXIT_FAILURE;
+ struct _virtDBusDispatchData data = { bus, &objectList };
+
+ virtDBusStartThreads(&data, threads);
+
+ virtDBusDispatch();
- while (loop_status >= 0) {
+ while ((r = virtDBusLoopStatusGet()) >= 0) {
virEventRunDefaultImpl();
r = virtDBusProcessEvents(bus, &objectList);
@@ -191,8 +281,8 @@ main(int argc, char *argv[])
return EXIT_FAILURE;
}
- if (loop_status < 0 && loop_status != -ECANCELED) {
- fprintf(stderr, "Error: %s\n", strerror(-loop_status));
+ if (r < 0 && r != -ECANCELED) {
+ fprintf(stderr, "Error: %s\n", strerror(-r));
return EXIT_FAILURE;
}
diff --git a/src/util.c b/src/util.c
index 6f8b7be..fe9e023 100644
--- a/src/util.c
+++ b/src/util.c
@@ -2,6 +2,7 @@
#include "util.h"
+#include <errno.h>
#include <fcntl.h>
#include <libvirt/virterror.h>
#include <stdio.h>
@@ -90,6 +91,33 @@ virtDBusUtilSetLastVirtError(virtDBusMessage *msg)
virError->message);
}
+int
+virtDBusUtilMutexInit(pthread_mutex_t *mutex)
+{
+ _cleanup_(pthread_mutexattr_destroy) pthread_mutexattr_t attr;
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+ return pthread_mutex_init(mutex, &attr);
+}
+
+int
+virtDBusUtilStrToInt(char const *string,
+ int base,
+ int *result)
+{
+ long int val;
+ char *ptr;
+
+ errno = 0;
+ val = strtol(string, &ptr, base);
+ if (errno != 0 || *ptr != 0 || ptr == string || (int) val != val)
+ return -1;
+
+ *result = val;
+ return 0;
+}
+
char *
virtDBusUtilReadFile(const char *filename)
{
diff --git a/src/util.h b/src/util.h
index 16c54df..afb118c 100644
--- a/src/util.h
+++ b/src/util.h
@@ -3,6 +3,7 @@
#include "dbus.h"
#include <libvirt/libvirt.h>
+#include <pthread.h>
#define VIRT_DBUS_ERROR_INTERFACE "org.libvirt.Error"
@@ -26,6 +27,14 @@ virtDBusUtilMessageAppendTypedParameters(virtDBusMessage *msg,
int
virtDBusUtilSetLastVirtError(virtDBusMessage *msg);
+int
+virtDBusUtilMutexInit(pthread_mutex_t *mutex);
+
+int
+virtDBusUtilStrToInt(char const *string,
+ int base,
+ int *result);
+
char *
virtDBusUtilReadFile(const char *filename);
--
2.14.3