With the too-many-rebase-add-change and the additional comment
explaining why currently only one worker must be used
Reviewed-by: Boris Fiuczynski <fiuczy(a)linux.ibm.com>
On 4/19/24 16:49, Marc Hartmayer wrote:
Use a worker pool for processing the events (e.g. udev, mdevctl
config changes)
and the initialization instead of a separate initThread and a mdevctl-thread.
This has the large advantage that we can leverage the job API and now this
thread pool is responsible to do all the "costly-work" and emitting the
libvirt
nodedev events.
Signed-off-by: Marc Hartmayer <mhartmay(a)linux.ibm.com>
---
src/node_device/node_device_udev.c | 244 +++++++++++++++++++++--------
1 file changed, 179 insertions(+), 65 deletions(-)
diff --git a/src/node_device/node_device_udev.c b/src/node_device/node_device_udev.c
index e4b1532dc385..67a8b5cd7132 100644
--- a/src/node_device/node_device_udev.c
+++ b/src/node_device/node_device_udev.c
@@ -43,6 +43,7 @@
#include "virnetdev.h"
#include "virmdev.h"
#include "virutil.h"
+#include "virthreadpool.h"
#include "configmake.h"
@@ -69,13 +70,13 @@ struct _udevEventData {
bool udevThreadQuit;
bool udevDataReady;
- /* init thread */
- virThread *initThread;
-
/* Protects @mdevctlMonitors */
virMutex mdevctlLock;
GList *mdevctlMonitors;
int mdevctlTimeout;
+
+ /* Immutable pointer, self-locking APIs */
+ virThreadPool *workerPool;
};
static virClass *udevEventDataClass;
@@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
struct udev *udev = NULL;
udevEventData *priv = obj;
- g_clear_pointer(&priv->initThread, g_free);
-
VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors),
g_object_unref);
}
@@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
udev_unref(udev);
}
+ g_clear_pointer(&priv->workerPool, virThreadPoolFree);
+
virMutexDestroy(&priv->mdevctlLock);
virCondDestroy(&priv->udevThreadCond);
@@ -143,6 +144,66 @@ udevEventDataNew(void)
return ret;
}
+typedef enum {
+ NODE_DEVICE_EVENT_INIT = 0,
+ NODE_DEVICE_EVENT_UDEV_ADD,
+ NODE_DEVICE_EVENT_UDEV_REMOVE,
+ NODE_DEVICE_EVENT_UDEV_CHANGE,
+ NODE_DEVICE_EVENT_UDEV_MOVE,
+ NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
+
+ NODE_DEVICE_EVENT_LAST
+} nodeDeviceEventType;
+
+struct _nodeDeviceEvent {
+ nodeDeviceEventType eventType;
+ void *data;
+ virFreeCallback dataFreeFunc;
+};
+typedef struct _nodeDeviceEvent nodeDeviceEvent;
+
+static void
+nodeDeviceEventFree(nodeDeviceEvent *event)
+{
+ if (!event)
+ return;
+
+ if (event->dataFreeFunc)
+ event->dataFreeFunc(event->data);
+ g_free(event);
+}
+G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
+
+ /**
+ * nodeDeviceEventSubmit:
+ * @eventType: the event to be processed
+ * @data: additional data for the event processor (the pointer is stolen and it
+ * will be properly freed using @dataFreeFunc)
+ * @dataFreeFunc: callback to free @data
+ *
+ * Submits @eventType to be processed by the asynchronous event handling
+ * thread.
+ */
+static int nodeDeviceEventSubmit(nodeDeviceEventType eventType, void *data,
virFreeCallback dataFreeFunc)
+{
+ nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
+ udevEventData *priv = NULL;
+
+ if (!driver)
+ return -1;
+
+ priv = driver->privateData;
+
+ event->eventType = eventType;
+ event->data = data;
+ event->dataFreeFunc = dataFreeFunc;
+ if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
+ nodeDeviceEventFree(event);
+ return -1;
+ }
+ return 0;
+}
+
static bool
udevHasDeviceProperty(struct udev_device *dev,
@@ -1447,7 +1508,7 @@ static void scheduleMdevctlUpdate(udevEventData *data, bool
force);
static int
-udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, const char *path)
+processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state, const char *path)
{
virNodeDeviceObj *obj = NULL;
virNodeDeviceDef *def;
@@ -1529,7 +1590,7 @@ udevSetParent(virNodeDeviceDriverState *driver_state, struct
udev_device *device
}
static int
-udevAddOneDevice(virNodeDeviceDriverState *driver_state, struct udev_device *device)
+processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state, struct
udev_device *device)
{
g_autofree char *sysfs_path = NULL;
virNodeDeviceDef *def = NULL;
@@ -1647,7 +1708,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state,
struct udev *
device = udev_device_new_from_syspath(udev, name);
if (device != NULL) {
- if (udevAddOneDevice(driver_state, device) != 0) {
+ if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
VIR_DEBUG("Failed to create node device for udev device
'%s'",
name);
}
@@ -1755,26 +1816,23 @@ udevHandleOneDevice(struct udev_device *device)
VIR_DEBUG("udev action: '%s': %s", action,
udev_device_get_syspath(device));
- if (STREQ(action, "add") || STREQ(action, "change"))
- return udevAddOneDevice(driver, device);
-
- if (STREQ(action, "remove")) {
- const char *path = udev_device_get_syspath(device);
-
- return udevRemoveOneDeviceSysPath(driver, path);
- }
-
- if (STREQ(action, "move")) {
- const char *devpath_old = udevGetDeviceProperty(device,
"DEVPATH_OLD");
-
- if (devpath_old) {
- g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s",
devpath_old);
-
- udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
- }
-
- return udevAddOneDevice(driver, device);
+ /* Reference is either released via workerpool logic or at the end of this
+ * function. */
+ device = udev_device_ref(device);
+ if (STREQ(action, "add")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "change")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "remove")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
+ (virFreeCallback)udev_device_unref);
+ } else if (STREQ(action, "move")) {
+ return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
+ (virFreeCallback)udev_device_unref);
}
+ udev_device_unref(device);
return 0;
}
@@ -1993,23 +2051,23 @@ udevSetupSystemDev(void)
static void
-nodeStateInitializeEnumerate(void *opaque)
+processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state, void
*opaque)
{
- udevEventData *priv = driver->privateData;
+ udevEventData *priv = driver_state->privateData;
struct udev *udev = opaque;
/* Populate with known devices */
- if (udevEnumerateDevices(driver, udev) != 0)
+ if (udevEnumerateDevices(driver_state, udev) != 0)
goto error;
/* Load persistent mdevs (which might not be activated yet) and additional
* information about active mediated devices from mdevctl */
- if (nodeDeviceUpdateMediatedDevices(driver) != 0)
+ if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
goto error;
cleanup:
- VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) {
- driver->initialized = true;
- virCondBroadcast(&driver->initCond);
+ VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
+ driver_state->initialized = true;
+ virCondBroadcast(&driver_state->initCond);
}
return;
@@ -2051,31 +2109,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
static void
-mdevctlUpdateThreadFunc(void *opaque)
-{
- virNodeDeviceDriverState *driver_state = opaque;
-
- if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
- VIR_WARN("mdevctl failed to update mediated devices");
-}
-
-
-static void
-launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
+submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
{
udevEventData *priv = opaque;
- virThread thread;
if (priv->mdevctlTimeout != -1) {
virEventRemoveTimeout(priv->mdevctlTimeout);
priv->mdevctlTimeout = -1;
}
- if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc,
- "mdevctl-thread", false, driver) < 0) {
- virReportSystemError(errno, "%s",
- _("failed to create mdevctl thread"));
- }
+ nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL);
}
@@ -2170,7 +2213,7 @@ mdevctlEnableMonitor(udevEventData *priv)
/* Schedules an mdevctl update for 100ms in the future, canceling any existing
* timeout that may have been set. In this way, multiple update requests in
* quick succession can be collapsed into a single update. if @force is true,
- * an update thread will be spawned immediately. */
+ * the worker job is submitted immediately. */
static void
scheduleMdevctlUpdate(udevEventData *data,
bool force)
@@ -2178,12 +2221,12 @@ scheduleMdevctlUpdate(udevEventData *data,
if (!force) {
if (data->mdevctlTimeout != -1)
virEventRemoveTimeout(data->mdevctlTimeout);
- data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread,
+ data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
data, NULL);
return;
}
- launchMdevctlUpdateThread(-1, data);
+ submitMdevctlUpdate(-1, data);
}
@@ -2223,6 +2266,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
}
+static void nodeDeviceEventHandler(void *data, void *opaque)
+{
+ virNodeDeviceDriverState *driver_state = opaque;
+ g_autoptr(nodeDeviceEvent) processEvent = data;
+
+ switch (processEvent->eventType) {
+ case NODE_DEVICE_EVENT_INIT:
+ {
+ struct udev *udev = processEvent->data;
+
+ processNodeStateInitializeEnumerate(driver_state, udev);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_ADD:
+ case NODE_DEVICE_EVENT_UDEV_CHANGE:
+ {
+ struct udev_device *device = processEvent->data;
+
+ processNodeDeviceAddAndChangeEvent(driver_state, device);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_REMOVE:
+ {
+ struct udev_device *device = processEvent->data;
+ const char *path = udev_device_get_syspath(device);
+
+ processNodeDeviceRemoveEvent(driver_state, path);
+ }
+ break;
+ case NODE_DEVICE_EVENT_UDEV_MOVE:
+ {
+ struct udev_device *device = processEvent->data;
+ const char *devpath_old = udevGetDeviceProperty(device,
"DEVPATH_OLD");
+
+ if (devpath_old) {
+ g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s",
devpath_old);
+
+ processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
+ }
+
+ processNodeDeviceAddAndChangeEvent(driver_state, device);
+ }
+ break;
+ case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
+ {
+ if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
+ VIR_WARN("mdevctl failed to update mediated devices");
+ }
+ break;
+ case NODE_DEVICE_EVENT_LAST:
+ g_assert_not_reached();
+ break;
+ }
+}
+
+
/* Note: It must be safe to call this function even if the driver was not
* successfully initialized. This must be considered when changing this
* function. */
@@ -2258,6 +2357,9 @@ nodeStateShutdownPrepare(void)
priv->udevThreadQuit = true;
virCondSignal(&priv->udevThreadCond);
}
+
+ if (priv->workerPool)
+ virThreadPoolStop(priv->workerPool);
return 0;
}
@@ -2278,11 +2380,19 @@ nodeStateShutdownWait(void)
return 0;
VIR_WITH_OBJECT_LOCK_GUARD(priv) {
- if (priv->initThread)
- virThreadJoin(priv->initThread);
- if (priv->udevThread)
- virThreadJoin(priv->udevThread);
+ if (priv->mdevctlTimeout != -1) {
+ virEventRemoveTimeout(priv->mdevctlTimeout);
+ priv->mdevctlTimeout = -1;
+ }
+
+ if (priv->watch) {
+ virEventRemoveHandle(priv->watch);
+ priv->watch = -1;
+ }
}
+
+ if (priv->workerPool)
+ virThreadPoolDrain(priv->workerPool);
return 0;
}
@@ -2353,6 +2463,16 @@ nodeStateInitialize(bool privileged,
driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
driver->parserCallbacks.validate = nodeDeviceDefValidate;
+ /* must be initialized before trying to reconnect to all the running mdevs
+ * since there might occur some mdevctl monitor events that will be
+ * dispatched to the worker pool */
Add the important information that the current implementation supports
the use of one worker only to ensure the order of udev and libvirt
events remains the same.
+ priv->workerPool = virThreadPoolNewFull(1, 1, 0,
nodeDeviceEventHandler,
+ "nodev-device-event",
+ NULL,
+ driver);
+ if (!priv->workerPool)
+ goto unlock;
+
if (udevPCITranslateInit(privileged) < 0)
goto unlock;
@@ -2410,13 +2530,7 @@ nodeStateInitialize(bool privileged,
if (udevSetupSystemDev() != 0)
goto cleanup;
- priv->initThread = g_new0(virThread, 1);
- if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
- "nodedev-init", false, udev) < 0) {
- virReportSystemError(errno, "%s",
- _("failed to create udev enumerate thread"));
- goto cleanup;
- }
+ nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev),
(virFreeCallback)udev_unref);
return VIR_DRV_STATE_INIT_COMPLETE;
--
Mit freundlichen Grüßen/Kind regards
Boris Fiuczynski
IBM Deutschland Research & Development GmbH
Vorsitzender des Aufsichtsrats: Wolfgang Wendt
Geschäftsführung: David Faller
Sitz der Gesellschaft: Böblingen
Registergericht: Amtsgericht Stuttgart, HRB 243294