The default thread count is currently 4 and it is also configurable
via --threads/-t paramter for the libvirt-dbus daemon.
Signed-off-by: Pavel Hrdina <phrdina(a)redhat.com>
---
src/connect.c | 4 +++
src/connect.h | 1 +
src/gdbus.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++-------------
src/gdbus.h | 4 +++
src/main.c | 10 ++++++
5 files changed, 95 insertions(+), 21 deletions(-)
diff --git a/src/connect.c b/src/connect.c
index 70af8ac..bf97cd5 100644
--- a/src/connect.c
+++ b/src/connect.c
@@ -56,6 +56,8 @@ gboolean
virtDBusConnectOpen(virtDBusConnect *connect,
GError **error)
{
+ g_autoptr(GMutexLocker) lock = g_mutex_locker_new(&connect->lock);
+
if (connect->connection) {
if (virConnectIsAlive(connect->connection))
return TRUE;
@@ -213,6 +215,8 @@ virtDBusConnectNew(virtDBusConnect **connectp,
connect = g_new0(virtDBusConnect, 1);
+ g_mutex_init(&connect->lock);
+
for (gint i = 0; i < VIR_DOMAIN_EVENT_ID_LAST; i += 1)
connect->callback_ids[i] = -1;
diff --git a/src/connect.h b/src/connect.h
index bfe2995..9572857 100644
--- a/src/connect.h
+++ b/src/connect.h
@@ -14,6 +14,7 @@ struct virtDBusConnect {
const gchar *connectPath;
gchar *domainPath;
virConnectPtr connection;
+ GMutex lock;
gint callback_ids[VIR_DOMAIN_EVENT_ID_LAST];
};
diff --git a/src/gdbus.c b/src/gdbus.c
index cbfd07d..688dc38 100644
--- a/src/gdbus.c
+++ b/src/gdbus.c
@@ -16,6 +16,16 @@ struct _virtDBusGDBusSubtreeData {
};
typedef struct _virtDBusGDBusSubtreeData virtDBusGDBusSubtreeData;
+struct _virtDBusGDBusThreadData {
+ const gchar *objectPath;
+ const gchar *interfaceName;
+ const gchar *methodName;
+ GVariant *parameters;
+ GDBusMethodInvocation *invocation;
+ virtDBusGDBusMethodData *methodData;
+};
+typedef struct _virtDBusGDBusThreadData virtDBusGDBusThreadData;
+
static const gchar *dbusInterfacePrefix = NULL;
/**
@@ -226,6 +236,38 @@ virtDBusGDBusHandleMethod(GVariant *parameters,
outFDs);
}
+static void
+virtDBusGDBusMethodCallThread(gpointer threadData,
+ gpointer userData G_GNUC_UNUSED)
+{
+ g_autofree virtDBusGDBusThreadData *data = threadData;
+
+ if (g_strcmp0(data->interfaceName, "org.freedesktop.DBus.Properties") ==
0) {
+ if (g_strcmp0(data->methodName, "Get") == 0) {
+ virtDBusGDBusHandlePropertyGet(data->parameters, data->invocation,
+ data->objectPath, data->methodData);
+ } else if (g_strcmp0(data->methodName, "Set") == 0) {
+ virtDBusGDBusHandlePropertySet(data->parameters, data->invocation,
+ data->objectPath, data->methodData);
+ } else if (g_strcmp0(data->methodName, "GetAll") == 0) {
+ virtDBusGDBusHandlePropertyGetAll(data->invocation, data->objectPath,
+ data->methodData);
+ } else {
+ g_dbus_method_invocation_return_error(data->invocation,
+ G_DBUS_ERROR,
+ G_DBUS_ERROR_UNKNOWN_METHOD,
+ "unknown method
'%s'",
+ data->methodName);
+ }
+ } else {
+ virtDBusGDBusHandleMethod(data->parameters, data->invocation,
+ data->objectPath, data->methodName,
+ data->methodData);
+ }
+}
+
+GThreadPool *threadPool;
+
static void
virtDBusGDBusHandleMethodCall(GDBusConnection *connection G_GNUC_UNUSED,
const gchar *sender G_GNUC_UNUSED,
@@ -236,27 +278,18 @@ virtDBusGDBusHandleMethodCall(GDBusConnection *connection
G_GNUC_UNUSED,
GDBusMethodInvocation *invocation,
gpointer userData)
{
- virtDBusGDBusMethodData *data = userData;
-
- if (g_strcmp0(interfaceName, "org.freedesktop.DBus.Properties") == 0) {
- if (g_strcmp0(methodName, "Get") == 0) {
- virtDBusGDBusHandlePropertyGet(parameters, invocation,
- objectPath, data);
- } else if (g_strcmp0(methodName, "Set") == 0) {
- virtDBusGDBusHandlePropertySet(parameters, invocation,
- objectPath, data);
- } else if (g_strcmp0(methodName, "GetAll") == 0) {
- virtDBusGDBusHandlePropertyGetAll(invocation, objectPath, data);
- } else {
- g_dbus_method_invocation_return_error(invocation,
- G_DBUS_ERROR,
- G_DBUS_ERROR_UNKNOWN_METHOD,
- "unknown method
'%s'", methodName);
- }
- } else {
- virtDBusGDBusHandleMethod(parameters, invocation, objectPath,
- methodName, data);
- }
+ virtDBusGDBusThreadData *data = g_new0(virtDBusGDBusThreadData, 1);
+
+ g_assert(data);
+
+ data->objectPath = objectPath;
+ data->interfaceName = interfaceName;
+ data->methodName = methodName;
+ data->parameters = parameters;
+ data->invocation = invocation;
+ data->methodData = userData;
+
+ g_thread_pool_push(threadPool, data, NULL);
}
static const GDBusInterfaceVTable virtDBusGDBusVtable = {
@@ -396,3 +429,25 @@ virtDBusGDBusRegisterSubtree(GDBusConnection *bus,
virtDBusGDBusSubtreeDataFree,
NULL);
}
+
+/**
+ * virtDBusGDBusPrepareThreadPool:
+ * @maxThreads: the number of maximum threads in thread pool
+ * @error: return location for error or NULL
+ *
+ * Initializes thread pool to be used to process D-Bus messages.
+ *
+ * Returns TRUE on success, FALSE on error and sets @error.
+ */
+gboolean
+virtDBusGDBusPrepareThreadPool(gint maxThreads,
+ GError **error)
+{
+ threadPool = g_thread_pool_new(virtDBusGDBusMethodCallThread,
+ NULL,
+ maxThreads,
+ FALSE,
+ error);
+
+ return !!threadPool;
+}
diff --git a/src/gdbus.h b/src/gdbus.h
index 6c1a8a3..5ab9cd0 100644
--- a/src/gdbus.h
+++ b/src/gdbus.h
@@ -104,5 +104,9 @@ virtDBusGDBusRegisterSubtree(GDBusConnection *bus,
virtDBusGDBusPropertyTable *properties,
gpointer userData);
+gboolean
+virtDBusGDBusPrepareThreadPool(gint maxThreads,
+ GError **error);
+
G_DEFINE_AUTO_CLEANUP_FREE_FUNC(virtDBusGDBusSource, g_source_remove, 0);
G_DEFINE_AUTO_CLEANUP_FREE_FUNC(virtDBusGDBusOwner, g_bus_unown_name, 0);
diff --git a/src/main.c b/src/main.c
index bbbec09..7afde4f 100644
--- a/src/main.c
+++ b/src/main.c
@@ -89,11 +89,14 @@ virtDBusRegisterDataFree(virtDBusRegisterData *data)
}
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(virtDBusRegisterData, virtDBusRegisterDataFree);
+#define VIRT_DBUS_MAX_THREADS 4
+
int
main(gint argc, gchar *argv[])
{
static gboolean systemOpt = FALSE;
static gboolean sessionOpt = FALSE;
+ static gint maxThreads = VIRT_DBUS_MAX_THREADS;
GBusType busType;
g_auto(virtDBusGDBusSource) sigintSource = 0;
g_auto(virtDBusGDBusSource) sigtermSource = 0;
@@ -108,6 +111,8 @@ main(gint argc, gchar *argv[])
"Connect to the system bus", NULL },
{ "session", 0, 0, G_OPTION_ARG_NONE, &sessionOpt,
"Connect to the session bus", NULL },
+ { "threads", 't', 0, G_OPTION_ARG_INT, &maxThreads,
+ "Configure maximal number of worker threads", "N" },
{ NULL }
};
@@ -145,6 +150,11 @@ main(gint argc, gchar *argv[])
}
data.connectList = g_new0(virtDBusConnect *, data.ndrivers + 1);
+ if (!virtDBusGDBusPrepareThreadPool(maxThreads, &error)) {
+ g_printerr("%s\n", error->message);
+ exit(EXIT_FAILURE);
+ }
+
loop = g_main_loop_new(NULL, FALSE);
sigtermSource = g_unix_signal_add(SIGTERM,
--
2.14.3