On Tue, Aug 23, 2011 at 05:53:35PM +0200, Michal Privoznik wrote:
> On 23.08.2011 14:23, Daniel P. Berrange wrote:
> > On Tue, Aug 16, 2011 at 06:39:10PM +0200, Michal Privoznik wrote:
> >> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> >> index 8217591..ad2d249 100644
> >> --- a/src/util/threadpool.c
> >> +++ b/src/util/threadpool.c
> >> @@ -185,27 +185,41 @@ void virThreadPoolFree(virThreadPoolPtr pool)
> >>      VIR_FREE(pool);
> >>  }
> >>
> >> +/*
> >> + * @only_if_free - place job in pool iff there is
> >> + *                 a free worker(s).
> >> + *
> >> + * Return: 0 on success,
> >> + *         -1 if no free worker available but requested
> >> + *         -2 otherwise
> >> + */
> >>  int virThreadPoolSendJob(virThreadPoolPtr pool,
> >> +                         bool only_if_free,
> >>                           void *jobData)
> >>  {
> >>      virThreadPoolJobPtr job;
> >> +    int ret = -2;
> >>
> >>      virMutexLock(&pool->mutex);
> >>      if (pool->quit)
> >>          goto error;
> >>
> >> -    if (pool->freeWorkers == 0 &&
> >> -        pool->nWorkers < pool->maxWorkers) {
> >> -        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> >> -            virReportOOMError();
> >> -            goto error;
> >> -        }
> >> +    if (pool->freeWorkers == 0) {
If we added another counter 'pool->jobQueueDepth', updated whenever a
job is added to or removed from the queue, then I think we could get
the correct semantics by doing:

  if ((pool->freeWorkers - pool->jobQueueDepth) <= 0) {
      ...
  }
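
Maintaining that counter should only need two touch points; roughly (an
untested sketch - the exact placement depends on the surrounding code):

  /* in virThreadPoolSendJob(), once the job has been appended to the queue */
  pool->jobQueueDepth++;

  /* in virThreadPoolWorker(), right after it takes a job off the queue */
  pool->jobQueueDepth--;

Both places should already hold pool->mutex, so no extra locking ought to
be needed.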
> >> +        if (pool->nWorkers < pool->maxWorkers) {
> >> +            if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> >> +                virReportOOMError();
> >> +                goto error;
> >> +            }
> >>
> >> -        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> >> -                            true,
> >> -                            virThreadPoolWorker,
> >> -                            pool) < 0) {
> >> -            pool->nWorkers--;
> >> +            if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> >> +                                true,
> >> +                                virThreadPoolWorker,
> >> +                                pool) < 0) {
> >> +                pool->nWorkers--;
> >> +                goto error;
> >> +            }
> >> +        } else if (only_if_free) {
> >> +            ret = -1;
> >>              goto error;
> >>          }
> >
> > I don't think this check is correct, because it is only checking
> > the free workers, against the current/max workers. It is not
> > taking into account the possibility that there are queued jobs
> > which have not yet been served. So it might look like there is
> > a free worker, but there is already a pending job which could
> > consume it.
>
> I don't agree. Currently we allow expanding the pool only when placing a
> job into the pool. While the job is being placed, the pool is locked, so
> the freeWorkers variable cannot change. Assume freeWorkers == 0; as soon
> as we can't expand the pool, the job will wait on the queue, so we can
> decide whether we want to place the job or not. If freeWorkers is not
> zero, there can't be any job on the queue, because it would be taken
> immediately by a free worker.
The problem I'm seeing involves a sequence of two calls to
virThreadPoolSendJob(). Most of the time we can expect a linear
sequence of virThreadPoolSendJob() calls, which will result in
something like this set of steps:
* Initial condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==0
* Thread 1 calls virThreadPoolSendJob()
1. Acquires lock
2. Queues job
3. Notifies condition
4. Releases lock
* Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==1
* Thread 2 is a worker thread
1. Woken up from condition wait
2. Acquires lock
3. Decrements freeWorkers
4. Removes the job from the queue
5. Releases lock
6. Starts processing job
* Current condition: freeWorkers==0, nWorkers==1, maxWorkers==5, queuedJobs==0
* Thread 3 calls virThreadPoolSendJob()
1. Acquires lock
2. Spawns new worker thread
3. Queues job
4. Notifies condition
5. Releases lock
* Current condition: freeWorkers==0, nWorkers==2, maxWorkers==5, queuedJobs==1
* Thread 4 is a new worker thread
1. Woken up from condition wait
2. Acquires lock
3. Decrements freeWorkers
4. Removes the job from the queue
5. Releases lock
6. Starts processing job
* Final condition: freeWorkers==0, nWorkers==2, maxWorkers==5, queuedJobs==0
But now consider what happens if two calls to virThreadPoolSendJob
arrive concurrently. It is possible thread 3 will acquire the lock
before thread 2 does. This results in the following set of steps:
* Initial condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==0
* Thread 1 calls virThreadPoolSendJob()
1. Acquires lock
2. Queues job
3. Notifies condition
4. Releases lock
* Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==1
* Thread 3 calls virThreadPoolSendJob()
1. Acquires lock
2. Queues job
3. Notifies condition
4. Releases lock
* Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==2
* Thread 2 is a worker thread
1. Woken up from condition wait
2. Acquires lock
3. Decrements freeWorkers
4. Removes the job from the queue
5. Releases lock
6. Starts processing job
* Final condition: freeWorkers==0, nWorkers==1, maxWorkers==5, queuedJobs==1
So we end up with 2 jobs and only one worker thread to process
them. If that second job were a high priority job, it would still be
placed in the normal queue, because 'freeWorkers' was still 1, even
though there was already a job in the queue ahead of it, so the
effective 'freeWorkers' was 0.
This is actually an existing bug in the virThreadPool job queuing code,
but until now it has been harmless. Once we start to differentiate
between low and high priority jobs, the bug becomes significant and can
result in a high priority job being blocked behind low priority ones.
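
Concretely, once such a counter exists, I'd expect the hunk quoted above
to end up looking something like this (an untested sketch, reusing the
names from the patch plus the proposed jobQueueDepth counter):

    if (pool->freeWorkers - pool->jobQueueDepth <= 0) {
        if (pool->nWorkers < pool->maxWorkers) {
            if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
                virReportOOMError();
                goto error;
            }

            if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
                                true,
                                virThreadPoolWorker,
                                pool) < 0) {
                pool->nWorkers--;
                goto error;
            }
        } else if (only_if_free) {
            ret = -1;
            goto error;
        }
    }

With that, in the second scenario above thread 3 would see that the one
free worker is already spoken for by the queued job, and would either
spawn an extra worker or, if the pool were already at maxWorkers and
only_if_free was set, report -1, instead of silently queueing a high
priority job behind a low priority one.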
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|