Whether creating a threadpool or creating a new thread to handle a job
that has been placed into the job queue, every time thread-specific data needs
to be allocated, the worker array needs to be (re)allocated and the thread count
indicators updated. Make the code clearer to read by consolidating these
operations into a single helper function.
---
src/util/virthreadpool.c | 109 +++++++++++++++++++----------------------------
1 file changed, 44 insertions(+), 65 deletions(-)
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
index f640448..8af4ec0 100644
--- a/src/util/virthreadpool.c
+++ b/src/util/virthreadpool.c
@@ -157,6 +157,43 @@ static void virThreadPoolWorker(void *opaque)
virMutexUnlock(&pool->mutex);
}
+/**
+ * virThreadPoolExpand:
+ * @pool: thread pool whose worker array is to be grown
+ * @gain: number of worker threads to add
+ * @priority: true to add priority workers, false to add ordinary workers
+ *
+ * Grows the selected worker array of @pool by @gain entries and starts one
+ * virThreadPoolWorker thread for each new entry. Each thread receives its
+ * own virThreadPoolWorkerData carrying the pool, the condition variable it
+ * is associated with and the priority flag.
+ *
+ * Returns 0 on success, -1 on failure. On failure the worker counter is
+ * reduced so that it only accounts for the threads actually started.
+ */
+static int
+virThreadPoolExpand(virThreadPoolPtr pool, size_t gain, bool priority)
+{
+    /* Work through a pointer to the pool's own array field: VIR_EXPAND_N
+     * stores the resized array through its first argument, so a by-value
+     * copy would leave the pool referencing the old (or no) array. */
+    virThreadPtr *workers = priority ? &pool->prioWorkers : &pool->workers;
+    size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
+    /* Index of the first appended slot; existing entries must be kept. */
+    size_t first = *curWorkers;
+    size_t i = 0;
+    struct virThreadPoolWorkerData *data = NULL;
+
+    if (VIR_EXPAND_N(*workers, *curWorkers, gain) < 0)
+        return -1;
+
+    for (i = 0; i < gain; i++) {
+        if (VIR_ALLOC(data) < 0)
+            goto error;
+
+        data->pool = pool;
+        data->cond = priority ? &pool->prioCond : &pool->cond;
+        data->priority = priority;
+
+        if (virThreadCreateFull(&(*workers)[first + i],
+                                false,
+                                virThreadPoolWorker,
+                                pool->jobFuncName,
+                                true,
+                                data) < 0) {
+            /* The thread never started, so the data is still ours to free. */
+            VIR_FREE(data);
+            goto error;
+        }
+    }
+
+    return 0;
+
+ error:
+    /* VIR_EXPAND_N already added @gain to the counter; take back the
+     * entries for which no thread was created. */
+    *curWorkers -= gain - i;
+    return -1;
+}
+
virThreadPoolPtr
virThreadPoolNewFull(size_t minWorkers,
size_t maxWorkers,
@@ -166,8 +203,6 @@ virThreadPoolNewFull(size_t minWorkers,
void *opaque)
{
virThreadPoolPtr pool;
- size_t i;
- struct virThreadPoolWorkerData *data = NULL;
if (minWorkers > maxWorkers)
minWorkers = maxWorkers;
@@ -188,58 +223,23 @@ virThreadPoolNewFull(size_t minWorkers,
if (virCondInit(&pool->quit_cond) < 0)
goto error;
- if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
- goto error;
-
pool->minWorkers = minWorkers;
pool->maxWorkers = maxWorkers;
- for (i = 0; i < minWorkers; i++) {
- if (VIR_ALLOC(data) < 0)
- goto error;
- data->pool = pool;
- data->cond = &pool->cond;
-
- if (virThreadCreateFull(&pool->workers[i],
- false,
- virThreadPoolWorker,
- pool->jobFuncName,
- true,
- data) < 0) {
- goto error;
- }
- pool->nWorkers++;
- }
+ if (virThreadPoolExpand(pool, minWorkers, false) < 0)
+ goto error;
if (prioWorkers) {
if (virCondInit(&pool->prioCond) < 0)
goto error;
- if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
- goto error;
- for (i = 0; i < prioWorkers; i++) {
- if (VIR_ALLOC(data) < 0)
- goto error;
- data->pool = pool;
- data->cond = &pool->prioCond;
- data->priority = true;
-
- if (virThreadCreateFull(&pool->prioWorkers[i],
- false,
- virThreadPoolWorker,
- pool->jobFuncName,
- true,
- data) < 0) {
- goto error;
- }
- pool->nPrioWorkers++;
- }
+ if (virThreadPoolExpand(pool, prioWorkers, true) < 0)
+ goto error;
}
return pool;
error:
- VIR_FREE(data);
virThreadPoolFree(pool);
return NULL;
@@ -307,36 +307,15 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
void *jobData)
{
virThreadPoolJobPtr job;
- struct virThreadPoolWorkerData *data = NULL;
virMutexLock(&pool->mutex);
if (pool->quit)
goto error;
if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
- pool->nWorkers < pool->maxWorkers) {
- if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0)
- goto error;
-
- if (VIR_ALLOC(data) < 0) {
- pool->nWorkers--;
- goto error;
- }
-
- data->pool = pool;
- data->cond = &pool->cond;
-
- if (virThreadCreateFull(&pool->workers[pool->nWorkers - 1],
- false,
- virThreadPoolWorker,
- pool->jobFuncName,
- true,
- data) < 0) {
- VIR_FREE(data);
- pool->nWorkers--;
- goto error;
- }
- }
+ pool->nWorkers < pool->maxWorkers &&
+ virThreadPoolExpand(pool, 1, false) < 0)
+ goto error;
if (VIR_ALLOC(job) < 0)
goto error;
--
2.4.3