#include #include "threadpool.h" #include "memory.h" #include "threads.h" #include "virterror_internal.h" #define VIR_FROM_THIS VIR_FROM_NONE typedef struct _virThreadPoolJob virThreadPoolJob; typedef virThreadPoolJob *virThreadPoolJobPtr; struct _virThreadPoolJob { virThreadPoolJobPtr next; void *data; }; struct _virThreadPool { int quit; virThreadPoolJobFunc jobFunc; void *jobOpaque; virThreadPoolJobPtr jobList; virMutex mutex; virCond cond; virCond quit_cond; size_t maxWorkers; size_t freeWorkers; size_t nWorkers; virThreadPtr workers; }; static void virThreadPoolWorker(void *opaque) { virThreadPoolPtr pool = opaque; virMutexLock(&pool->mutex); while (1) { while (!pool->quit && !pool->jobList) { pool->freeWorkers++; if (virCondWait(&pool->cond, &pool->mutex) < 0) { pool->freeWorkers--; break; } pool->freeWorkers--; } if (pool->quit) break; virThreadPoolJobPtr job = pool->jobList; pool->jobList = pool->jobList->next; job->next = NULL; virMutexUnlock(&pool->mutex); (pool->jobFunc)(job->data, pool->jobOpaque); VIR_FREE(job); virMutexLock(&pool->mutex); } pool->nWorkers--; if (pool->nWorkers == 0) virCondSignal(&pool->quit_cond); virMutexUnlock(&pool->mutex); } virThreadPoolPtr virThreadPoolNew(size_t minWorkers, size_t maxWorkers, virThreadPoolJobFunc func, void *opaque) { virThreadPoolPtr pool; int i; if (VIR_ALLOC(pool) < 0) { virReportOOMError(); return NULL; } pool->jobFunc = func; pool->jobOpaque = opaque; if (virMutexInit(&pool->mutex) < 0) goto error; if (virCondInit(&pool->cond) < 0) goto error; if (virCondInit(&pool->quit_cond) < 0) goto error; if (VIR_ALLOC_N(pool->workers, minWorkers) < 0) goto error; pool->maxWorkers = maxWorkers; for (i = 0; i < minWorkers; i++) { if (virThreadCreate(&pool->workers[i], true, virThreadPoolWorker, pool) < 0) goto error; pool->nWorkers++; } return pool; error: virThreadPoolFree(pool); return NULL; } void virThreadPoolFree(virThreadPoolPtr pool) { virMutexLock(&pool->mutex); pool->quit = 1; if (pool->nWorkers > 0) { virCondBroadcast(&pool->cond); if (virCondWait(&pool->quit_cond, &pool->mutex) < 0) {} } VIR_FREE(pool->workers); virMutexUnlock(&pool->mutex); VIR_FREE(pool); } int virThreadPoolSendJob(virThreadPoolPtr pool, void *jobData) { virThreadPoolJobPtr job; virThreadPoolJobPtr tmp; virMutexLock(&pool->mutex); if (pool->quit) goto error; if (pool->freeWorkers == 0 && pool->nWorkers < pool->maxWorkers) { if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) { virReportOOMError(); goto error; } if (virThreadCreate(&pool->workers[pool->nWorkers], true, virThreadPoolWorker, pool) < 0) goto error; pool->nWorkers++; } if (VIR_ALLOC(job) < 0) { virReportOOMError(); goto error; } job->data = jobData; tmp = pool->jobList; while (tmp && tmp->next) tmp = tmp->next; if (tmp) tmp->next = job; else pool->jobList = job; virCondSignal(&pool->cond); virMutexUnlock(&pool->mutex); return 0; error: virMutexUnlock(&pool->mutex); return -1; }