---
src/Makefile.am | 1 +
src/util/threadpool.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++
src/util/threadpool.h | 62 ++++++++++++++++++
3 files changed, 235 insertions(+), 0 deletions(-)
create mode 100644 src/util/threadpool.c
create mode 100644 src/util/threadpool.h
diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..5febd76 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -76,6 +76,7 @@ UTIL_SOURCES = \
util/uuid.c util/uuid.h \
util/util.c util/util.h \
util/xml.c util/xml.h \
+ util/threadpool.c util/threadpool.h \
util/virtaudit.c util/virtaudit.h \
util/virterror.c util/virterror_internal.h
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..79f9fbb
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,172 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "threadpool.h"
+#include "memory.h"
+
+static void workerHandleJob(void *data)
+{
+ virDataPtr localData = NULL;
+ virWorkerPoolPtr pool = data;
+
+ virMutexLock(&pool->mutex);
+
+ while (1) {
+ while (!pool->quit && !pool->dataList) {
+ pool->nFreeWorker++;
+ virCondSignal(&pool->worker_cond);
+ if (virCondWait(&pool->cond, &pool->mutex) < 0) {
+ pool->nFreeWorker--;
+ goto out;
+ }
+ pool->nFreeWorker--;
+
+ if (pool->nWorker > pool->nMaxWorker)
+ goto out;
+ }
+
+ while ((localData = pool->dataList) != NULL) {
+ pool->dataList = pool->dataList->next;
+ localData->next = NULL;
+
+ virMutexUnlock(&pool->mutex);
+
+ (pool->func)(localData->data);
+ VIR_FREE(localData);
+
+ virMutexLock(&pool->mutex);
+ }
+
+ if (pool->quit)
+ break;
+ }
+
+out:
+ pool->nWorker--;
+ if (pool->nWorker == 0)
+ virCondSignal(&pool->quit_cond);
+ virMutexUnlock(&pool->mutex);
+}
+
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
+{
+ virWorkerPoolPtr pool;
+ virThread thread;
+ int i;
+
+ if (nWorker < 0)
+ return NULL;
+
+ if (nWorker > maxWorker)
+ return NULL;
+
+ if (VIR_ALLOC(pool))
+ return NULL;
+
+ memset(pool, 0, sizeof(*pool));
+ pool->func = func;
+ if (virMutexInit(&pool->mutex) < 0)
+ goto error;
+ if (virCondInit(&pool->cond) < 0)
+ goto error;
+ if (virCondInit(&pool->worker_cond) < 0)
+ goto error;
+ if (virCondInit(&pool->quit_cond) < 0)
+ goto error;
+
+ for (i = 0; i < nWorker; i++) {
+ if (virThreadCreate(&thread, true, workerHandleJob, pool) < 0) {
+ pool->nWorker = i;
+ goto error;
+ }
+ }
+
+ pool->nFreeWorker = 0;
+ pool->nWorker = nWorker;
+ pool->nMaxWorker = maxWorker;
+
+ return pool;
+error:
+ virWorkerPoolFree(pool);
+ return NULL;
+}
+
+void virWorkerPoolFree(virWorkerPoolPtr pool)
+{
+ virMutexLock(&pool->mutex);
+ pool->quit = true;
+ if (pool->nWorker > 0) {
+ virCondBroadcast(&pool->cond);
+ virCondWait(&pool->quit_cond, &pool->mutex);
+ }
+ virMutexUnlock(&pool->mutex);
+ VIR_FREE(pool);
+}
+
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data)
+{
+ virThread thread;
+ virDataPtr localData;
+
+ if (VIR_ALLOC(localData))
+ return -1;
+
+ localData->data = data;
+
+ virMutexLock(&pool->mutex);
+ if (pool->quit) {
+ virMutexUnlock(&pool->mutex);
+ VIR_FREE(localData);
+ return -1;
+ }
+
+ localData->next = pool->dataList;
+ pool->dataList = localData;
+
+ if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker)
{
+ if (virThreadCreate(&thread, true, workerHandleJob, pool) == 0)
+ pool->nWorker++;
+ }
+
+ virCondSignal(&pool->cond);
+ virMutexUnlock(&pool->mutex);
+
+ return 0;
+}
+
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker)
+{
+ if (maxWorker < 0)
+ return -1;
+
+ virMutexLock(&pool->mutex);
+ pool->nMaxWorker = maxWorker;
+ virMutexUnlock(&pool->mutex);
+
+ return 0;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..f8039e6
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,62 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author:
+ * Hu Tao <hutao(a)cn.fujitsu.com>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+
+typedef void (*virWorkerFunc)(void *);
+
+struct _virData {
+ struct _virData *next;
+
+ void *data;
+};
+typedef struct _virData virData;
+typedef virData *virDataPtr;
+
+struct _virWorkerPool {
+ size_t nWorker;
+ size_t nMaxWorker;
+ size_t nFreeWorker;
+
+ bool quit;
+
+ virWorkerFunc func;
+ virDataPtr dataList;
+
+ virMutex mutex;
+ virCond cond;
+ virCond worker_cond;
+ virCond quit_cond;
+};
+typedef struct _virWorkerPool virWorkerPool;
+typedef virWorkerPool *virWorkerPoolPtr;
+
+virWorkerPoolPtr virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func)
ATTRIBUTE_RETURN_CHECK;
+void virWorkerPoolFree(virWorkerPoolPtr pool);
+int virWorkerPoolSendJob(virWorkerPoolPtr pool, void *data) ATTRIBUTE_NONNULL(1);
+int virWorkerPoolSetMaxWorker(virWorkerPoolPtr pool, int maxWorker)
ATTRIBUTE_NONNULL(1);
+
+#endif
--
1.7.3
--
Thanks,
Hu Tao