* src/util/threadpool.c, src/util/threadpool.h: Thread pool
implementation
* src/Makefile.am: Build thread pool
* src/libvirt_private.syms: Export public functions
---
cfg.mk | 1 +
src/Makefile.am | 1 +
src/libvirt_private.syms | 6 +
src/util/threadpool.c | 235 ++++++++++++++++++++++++++++++++++++++++++++++
src/util/threadpool.h | 49 ++++++++++
5 files changed, 292 insertions(+), 0 deletions(-)
create mode 100644 src/util/threadpool.c
create mode 100644 src/util/threadpool.h
diff --git a/cfg.mk b/cfg.mk
index 5576ecb..e4ee763 100644
--- a/cfg.mk
+++ b/cfg.mk
@@ -127,6 +127,7 @@ useless_free_options = \
--name=virStoragePoolObjFree \
--name=virStoragePoolSourceFree \
--name=virStorageVolDefFree \
+ --name=virThreadPoolFree \
--name=xmlFree \
--name=xmlXPathFreeContext \
--name=xmlXPathFreeObject
diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..d71c644 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -73,6 +73,7 @@ UTIL_SOURCES = \
util/threads.c util/threads.h \
util/threads-pthread.h \
util/threads-win32.h \
+ util/threadpool.c util/threadpool.h \
util/uuid.c util/uuid.h \
util/util.c util/util.h \
util/xml.c util/xml.h \
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index f251c94..70c68cb 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -861,3 +861,9 @@ virXPathStringLimit;
virXPathULong;
virXPathULongHex;
virXPathULongLong;
+
+
+# threadpool.h
+virThreadPoolNew;
+virThreadPoolFree;
+virThreadPoolSendJob;
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..a5f24c2
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,235 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ * Daniel P. Berrange <berrange(a)redhat.com>
+ */
+
+#include <config.h>
+
+#include "threadpool.h"
+#include "memory.h"
+#include "threads.h"
+#include "virterror_internal.h"
+#include "ignore-value.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+/* One queued unit of work; jobs are chained into a singly linked list. */
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+struct _virThreadPoolJob {
+ /* Next job in the queue; NULL for the most recently appended job. */
+ virThreadPoolJobPtr next;
+
+ /* Caller-supplied payload, passed as first argument to the pool's jobFunc. */
+ void *data;
+};
+
+/*
+ * Queue of pending jobs: workers remove from 'head', senders append via 'tail'.
+ *
+ * 'tail' is the address of the pointer through which the next appended job
+ * is stored: &head while the queue is empty, otherwise the address of the
+ * last job's 'next' field.
+ */
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+struct _virThreadPoolJobList {
+ virThreadPoolJobPtr head;
+ virThreadPoolJobPtr *tail;
+};
+
+
+/*
+ * Thread pool state.
+ *
+ * The worker threads, virThreadPoolSendJob and virThreadPoolFree modify the
+ * quit flag, the job queue and the worker counters only while holding 'mutex'.
+ */
+struct _virThreadPool {
+ /* Set by virThreadPoolFree: workers exit and new jobs are refused. */
+ bool quit;
+
+ /* Callback run for every job, and the opaque value passed as its 2nd arg. */
+ virThreadPoolJobFunc jobFunc;
+ void *jobOpaque;
+ /* Jobs submitted but not yet picked up by a worker. */
+ virThreadPoolJobList jobList;
+
+ virMutex mutex;
+ /* Signalled when a job is queued; broadcast when the pool is shut down. */
+ virCond cond;
+ /* Signalled by the last worker to exit. */
+ virCond quit_cond;
+
+ /* Upper bound checked before virThreadPoolSendJob starts another worker. */
+ size_t maxWorkers;
+ /* Number of workers currently blocked waiting on 'cond'. */
+ size_t freeWorkers;
+ /*
+ * Workers started and not yet exited; also passed to VIR_EXPAND_N as the
+ * element count of 'workers' when the pool grows.
+ */
+ size_t nWorkers;
+ /* Thread handles filled in by virThreadCreate. */
+ virThreadPtr workers;
+};
+
+/*
+ * Thread entry point shared by all pool workers; 'opaque' is the owning pool.
+ *
+ * Holding the pool mutex, the worker sleeps on pool->cond until a job is
+ * queued or pool->quit is set.  Each job is unlinked from the head of the
+ * queue and run through pool->jobFunc with the mutex released; afterwards
+ * the job wrapper (not the caller's data) is freed.  When the pool is
+ * quitting, or the condition wait fails, the worker removes itself from
+ * nWorkers and, if it was the last one, signals pool->quit_cond.
+ */
+static void virThreadPoolWorker(void *opaque)
+{
+    virThreadPoolPtr pool = opaque;
+    virThreadPoolJobPtr job;
+
+    virMutexLock(&pool->mutex);
+
+    for (;;) {
+        /* Idle: count ourselves in freeWorkers while blocked on cond. */
+        while (!pool->quit && pool->jobList.head == NULL) {
+            int rc;
+
+            pool->freeWorkers++;
+            rc = virCondWait(&pool->cond, &pool->mutex);
+            pool->freeWorkers--;
+            if (rc < 0)
+                goto out;
+        }
+
+        /* Shutdown takes precedence; jobs still queued are not run here. */
+        if (pool->quit)
+            goto out;
+
+        /* Unlink the oldest job from the queue. */
+        job = pool->jobList.head;
+        pool->jobList.head = job->next;
+        job->next = NULL;
+        /* It was the only entry: aim tail back at the (now empty) head. */
+        if (pool->jobList.tail == &job->next)
+            pool->jobList.tail = &pool->jobList.head;
+
+        /* The callback runs without the mutex held. */
+        virMutexUnlock(&pool->mutex);
+        pool->jobFunc(job->data, pool->jobOpaque);
+        VIR_FREE(job);
+        virMutexLock(&pool->mutex);
+    }
+
+out:
+    pool->nWorkers--;
+    if (pool->nWorkers == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+/*
+ * Create a thread pool whose workers run @func on every submitted job.
+ *
+ * @minWorkers: number of worker threads started right away; clamped to
+ *              @maxWorkers when larger
+ * @maxWorkers: upper bound checked by virThreadPoolSendJob before it
+ *              starts an additional worker
+ * @func:       callback invoked as func(jobdata, opaque) for each job
+ * @opaque:     passed unchanged as the second argument of @func
+ *
+ * Returns the new pool, or NULL on failure.  On failure everything set up
+ * so far is torn down again, including the pool structure itself.
+ */
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    /* An empty queue has its tail pointing at the head pointer. */
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+    pool->maxWorkers = maxWorkers;
+
+    /*
+     * Each failure unwinds only what was successfully initialised before
+     * it, so no mutex or condition variable is destroyed uninitialised.
+     */
+    if (virMutexInit(&pool->mutex) < 0)
+        goto free_pool;
+    if (virCondInit(&pool->cond) < 0)
+        goto destroy_mutex;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto destroy_cond;
+
+    /*
+     * Skip the allocation for an initially empty pool: a zero-sized
+     * allocation may legitimately yield NULL, which must not be mistaken
+     * for an out-of-memory condition.
+     */
+    if (minWorkers > 0 &&
+        VIR_ALLOC_N(pool->workers, minWorkers) < 0) {
+        virReportOOMError();
+        goto destroy_quit_cond;
+    }
+
+    for (i = 0; i < minWorkers; i++) {
+        /*
+         * Workers are created detached (joinable == false): nothing in
+         * this file ever joins them, and shutdown is synchronised through
+         * quit_cond instead.
+         */
+        if (virThreadCreate(&pool->workers[i],
+                            false,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            /*
+             * Earlier workers may already be running, so go through the
+             * regular shutdown path, which also releases the pool.
+             */
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+destroy_quit_cond:
+    ignore_value(virCondDestroy(&pool->quit_cond));
+destroy_cond:
+    ignore_value(virCondDestroy(&pool->cond));
+destroy_mutex:
+    virMutexDestroy(&pool->mutex);
+free_pool:
+    VIR_FREE(pool);
+    return NULL;
+}
+
+/*
+ * Shut down and destroy @pool.  A NULL @pool is a no-op.
+ *
+ * Sets the quit flag, wakes every worker and waits until all of them have
+ * exited.  A job that is already executing runs to completion first, since
+ * its worker only rechecks the quit flag afterwards.  Jobs still queued are
+ * discarded without being run: only the job wrappers are freed here, the
+ * caller-supplied job data is not touched.
+ */
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0)
+        virCondBroadcast(&pool->cond);
+
+    /*
+     * Condition waits may return spuriously, so re-test the predicate
+     * instead of trusting a single wakeup; otherwise the pool could be
+     * destroyed while workers are still using it.  A failing wait cannot
+     * be retried meaningfully, so give up waiting in that case.
+     */
+    while (pool->nWorkers > 0) {
+        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
+            break;
+    }
+
+    /* Drop whatever was queued but never picked up by a worker. */
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = job->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    VIR_FREE(pool);
+}
+
+/*
+ * Queue @jobData for execution by one of the pool's workers.
+ *
+ * When no worker is idle and the pool is below its maximum size, one more
+ * worker thread is started before the job is queued.
+ *
+ * Returns 0 once the job has been queued, -1 if the pool is shutting down,
+ * memory could not be allocated, or the additional worker could not be
+ * started.  On failure the job is not queued and @jobData is left untouched.
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job = NULL;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    /*
+     * Allocate the job before possibly growing the pool, so that running
+     * out of memory here does not leave behind a freshly started worker
+     * with nothing to do.
+     */
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    if (pool->freeWorkers == 0 &&
+        pool->nWorkers < pool->maxWorkers) {
+        /* On success VIR_EXPAND_N has already bumped nWorkers by one. */
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+
+        /*
+         * Workers are created detached (joinable == false): nothing in
+         * this file ever joins them, and shutdown is synchronised through
+         * quit_cond instead.
+         */
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            false,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            /* Undo the count taken for the worker that never started. */
+            pool->nWorkers--;
+            goto error;
+        }
+    }
+
+    /* Append at the tail and advance tail to the new job's next field. */
+    job->data = jobData;
+    job->next = NULL;
+    *pool->jobList.tail = job;
+    pool->jobList.tail = &job->next;
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+
+error:
+    VIR_FREE(job);
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..9ff27ec
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,49 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ * Daniel P. Berrange <berrange(a)redhat.com>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+
+/* Opaque pool handle; the structure itself is defined in threadpool.c. */
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+/*
+ * Job callback executed on a worker thread.  'jobdata' is the value handed
+ * to virThreadPoolSendJob, 'opaque' the value handed to virThreadPoolNew.
+ */
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+/*
+ * Create a pool that runs 'func' for every submitted job.  'minWorkers'
+ * threads are started immediately (the value is clamped to 'maxWorkers');
+ * virThreadPoolSendJob may start more, up to 'maxWorkers'.
+ * Returns the pool, or NULL on failure.
+ */
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+ size_t maxWorkers,
+ virThreadPoolJobFunc func,
+ void *opaque) ATTRIBUTE_NONNULL(3)
+ ATTRIBUTE_RETURN_CHECK;
+
+/*
+ * Stop the workers and release the pool; passing NULL is a no-op.  Jobs
+ * still waiting in the queue are dropped without being run, and their job
+ * data is not freed by the pool.
+ */
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+/*
+ * Queue 'jobdata' to be passed to the pool's job callback by a worker.
+ * Returns 0 when the job was queued and -1 otherwise, e.g. when the pool
+ * is being freed or an allocation fails.
+ */
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+ void *jobdata) ATTRIBUTE_NONNULL(1)
+ ATTRIBUTE_NONNULL(2)
+ ATTRIBUTE_RETURN_CHECK;
+
+#endif
--
1.7.3
--
Thanks,
Hu Tao