On Thu, Dec 02, 2010 at 03:26:57PM +0800, Hu Tao wrote:
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+ size_t maxWorkers,
+ virThreadPoolJobFunc func,
+ void *opaque)
+{
+ virThreadPoolPtr pool;
+ size_t i;
+
+ if (minWorkers > maxWorkers)
+ minWorkers = maxWorkers;
+
+ if (VIR_ALLOC(pool) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ pool->jobList.head = NULL;
+ pool->jobList.tail = &pool->jobList.head;
+
+ pool->jobFunc = func;
+ pool->jobOpaque = opaque;
+
+ if (virMutexInit(&pool->mutex) < 0)
+ goto error;
+ if (virCondInit(&pool->cond) < 0)
+ goto error;
+ if (virCondInit(&pool->quit_cond) < 0)
+ goto error;
+
+ if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+ goto error;
+
+ pool->maxWorkers = maxWorkers;
+ for (i = 0; i < minWorkers; i++) {
+ if (virThreadCreate(&pool->workers[i],
+ true,
+ virThreadPoolWorker,
+ pool) < 0) {
+ virThreadPoolFree(pool);
+ return NULL;
+ }
+ pool->nWorkers++;
+ }
+
+ return pool;
+
+error:
+ VIR_FREE(pool->workers);
+ ignore_value(virCondDestroy(&pool->quit_cond));
+ ignore_value(virCondDestroy(&pool->cond));
+ virMutexDestroy(&pool->mutex);
+ return NULL;
+}
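This is leaking 'pool' on error: the 'error' label releases the
workers array, the condition variables and the mutex, but never
frees 'pool' itself. IMHO it is preferable to just call
virThreadPoolFree there; otherwise, any time we add another field
to the virThreadPool struct, we have to consider updating two
cleanup paths.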
+
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+ virThreadPoolJobPtr job;
+
+ if (!pool)
+ return;
+
+ virMutexLock(&pool->mutex);
+ pool->quit = true;
+ if (pool->nWorkers > 0) {
+ virCondBroadcast(&pool->cond);
+ ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+ }
+
+ while ((job = pool->jobList.head)) {
+ pool->jobList.head = pool->jobList.head->next;
+ VIR_FREE(job);
+ }
+
+ VIR_FREE(pool->workers);
+ virMutexUnlock(&pool->mutex);
+ virMutexDestroy(&pool->mutex);
+ ignore_value(virCondDestroy(&pool->quit_cond));
+ ignore_value(virCondDestroy(&pool->cond));
+ VIR_FREE(pool);
+}
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+ void *jobData)
+{
+ virThreadPoolJobPtr job;
+
+ virMutexLock(&pool->mutex);
+ if (pool->quit)
+ goto error;
+
+ if (pool->freeWorkers == 0 &&
+ pool->nWorkers < pool->maxWorkers) {
+ if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+
+ if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+ true,
+ virThreadPoolWorker,
+ pool) < 0) {
+ pool->nWorkers--;
+ goto error;
+ }
Small typo: that check should be "!= NULL" rather than "< 0".
+ }
+
+ if (VIR_ALLOC(job) < 0) {
+ virReportOOMError();
+ goto error;
+ }
+
+ job->data = jobData;
+ job->next = NULL;
+ *pool->jobList.tail = job;
+ pool->jobList.tail = &(*pool->jobList.tail)->next;
+
+ virCondSignal(&pool->cond);
+ virMutexUnlock(&pool->mutex);
+
+ return 0;
+
+error:
+ virMutexUnlock(&pool->mutex);
+ return -1;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..9ff27ec
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,49 @@
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
There's no need to include threads.h here since no virThread
stuff is exposed in the API. Just use internal.h instead.
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+ size_t maxWorkers,
+ virThreadPoolJobFunc func,
+ void *opaque) ATTRIBUTE_NONNULL(3)
+ ATTRIBUTE_RETURN_CHECK;
ATTRIBUTE_RETURN_CHECK doesn't serve any useful purpose
when placed on constructors, since the caller will always
"use" the return value by assigning the pointer to some
variable. The compiler can thus never detect whether you
check for null or not, even with this annotation.
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+ void *jobdata) ATTRIBUTE_NONNULL(1)
+ ATTRIBUTE_NONNULL(2)
+ ATTRIBUTE_RETURN_CHECK;
Regards,
Daniel