
On 11/30/2010 12:14 AM, Hu Tao wrote:
--- src/Makefile.am | 1 + src/util/threadpool.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 35 ++++++++++++ 3 files changed, 176 insertions(+), 0 deletions(-) create mode 100644 src/util/threadpool.c create mode 100644 src/util/threadpool.h
diff --git a/src/Makefile.am b/src/Makefile.am index a9a1986..5febd76 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -76,6 +76,7 @@ UTIL_SOURCES = \ util/uuid.c util/uuid.h \ util/util.c util/util.h \ util/xml.c util/xml.h \ + util/threadpool.c util/threadpool.h \ util/virtaudit.c util/virtaudit.h \ util/virterror.c util/virterror_internal.h
diff --git a/src/util/threadpool.c b/src/util/threadpool.c new file mode 100644 index 0000000..4bf0f8d --- /dev/null +++ b/src/util/threadpool.c @@ -0,0 +1,140 @@
Copyright header?
+#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "threadpool.h" + +static void *workerHandleJob(void *data) +{ + struct virData *localData = NULL; + struct virWorkerPool *pool = data; + + pthread_mutex_lock(&pool->mutex);
We should be using virMutexLock here, so as to also be portable to mingw.
+ + while (1) { + while (!pool->quit && !pool->dataList) { + pool->nFreeWorker++; + pthread_cond_signal(&pool->worker_cond);
Likewise, virCondSignal here.
+ pthread_cond_wait(&pool->cond, &pool->mutex);
And virCondWait here.
+ pool->nFreeWorker--; + + if (pool->nWorker > pool->nMaxWorker) + goto out; + } + + while ((localData = pool->dataList) != NULL) { + pool->dataList = pool->dataList->next; + localData->next = NULL; + + pthread_mutex_unlock(&pool->mutex); + + (pool->func)(localData->data); + free(localData);
Use VIR_FREE() instead of free().
+ + pthread_mutex_lock(&pool->mutex); + } + + if (pool->quit) + break; + } + +out: + pool->nWorker--; + if (pool->nWorker == 0) + pthread_cond_signal(&pool->quit_cond); + pthread_mutex_unlock(&pool->mutex); + + return NULL; +} + +struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func) +{ + struct virWorkerPool *pool; + pthread_t pid; + int i; + + if (nWorker < 0) + return NULL; + + if (nWorker > maxWorker) + return NULL;
daemon/libvirtd.c already has a notion of worker threads; I'm wondering how much overlap there is between your implementation and that one. A better proof that this would be a useful API addition would be to have the next patch in the series convert libvirtd.c over to using this API.
+ + pool = malloc(sizeof(*pool));
Run 'make syntax-check' - it would have complained about this. Use VIR_ALLOC or VIR_ALLOC_N instead of malloc.
+ if (!pool) + return NULL; + + memset(pool, 0, sizeof(*pool)); + pool->func = func; + pthread_mutex_init(&pool->mutex, NULL);
Use virMutexInit() instead of pthread_mutex_init().
+ pthread_cond_init(&pool->cond, NULL); + pthread_cond_init(&pool->worker_cond, NULL); + pthread_cond_init(&pool->quit_cond, NULL);
Use virCondInit() instead of pthread_cond_init().
+ + for (i = 0; i < nWorker; i++) { + pthread_create(&pid, NULL, workerHandleJob, pool);
Use virThreadCreate() instead of pthread_create().
+ } + + pool->nFreeWorker = 0; + pool->nWorker = nWorker; + pool->nMaxWorker = maxWorker; + + return pool; +} + +void virWorkerPoolFree(struct virWorkerPool *pool) +{ + pthread_mutex_lock(&pool->mutex); + pool->quit = 1;
Use <stdbool.h> and bool if a value will only ever be 0 or 1.
+ if (pool->nWorker > 0) { + pthread_cond_broadcast(&pool->cond); + pthread_cond_wait(&pool->quit_cond, &pool->mutex); + } + pthread_mutex_unlock(&pool->mutex); + free(pool);
Use VIR_FREE() instead of free().
+} + +int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data) +{ + pthread_t pid; + struct virData *localData; + + localData = malloc(sizeof(*localData));
Use VIR_ALLOC() instead of malloc().
+ if (!localData) + return -1; + + localData->data = data; + + pthread_mutex_lock(&pool->mutex); + if (pool->quit) { + pthread_mutex_unlock(&pool->mutex); + free(localData); + return -1; + } + + localData->next = pool->dataList; + pool->dataList = localData; + + if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) { + pthread_create(&pid, NULL, workerHandleJob, pool); + pool->nWorker++; + } + + pthread_cond_signal(&pool->cond); + + pthread_mutex_unlock(&pool->mutex); + + return 0; +} + +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker) +{ + if (maxWorker < 0) + return -1; + + pthread_mutex_lock(&pool->mutex); + pool->nMaxWorker = maxWorker; + pthread_mutex_unlock(&pool->mutex);
Does this do the right thing if maxWorker < pool->nMaxWorker, or does it silently lose existing workers?
+ + return 0; +} diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 0000000..5ff3a6b --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,35 @@
Copyright header?
+#ifndef __THREADPOOL_H__ +#define __THREADPOOL_H__
Use of the __ namespace risks collision with the system; I'd feel better if this were __VIR_THREADPOOL_H__.
+ +#include <pthread.h>
"threads.h", not <pthread.h>, so we can support mingw
+ +typedef void (*virWorkerFunc)(void *);
pthread_create() takes a function that can return void*. Should worker functions be allowed to return a value?
+ +struct virData { + struct virData *next; + + void *data; +};
We've typically used typedefs to avoid having to type 'struct virData' everywhere else.
+ +struct virWorkerPool { + int nWorker; + int nMaxWorker; + int nFreeWorker;
s/int/size_t/ when dealing with non-negative counts.
+ + int quit;
s/int/bool/
+ + virWorkerFunc func; + struct virData *dataList; + + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_cond_t worker_cond; + pthread_cond_t quit_cond;
Use virMutex and virCond instead of pthread_mutex_t and pthread_cond_t.
+}; + +struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func);
Needs ATTRIBUTE_RETURN_CHECK.
+void virWorkerPoolFree(struct virWorkerPool *pool); +int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data);
Needs ATTRIBUTE_NONNULL(1).
+int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker);
Needs ATTRIBUTE_NONNULL(1) here as well.
+ +#endif
-- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org