From: "Daniel P. Berrange" <berrange(a)redhat.com>
---
src/Makefile.am | 2 +-
src/nwfilter/nwfilter_dhcpsnoop.c | 2 +-
src/qemu/qemu_conf.h | 2 +-
src/qemu/qemu_driver.c | 2 +-
src/rpc/virnetserver.c | 2 +-
src/util/threadpool.c | 371 --------------------------------------
src/util/threadpool.h | 53 ------
src/util/virthreadpool.c | 371 ++++++++++++++++++++++++++++++++++++++
src/util/virthreadpool.h | 53 ++++++
9 files changed, 429 insertions(+), 429 deletions(-)
delete mode 100644 src/util/threadpool.c
delete mode 100644 src/util/threadpool.h
create mode 100644 src/util/virthreadpool.c
create mode 100644 src/util/virthreadpool.h
diff --git a/src/Makefile.am b/src/Makefile.am
index 376c543..e74a3a3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -56,7 +56,6 @@ UTIL_SOURCES = \
util/threads.c util/threads.h \
util/threads-pthread.h \
util/threads-win32.h \
- util/threadpool.c util/threadpool.h \
util/uuid.c util/uuid.h \
util/util.c util/util.h \
util/viralloc.c util/viralloc.h \
@@ -86,6 +85,7 @@ UTIL_SOURCES = \
util/virstatslinux.c util/virstatslinux.h \
util/virstoragefile.c util/virstoragefile.h \
util/virsysinfo.c util/virsysinfo.h \
+ util/virthreadpool.c util/virthreadpool.h \
util/virtypedparam.c util/virtypedparam.h \
util/xml.c util/xml.h \
util/virterror.c util/virterror_internal.h \
diff --git a/src/nwfilter/nwfilter_dhcpsnoop.c b/src/nwfilter/nwfilter_dhcpsnoop.c
index a798e95..c1ab622 100644
--- a/src/nwfilter/nwfilter_dhcpsnoop.c
+++ b/src/nwfilter/nwfilter_dhcpsnoop.c
@@ -65,7 +65,7 @@
#include "virnetdev.h"
#include "virfile.h"
#include "viratomic.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
#include "configmake.h"
#include "virtime.h"
diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h
index f928c29..0d4816e 100644
--- a/src/qemu/qemu_conf.h
+++ b/src/qemu/qemu_conf.h
@@ -41,7 +41,7 @@
# include "driver.h"
# include "virbitmap.h"
# include "vircommand.h"
-# include "threadpool.h"
+# include "virthreadpool.h"
# include "locking/lock_manager.h"
# include "qemu_capabilities.h"
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 0038d95..170f15d 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -85,7 +85,7 @@
#include "virfile.h"
#include "fdstream.h"
#include "configmake.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
#include "locking/lock_manager.h"
#include "locking/domain_lock.h"
#include "virkeycode.h"
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 67cd4b5..26ceb0c 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -32,7 +32,7 @@
#include "viralloc.h"
#include "virterror_internal.h"
#include "threads.h"
-#include "threadpool.h"
+#include "virthreadpool.h"
#include "util.h"
#include "virfile.h"
#include "virnetservermdns.h"
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
deleted file mode 100644
index 9d3d5d2..0000000
--- a/src/util/threadpool.c
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * threadpool.c: a generic thread pool implementation
- *
- * Copyright (C) 2010 Hu Tao
- * Copyright (C) 2010 Daniel P. Berrange
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library. If not, see
- * <
http://www.gnu.org/licenses/>.
- *
- * Authors:
- * Hu Tao <hutao(a)cn.fujitsu.com>
- * Daniel P. Berrange <berrange(a)redhat.com>
- */
-
-#include <config.h>
-
-#include "threadpool.h"
-#include "viralloc.h"
-#include "threads.h"
-#include "virterror_internal.h"
-
-#define VIR_FROM_THIS VIR_FROM_NONE
-
-typedef struct _virThreadPoolJob virThreadPoolJob;
-typedef virThreadPoolJob *virThreadPoolJobPtr;
-
-struct _virThreadPoolJob {
- virThreadPoolJobPtr prev;
- virThreadPoolJobPtr next;
- unsigned int priority;
-
- void *data;
-};
-
-typedef struct _virThreadPoolJobList virThreadPoolJobList;
-typedef virThreadPoolJobList *virThreadPoolJobListPtr;
-
-struct _virThreadPoolJobList {
- virThreadPoolJobPtr head;
- virThreadPoolJobPtr tail;
- virThreadPoolJobPtr firstPrio;
-};
-
-
-struct _virThreadPool {
- bool quit;
-
- virThreadPoolJobFunc jobFunc;
- void *jobOpaque;
- virThreadPoolJobList jobList;
- size_t jobQueueDepth;
-
- virMutex mutex;
- virCond cond;
- virCond quit_cond;
-
- size_t maxWorkers;
- size_t minWorkers;
- size_t freeWorkers;
- size_t nWorkers;
- virThreadPtr workers;
-
- size_t nPrioWorkers;
- virThreadPtr prioWorkers;
- virCond prioCond;
-};
-
-struct virThreadPoolWorkerData {
- virThreadPoolPtr pool;
- virCondPtr cond;
- bool priority;
-};
-
-static void virThreadPoolWorker(void *opaque)
-{
- struct virThreadPoolWorkerData *data = opaque;
- virThreadPoolPtr pool = data->pool;
- virCondPtr cond = data->cond;
- bool priority = data->priority;
- virThreadPoolJobPtr job = NULL;
-
- VIR_FREE(data);
-
- virMutexLock(&pool->mutex);
-
- while (1) {
- while (!pool->quit &&
- ((!priority && !pool->jobList.head) ||
- (priority && !pool->jobList.firstPrio))) {
- if (!priority)
- pool->freeWorkers++;
- if (virCondWait(cond, &pool->mutex) < 0) {
- if (!priority)
- pool->freeWorkers--;
- goto out;
- }
- if (!priority)
- pool->freeWorkers--;
- }
-
- if (pool->quit)
- break;
-
- if (priority) {
- job = pool->jobList.firstPrio;
- } else {
- job = pool->jobList.head;
- }
-
- if (job == pool->jobList.firstPrio) {
- virThreadPoolJobPtr tmp = job->next;
- while (tmp) {
- if (tmp->priority) {
- break;
- }
- tmp = tmp->next;
- }
- pool->jobList.firstPrio = tmp;
- }
-
- if (job->prev)
- job->prev->next = job->next;
- else
- pool->jobList.head = job->next;
- if (job->next)
- job->next->prev = job->prev;
- else
- pool->jobList.tail = job->prev;
-
- pool->jobQueueDepth--;
-
- virMutexUnlock(&pool->mutex);
- (pool->jobFunc)(job->data, pool->jobOpaque);
- VIR_FREE(job);
- virMutexLock(&pool->mutex);
- }
-
-out:
- if (priority)
- pool->nPrioWorkers--;
- else
- pool->nWorkers--;
- if (pool->nWorkers == 0 && pool->nPrioWorkers==0)
- virCondSignal(&pool->quit_cond);
- virMutexUnlock(&pool->mutex);
-}
-
-virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
- size_t maxWorkers,
- size_t prioWorkers,
- virThreadPoolJobFunc func,
- void *opaque)
-{
- virThreadPoolPtr pool;
- size_t i;
- struct virThreadPoolWorkerData *data = NULL;
-
- if (minWorkers > maxWorkers)
- minWorkers = maxWorkers;
-
- if (VIR_ALLOC(pool) < 0) {
- virReportOOMError();
- return NULL;
- }
-
- pool->jobList.tail = pool->jobList.head = NULL;
-
- pool->jobFunc = func;
- pool->jobOpaque = opaque;
-
- if (virMutexInit(&pool->mutex) < 0)
- goto error;
- if (virCondInit(&pool->cond) < 0)
- goto error;
- if (virCondInit(&pool->quit_cond) < 0)
- goto error;
-
- if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
- goto error;
-
- pool->minWorkers = minWorkers;
- pool->maxWorkers = maxWorkers;
-
- for (i = 0; i < minWorkers; i++) {
- if (VIR_ALLOC(data) < 0) {
- virReportOOMError();
- goto error;
- }
- data->pool = pool;
- data->cond = &pool->cond;
-
- if (virThreadCreate(&pool->workers[i],
- true,
- virThreadPoolWorker,
- data) < 0) {
- goto error;
- }
- pool->nWorkers++;
- }
-
- if (prioWorkers) {
- if (virCondInit(&pool->prioCond) < 0)
- goto error;
- if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
- goto error;
-
- for (i = 0; i < prioWorkers; i++) {
- if (VIR_ALLOC(data) < 0) {
- virReportOOMError();
- goto error;
- }
- data->pool = pool;
- data->cond = &pool->prioCond;
- data->priority = true;
-
- if (virThreadCreate(&pool->prioWorkers[i],
- true,
- virThreadPoolWorker,
- data) < 0) {
- goto error;
- }
- pool->nPrioWorkers++;
- }
- }
-
- return pool;
-
-error:
- VIR_FREE(data);
- virThreadPoolFree(pool);
- return NULL;
-
-}
-
-void virThreadPoolFree(virThreadPoolPtr pool)
-{
- virThreadPoolJobPtr job;
- bool priority = false;
-
- if (!pool)
- return;
-
- virMutexLock(&pool->mutex);
- pool->quit = true;
- if (pool->nWorkers > 0)
- virCondBroadcast(&pool->cond);
- if (pool->nPrioWorkers > 0) {
- priority = true;
- virCondBroadcast(&pool->prioCond);
- }
-
- while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
- ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
-
- while ((job = pool->jobList.head)) {
- pool->jobList.head = pool->jobList.head->next;
- VIR_FREE(job);
- }
-
- VIR_FREE(pool->workers);
- virMutexUnlock(&pool->mutex);
- virMutexDestroy(&pool->mutex);
- ignore_value(virCondDestroy(&pool->quit_cond));
- ignore_value(virCondDestroy(&pool->cond));
- if (priority) {
- VIR_FREE(pool->prioWorkers);
- ignore_value(virCondDestroy(&pool->prioCond));
- }
- VIR_FREE(pool);
-}
-
-
-size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
-{
- return pool->minWorkers;
-}
-
-size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
-{
- return pool->maxWorkers;
-}
-
-size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
-{
- return pool->nPrioWorkers;
-}
-
-/*
- * @priority - job priority
- * Return: 0 on success, -1 otherwise
- */
-int virThreadPoolSendJob(virThreadPoolPtr pool,
- unsigned int priority,
- void *jobData)
-{
- virThreadPoolJobPtr job;
- struct virThreadPoolWorkerData *data = NULL;
-
- virMutexLock(&pool->mutex);
- if (pool->quit)
- goto error;
-
- if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
- pool->nWorkers < pool->maxWorkers) {
- if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
- virReportOOMError();
- goto error;
- }
-
- if (VIR_ALLOC(data) < 0) {
- pool->nWorkers--;
- virReportOOMError();
- goto error;
- }
-
- data->pool = pool;
- data->cond = &pool->cond;
-
- if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
- true,
- virThreadPoolWorker,
- data) < 0) {
- VIR_FREE(data);
- pool->nWorkers--;
- goto error;
- }
- }
-
- if (VIR_ALLOC(job) < 0) {
- virReportOOMError();
- goto error;
- }
-
- job->data = jobData;
- job->priority = priority;
-
- job->prev = pool->jobList.tail;
- if (pool->jobList.tail)
- pool->jobList.tail->next = job;
- pool->jobList.tail = job;
-
- if (!pool->jobList.head)
- pool->jobList.head = job;
-
- if (priority && !pool->jobList.firstPrio)
- pool->jobList.firstPrio = job;
-
- pool->jobQueueDepth++;
-
- virCondSignal(&pool->cond);
- if (priority)
- virCondSignal(&pool->prioCond);
-
- virMutexUnlock(&pool->mutex);
- return 0;
-
-error:
- virMutexUnlock(&pool->mutex);
- return -1;
-}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
deleted file mode 100644
index 4479647..0000000
--- a/src/util/threadpool.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * threadpool.h: a generic thread pool implementation
- *
- * Copyright (C) 2010 Hu Tao
- * Copyright (C) 2010 Daniel P. Berrange
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library. If not, see
- * <
http://www.gnu.org/licenses/>.
- *
- * Author:
- * Hu Tao <hutao(a)cn.fujitsu.com>
- * Daniel P. Berrange <berrange(a)redhat.com>
- */
-
-#ifndef __VIR_THREADPOOL_H__
-# define __VIR_THREADPOOL_H__
-
-# include "internal.h"
-
-typedef struct _virThreadPool virThreadPool;
-typedef virThreadPool *virThreadPoolPtr;
-
-typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
-
-virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
- size_t maxWorkers,
- size_t prioWorkers,
- virThreadPoolJobFunc func,
- void *opaque) ATTRIBUTE_NONNULL(4);
-
-size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool);
-size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool);
-size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool);
-
-void virThreadPoolFree(virThreadPoolPtr pool);
-
-int virThreadPoolSendJob(virThreadPoolPtr pool,
- unsigned int priority,
- void *jobdata) ATTRIBUTE_NONNULL(1)
- ATTRIBUTE_RETURN_CHECK;
-
-#endif
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
new file mode 100644
index 0000000..c13b078
--- /dev/null
+++ b/src/util/virthreadpool.c
@@ -0,0 +1,371 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see
+ * <
http://www.gnu.org/licenses/>.
+ *
+ * Authors:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ * Daniel P. Berrange <berrange(a)redhat.com>
+ */
+
+#include <config.h>
+
+#include "virthreadpool.h"
+#include "viralloc.h"
+#include "threads.h"
+#include "virterror_internal.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+struct _virThreadPoolJob {
+ virThreadPoolJobPtr prev;
+ virThreadPoolJobPtr next;
+ unsigned int priority;
+
+ void *data;
+};
+
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+struct _virThreadPoolJobList {
+ virThreadPoolJobPtr head;
+ virThreadPoolJobPtr tail;
+ virThreadPoolJobPtr firstPrio;
+};
+
+
+struct _virThreadPool {
+ bool quit;
+
+ virThreadPoolJobFunc jobFunc;
+ void *jobOpaque;
+ virThreadPoolJobList jobList;
+ size_t jobQueueDepth;
+
+ virMutex mutex;
+ virCond cond;
+ virCond quit_cond;
+
+ size_t maxWorkers;
+ size_t minWorkers;
+ size_t freeWorkers;
+ size_t nWorkers;
+ virThreadPtr workers;
+
+ size_t nPrioWorkers;
+ virThreadPtr prioWorkers;
+ virCond prioCond;
+};
+
+struct virThreadPoolWorkerData {
+ virThreadPoolPtr pool;
+ virCondPtr cond;
+ bool priority;
+};
+
+static void virThreadPoolWorker(void *opaque)
+{
+ struct virThreadPoolWorkerData *data = opaque;
+ virThreadPoolPtr pool = data->pool;
+ virCondPtr cond = data->cond;
+ bool priority = data->priority;
+ virThreadPoolJobPtr job = NULL;
+
+ VIR_FREE(data);
+
+ virMutexLock(&pool->mutex);
+
+ while (1) {
+ while (!pool->quit &&
+ ((!priority && !pool->jobList.head) ||
+ (priority && !pool->jobList.firstPrio))) {
+ if (!priority)
+ pool->freeWorkers++;
+ if (virCondWait(cond, &pool->mutex) < 0) {
+ if (!priority)
+ pool->freeWorkers--;
+ goto out;
+ }
+ if (!priority)
+ pool->freeWorkers--;
+ }
+
+ if (pool->quit)
+ break;
+
+ if (priority) {
+ job = pool->jobList.firstPrio;
+ } else {
+ job = pool->jobList.head;
+ }
+
+ if (job == pool->jobList.firstPrio) {
+ virThreadPoolJobPtr tmp = job->next;
+ while (tmp) {
+ if (tmp->priority) {
+ break;
+ }
+ tmp = tmp->next;
+ }
+ pool->jobList.firstPrio = tmp;
+ }
+
+ if (job->prev)
+ job->prev->next = job->next;
+ else
+ pool->jobList.head = job->next;
+ if (job->next)
+ job->next->prev = job->prev;
+ else
+ pool->jobList.tail = job->prev;
+
+ pool->jobQueueDepth--;
+
+ virMutexUnlock(&pool->mutex);
+ (pool->jobFunc)(job->data, pool->jobOpaque);
+ VIR_FREE(job);
+ virMutexLock(&pool->mutex);
+ }
+
+out:
+ if (priority)
+ pool->nPrioWorkers--;
+ else
+ pool->nWorkers--;
+ if (pool->nWorkers == 0 && pool->nPrioWorkers==0)
+ virCondSignal(&pool->quit_cond);
+ virMutexUnlock(&pool->mutex);
+}
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+ size_t maxWorkers,
+ size_t prioWorkers,
+ virThreadPoolJobFunc func,
+ void *opaque)
+{
+ virThreadPoolPtr pool;
+ size_t i;
+ struct virThreadPoolWorkerData *data = NULL;
+
+ if (minWorkers > maxWorkers)
+ minWorkers = maxWorkers;
+
+ if (VIR_ALLOC(pool) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ pool->jobList.tail = pool->jobList.head = NULL;
+
+ pool->jobFunc = func;
+ pool->jobOpaque = opaque;
+
+ if (virMutexInit(&pool->mutex) < 0)
+ goto error;
+ if (virCondInit(&pool->cond) < 0)
+ goto error;
+ if (virCondInit(&pool->quit_cond) < 0)
+ goto error;
+
+ if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+ goto error;
+
+ pool->minWorkers = minWorkers;
+ pool->maxWorkers = maxWorkers;
+
+ for (i = 0; i < minWorkers; i++) {
+ if (VIR_ALLOC(data) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+ data->pool = pool;
+ data->cond = &pool->cond;
+
+ if (virThreadCreate(&pool->workers[i],
+ true,
+ virThreadPoolWorker,
+ data) < 0) {
+ goto error;
+ }
+ pool->nWorkers++;
+ }
+
+ if (prioWorkers) {
+ if (virCondInit(&pool->prioCond) < 0)
+ goto error;
+ if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
+ goto error;
+
+ for (i = 0; i < prioWorkers; i++) {
+ if (VIR_ALLOC(data) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+ data->pool = pool;
+ data->cond = &pool->prioCond;
+ data->priority = true;
+
+ if (virThreadCreate(&pool->prioWorkers[i],
+ true,
+ virThreadPoolWorker,
+ data) < 0) {
+ goto error;
+ }
+ pool->nPrioWorkers++;
+ }
+ }
+
+ return pool;
+
+error:
+ VIR_FREE(data);
+ virThreadPoolFree(pool);
+ return NULL;
+
+}
+
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+ virThreadPoolJobPtr job;
+ bool priority = false;
+
+ if (!pool)
+ return;
+
+ virMutexLock(&pool->mutex);
+ pool->quit = true;
+ if (pool->nWorkers > 0)
+ virCondBroadcast(&pool->cond);
+ if (pool->nPrioWorkers > 0) {
+ priority = true;
+ virCondBroadcast(&pool->prioCond);
+ }
+
+ while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
+ ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+
+ while ((job = pool->jobList.head)) {
+ pool->jobList.head = pool->jobList.head->next;
+ VIR_FREE(job);
+ }
+
+ VIR_FREE(pool->workers);
+ virMutexUnlock(&pool->mutex);
+ virMutexDestroy(&pool->mutex);
+ ignore_value(virCondDestroy(&pool->quit_cond));
+ ignore_value(virCondDestroy(&pool->cond));
+ if (priority) {
+ VIR_FREE(pool->prioWorkers);
+ ignore_value(virCondDestroy(&pool->prioCond));
+ }
+ VIR_FREE(pool);
+}
+
+
+size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
+{
+ return pool->minWorkers;
+}
+
+size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
+{
+ return pool->maxWorkers;
+}
+
+size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
+{
+ return pool->nPrioWorkers;
+}
+
+/*
+ * @priority - job priority
+ * Return: 0 on success, -1 otherwise
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+ unsigned int priority,
+ void *jobData)
+{
+ virThreadPoolJobPtr job;
+ struct virThreadPoolWorkerData *data = NULL;
+
+ virMutexLock(&pool->mutex);
+ if (pool->quit)
+ goto error;
+
+ if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
+ pool->nWorkers < pool->maxWorkers) {
+ if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+
+ if (VIR_ALLOC(data) < 0) {
+ pool->nWorkers--;
+ virReportOOMError();
+ goto error;
+ }
+
+ data->pool = pool;
+ data->cond = &pool->cond;
+
+ if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+ true,
+ virThreadPoolWorker,
+ data) < 0) {
+ VIR_FREE(data);
+ pool->nWorkers--;
+ goto error;
+ }
+ }
+
+ if (VIR_ALLOC(job) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+
+ job->data = jobData;
+ job->priority = priority;
+
+ job->prev = pool->jobList.tail;
+ if (pool->jobList.tail)
+ pool->jobList.tail->next = job;
+ pool->jobList.tail = job;
+
+ if (!pool->jobList.head)
+ pool->jobList.head = job;
+
+ if (priority && !pool->jobList.firstPrio)
+ pool->jobList.firstPrio = job;
+
+ pool->jobQueueDepth++;
+
+ virCondSignal(&pool->cond);
+ if (priority)
+ virCondSignal(&pool->prioCond);
+
+ virMutexUnlock(&pool->mutex);
+ return 0;
+
+error:
+ virMutexUnlock(&pool->mutex);
+ return -1;
+}
diff --git a/src/util/virthreadpool.h b/src/util/virthreadpool.h
new file mode 100644
index 0000000..4479647
--- /dev/null
+++ b/src/util/virthreadpool.h
@@ -0,0 +1,53 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see
+ * <
http://www.gnu.org/licenses/>.
+ *
+ * Author:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ * Daniel P. Berrange <berrange(a)redhat.com>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+# define __VIR_THREADPOOL_H__
+
+# include "internal.h"
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+ size_t maxWorkers,
+ size_t prioWorkers,
+ virThreadPoolJobFunc func,
+ void *opaque) ATTRIBUTE_NONNULL(4);
+
+size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool);
+size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool);
+size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool);
+
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+ unsigned int priority,
+ void *jobdata) ATTRIBUTE_NONNULL(1)
+ ATTRIBUTE_RETURN_CHECK;
+
+#endif
--
1.7.11.7