Signed-off-by: Rafael Fonseca <r4f4rfs(a)gmail.com>
---
src/util/virthreadpool.c | 149 +++++++++++++--------------------------
1 file changed, 50 insertions(+), 99 deletions(-)
diff --git a/src/util/virthreadpool.c b/src/util/virthreadpool.c
index 379d2369ad..147943f395 100644
--- a/src/util/virthreadpool.c
+++ b/src/util/virthreadpool.c
@@ -59,9 +59,9 @@ struct _virThreadPool {
virThreadPoolJobList jobList;
size_t jobQueueDepth;
- virMutex mutex;
- virCond cond;
- virCond quit_cond;
+ GMutex mutex;
+ GCond cond;
+ GCond quit_cond;
size_t maxWorkers;
size_t minWorkers;
@@ -72,12 +72,12 @@ struct _virThreadPool {
size_t maxPrioWorkers;
size_t nPrioWorkers;
virThreadPtr prioWorkers;
- virCond prioCond;
+ GCond prioCond;
};
struct virThreadPoolWorkerData {
virThreadPoolPtr pool;
- virCondPtr cond;
+ GCond *cond;
bool priority;
};
@@ -93,7 +93,7 @@ static void virThreadPoolWorker(void *opaque)
{
struct virThreadPoolWorkerData *data = opaque;
virThreadPoolPtr pool = data->pool;
- virCondPtr cond = data->cond;
+ GCond *cond = data->cond;
bool priority = data->priority;
size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
@@ -101,7 +101,7 @@ static void virThreadPoolWorker(void *opaque)
VIR_FREE(data);
- virMutexLock(&pool->mutex);
+ g_mutex_lock(&pool->mutex);
while (1) {
/* In order to support async worker termination, we need ensure that
@@ -117,11 +117,7 @@ static void virThreadPoolWorker(void *opaque)
(priority && !pool->jobList.firstPrio))) {
if (!priority)
pool->freeWorkers++;
- if (virCondWait(cond, &pool->mutex) < 0) {
- if (!priority)
- pool->freeWorkers--;
- goto out;
- }
+ g_cond_wait(cond, &pool->mutex);
if (!priority)
pool->freeWorkers--;
@@ -159,10 +155,10 @@ static void virThreadPoolWorker(void *opaque)
pool->jobQueueDepth--;
- virMutexUnlock(&pool->mutex);
+ g_mutex_unlock(&pool->mutex);
(pool->jobFunc)(job->data, pool->jobOpaque);
VIR_FREE(job);
- virMutexLock(&pool->mutex);
+ g_mutex_lock(&pool->mutex);
}
out:
@@ -171,8 +167,8 @@ static void virThreadPoolWorker(void *opaque)
else
pool->nWorkers--;
if (pool->nWorkers == 0 && pool->nPrioWorkers == 0)
- virCondSignal(&pool->quit_cond);
- virMutexUnlock(&pool->mutex);
+ g_cond_signal(&pool->quit_cond);
+ g_mutex_unlock(&pool->mutex);
}
static int
@@ -241,12 +237,9 @@ virThreadPoolNewFull(size_t minWorkers,
pool->jobName = name;
pool->jobOpaque = opaque;
- if (virMutexInit(&pool->mutex) < 0)
- goto error;
- if (virCondInit(&pool->cond) < 0)
- goto error;
- if (virCondInit(&pool->quit_cond) < 0)
- goto error;
+ g_mutex_init(&pool->mutex);
+ g_cond_init(&pool->cond);
+ g_cond_init(&pool->quit_cond);
pool->minWorkers = minWorkers;
pool->maxWorkers = maxWorkers;
@@ -256,8 +249,7 @@ virThreadPoolNewFull(size_t minWorkers,
goto error;
if (prioWorkers) {
- if (virCondInit(&pool->prioCond) < 0)
- goto error;
+ g_cond_init(&pool->prioCond);
if (virThreadPoolExpand(pool, prioWorkers, true) < 0)
goto error;
@@ -279,17 +271,17 @@ void virThreadPoolFree(virThreadPoolPtr pool)
if (!pool)
return;
- virMutexLock(&pool->mutex);
+ g_mutex_lock(&pool->mutex);
pool->quit = true;
if (pool->nWorkers > 0)
- virCondBroadcast(&pool->cond);
+ g_cond_broadcast(&pool->cond);
if (pool->nPrioWorkers > 0) {
priority = true;
- virCondBroadcast(&pool->prioCond);
+ g_cond_broadcast(&pool->prioCond);
}
while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
- ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+ g_cond_wait(&pool->quit_cond, &pool->mutex);
while ((job = pool->jobList.head)) {
pool->jobList.head = pool->jobList.head->next;
@@ -297,13 +289,13 @@ void virThreadPoolFree(virThreadPoolPtr pool)
}
VIR_FREE(pool->workers);
- virMutexUnlock(&pool->mutex);
- virMutexDestroy(&pool->mutex);
- virCondDestroy(&pool->quit_cond);
- virCondDestroy(&pool->cond);
+ g_mutex_unlock(&pool->mutex);
+ g_mutex_clear(&pool->mutex);
+ g_cond_clear(&pool->quit_cond);
+ g_cond_clear(&pool->cond);
if (priority) {
VIR_FREE(pool->prioWorkers);
- virCondDestroy(&pool->prioCond);
+ g_cond_clear(&pool->prioCond);
}
VIR_FREE(pool);
}
@@ -311,68 +303,38 @@ void virThreadPoolFree(virThreadPoolPtr pool)
size_t virThreadPoolGetMinWorkers(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->minWorkers;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->minWorkers;
}
size_t virThreadPoolGetMaxWorkers(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->maxWorkers;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->maxWorkers;
}
size_t virThreadPoolGetPriorityWorkers(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->nPrioWorkers;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->nPrioWorkers;
}
size_t virThreadPoolGetCurrentWorkers(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->nWorkers;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->nWorkers;
}
size_t virThreadPoolGetFreeWorkers(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->freeWorkers;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->freeWorkers;
}
size_t virThreadPoolGetJobQueueDepth(virThreadPoolPtr pool)
{
- size_t ret;
-
- virMutexLock(&pool->mutex);
- ret = pool->jobQueueDepth;
- virMutexUnlock(&pool->mutex);
-
- return ret;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
+ return pool->jobQueueDepth;
}
/*
@@ -384,18 +346,18 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
void *jobData)
{
virThreadPoolJobPtr job;
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
- virMutexLock(&pool->mutex);
if (pool->quit)
- goto error;
+ return -1;
if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
pool->nWorkers < pool->maxWorkers &&
virThreadPoolExpand(pool, 1, false) < 0)
- goto error;
+ return -1;
if (VIR_ALLOC(job) < 0)
- goto error;
+ return -1;
job->data = jobData;
job->priority = priority;
@@ -413,16 +375,11 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
pool->jobQueueDepth++;
- virCondSignal(&pool->cond);
+ g_cond_signal(&pool->cond);
if (priority)
- virCondSignal(&pool->prioCond);
+ g_cond_signal(&pool->prioCond);
- virMutexUnlock(&pool->mutex);
return 0;
-
- error:
- virMutexUnlock(&pool->mutex);
- return -1;
}
int
@@ -433,15 +390,14 @@ virThreadPoolSetParameters(virThreadPoolPtr pool,
{
size_t max;
size_t min;
-
- virMutexLock(&pool->mutex);
+ g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&pool->mutex);
max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
if (min > max) {
virReportError(VIR_ERR_INVALID_ARG, "%s",
_("minWorkers cannot be larger than maxWorkers"));
- goto error;
+ return -1;
}
if ((maxWorkers == 0 && pool->maxWorkers > 0) ||
@@ -449,37 +405,32 @@ virThreadPoolSetParameters(virThreadPoolPtr pool,
virReportError(VIR_ERR_INVALID_ARG, "%s",
_("maxWorkers must not be switched from zero to non-zero"
" and vice versa"));
- goto error;
+ return -1;
}
if (minWorkers >= 0) {
if ((size_t) minWorkers > pool->nWorkers &&
virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
false) < 0)
- goto error;
+ return -1;
pool->minWorkers = minWorkers;
}
if (maxWorkers >= 0) {
pool->maxWorkers = maxWorkers;
- virCondBroadcast(&pool->cond);
+ g_cond_broadcast(&pool->cond);
}
if (prioWorkers >= 0) {
if (prioWorkers < pool->nPrioWorkers) {
- virCondBroadcast(&pool->prioCond);
+ g_cond_broadcast(&pool->prioCond);
} else if ((size_t) prioWorkers > pool->nPrioWorkers &&
virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
true) < 0) {
- goto error;
+ return -1;
}
pool->maxPrioWorkers = prioWorkers;
}
- virMutexUnlock(&pool->mutex);
return 0;
-
- error:
- virMutexUnlock(&pool->mutex);
- return -1;
}
--
2.25.2