Create a mechanism that allows the domain/server quit code to cause
any pending jobs to be purged and to request that the current workers
quit.
Signed-off-by: John Ferlan <jferlan(a)redhat.com>
---
src/libvirt_private.syms | 1 +
src/util/virthreadpool.c | 64 ++++++++++++++++++++++++++++++++++++++++--------
src/util/virthreadpool.h | 2 ++
3 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index bc8cc1fba..6ffceb46b 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -2865,6 +2865,7 @@ virThreadJobSetWorker;
# util/virthreadpool.h
+virThreadPoolDrain;
virThreadPoolFree;
virThreadPoolGetCurrentWorkers;
virThreadPoolGetFreeWorkers;
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
index 10f2bd2c3..0baa05d12 100644
--- a/src/util/virthreadpool.c
+++ b/src/util/virthreadpool.c
@@ -30,9 +30,12 @@
#include "viralloc.h"
#include "virthread.h"
#include "virerror.h"
+#include "virlog.h"
#define VIR_FROM_THIS VIR_FROM_NONE
+VIR_LOG_INIT("util.threadpool");
+
typedef struct _virThreadPoolJob virThreadPoolJob;
typedef virThreadPoolJob *virThreadPoolJobPtr;
@@ -93,6 +96,24 @@ static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
return count > limit;
}
+
+/*
+ * virThreadPoolJobRemove:
+ * @pool: thread pool whose job list contains @job
+ * @job: job to unlink from @pool's doubly linked job list
+ *
+ * Detach @job from the list, repairing the neighbouring links (or the
+ * list head/tail when @job sits at either end), and decrement the
+ * queue depth counter. @job itself is neither freed nor modified, and
+ * pool->jobList.firstPrio is not adjusted here.
+ */
+static void
+virThreadPoolJobRemove(virThreadPoolPtr pool,
+                       virThreadPoolJobPtr job)
+{
+    virThreadPoolJobPtr before = job->prev;
+    virThreadPoolJobPtr after = job->next;
+
+    /* Backward link: the successor (or the list tail) now refers to
+     * whatever preceded the removed job. */
+    if (after) {
+        after->prev = before;
+    } else {
+        pool->jobList.tail = before;
+    }
+
+    /* Forward link: the predecessor (or the list head) now refers to
+     * whatever followed the removed job. */
+    if (before) {
+        before->next = after;
+    } else {
+        pool->jobList.head = after;
+    }
+
+    pool->jobQueueDepth--;
+}
+
+
static void virThreadPoolWorker(void *opaque)
{
struct virThreadPoolWorkerData *data = opaque;
@@ -152,16 +173,7 @@ static void virThreadPoolWorker(void *opaque)
pool->jobList.firstPrio = tmp;
}
- if (job->prev)
- job->prev->next = job->next;
- else
- pool->jobList.head = job->next;
- if (job->next)
- job->next->prev = job->prev;
- else
- pool->jobList.tail = job->prev;
-
- pool->jobQueueDepth--;
+ virThreadPoolJobRemove(pool, job);
virMutexUnlock(&pool->mutex);
(pool->jobFunc)(job->data, pool->jobOpaque);
@@ -307,6 +319,38 @@ void virThreadPoolFree(virThreadPoolPtr pool)
}
+/*
+ * virThreadPoolDrain:
+ * @pool: Pointer to thread pool
+ *
+ * Purge every job still waiting in @pool's queue and notify the current
+ * workers of the impending quit. Purged jobs are freed without being
+ * passed to the pool's job function. The whole operation runs with
+ * pool->mutex held: the queue is emptied, pool->quit is set, and the
+ * worker condition variables are broadcast when the corresponding
+ * worker count is non-zero.
+ *
+ * Only the job containers are released here; job->data is not touched.
+ * NOTE(review): confirm that callers retain ownership of job->data.
+ */
+void
+virThreadPoolDrain(virThreadPoolPtr pool)
+{
+    virMutexLock(&pool->mutex);
+
+    VIR_DEBUG("nWorkers=%zu, nPrioWorkers=%zu jobQueueDepth=%zu",
+              pool->nWorkers, pool->nPrioWorkers, pool->jobQueueDepth);
+
+    /* Loop until the list is empty. Comparing head against tail would
+     * stop as soon as a single job is left (head == tail), so the last
+     * pending job would never be purged. */
+    while (pool->jobList.head) {
+        virThreadPoolJobPtr job = pool->jobList.head;
+
+        virThreadPoolJobRemove(pool, job);
+        VIR_FREE(job);
+    }
+
+    /* All queued jobs have been freed, so the priority marker must not
+     * be left pointing at released memory. */
+    pool->jobList.firstPrio = NULL;
+
+    pool->quit = true;
+    if (pool->nWorkers > 0)
+        virCondBroadcast(&pool->cond);
+    if (pool->nPrioWorkers > 0)
+        virCondBroadcast(&pool->prioCond);
+
+    virMutexUnlock(&pool->mutex);
+}
+
+
size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
{
size_t ret;
diff --git a/src/util/virthreadpool.h b/src/util/virthreadpool.h
index e1f362f5b..c54b166b1 100644
--- a/src/util/virthreadpool.h
+++ b/src/util/virthreadpool.h
@@ -52,6 +52,8 @@ size_t virThreadPoolGetJobQueueDepth(virThreadPoolPtr pool);
void virThreadPoolFree(virThreadPoolPtr pool);
+void virThreadPoolDrain(virThreadPoolPtr pool);
+
int virThreadPoolSendJob(virThreadPoolPtr pool,
unsigned int priority,
void *jobdata) ATTRIBUTE_NONNULL(1)
--
2.13.6