On 23.08.2011 14:23, Daniel P. Berrange wrote:
On Tue, Aug 16, 2011 at 06:39:10PM +0200, Michal Privoznik wrote:
> This patch annotates APIs with low or high priority.
> The low priority set MUST contain all APIs which might eventually
> access the monitor (and thus block indefinitely). Other APIs may be
> marked as high priority. However, some must be (e.g. domainDestroy).
>
> For high priority calls (HPCs), a new thread pool is created.
> An HPC first tries to land in the usual thread pool, provided that
> pool contains at least one free worker. As soon as it doesn't, HPCs
> are placed into the new pool. Therefore, only those APIs which are
> guaranteed to finish in a reasonably small amount of time can be
> marked as HPC.
>
> The size of this HPC pool is static: because HPCs are expected to
> finish quickly, jobs assigned to this pool will be served quickly.
> The size can be configured in libvirtd.conf via the prio_workers
> variable. The default is 5.
>
> To mark an API as low or high priority, append priority:{low|high} to
> its comment in src/remote/remote_protocol.x. This is similar to
> autogen|skipgen.
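>
> For example, the new setting in libvirtd.conf would look like this
> (a sketch; the variable and its default are the ones introduced by
> this patch):
>
>     # size of the high priority worker pool
>     prio_workers = 5
>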
> diff --git a/daemon/remote.c b/daemon/remote.c
> index 939044c..f6c6f35 100644
> --- a/daemon/remote.c
> +++ b/daemon/remote.c
> @@ -464,6 +464,32 @@ int remoteClientInitHook(virNetServerPtr srv ATTRIBUTE_UNUSED,
>      return 0;
>  }
>
> +int remoteGetProcPriority(virNetMessageHeaderPtr hdr)
> +{
> +    u_int prog = hdr->prog;
> +    int proc = hdr->proc;
> +    int *table = NULL;
> +    size_t max = 0;
> +
> +    switch (prog) {
> +    case REMOTE_PROGRAM:
> +        table = remoteProcsPriority;
> +        max = remoteNProcs;
> +        break;
> +    case QEMU_PROGRAM:
> +        table = qemuProcsPriority;
> +        max = qemuNProcs;
> +        break;
> +    }
> +
> +    if (!table || !max)
> +        return 0;
> +    if (proc >= max)
> +        return 0;
> +
> +    return table[proc];
> +}
I don't think this is the right way to provide the priority information.
We already have an extensible table for information about procedures
in virnetserverprogram.h:
struct _virNetServerProgramProc {
    virNetServerProgramDispatchFunc func;
    size_t arg_len;
    xdrproc_t arg_filter;
    size_t ret_len;
    xdrproc_t ret_filter;
    bool needAuth;
};
We should be adding 'unsigned int priority' to the struct.
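I.e. something along these lines (a sketch; the comment just spells
out the 0/1 low/high convention this patch's generated tables already
use):

struct _virNetServerProgramProc {
    virNetServerProgramDispatchFunc func;
    size_t arg_len;
    xdrproc_t arg_filter;
    size_t ret_len;
    xdrproc_t ret_filter;
    bool needAuth;
    unsigned int priority; /* 0 = low, non-zero = high */
};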
> diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl
> index 0d344e8..a8f9fc6 100755
> --- a/src/rpc/gendispatch.pl
> +++ b/src/rpc/gendispatch.pl
> @@ -954,6 +970,28 @@ elsif ($opt_b) {
>      }
>      print "};\n";
>      print "size_t ${structprefix}NProcs = ARRAY_CARDINALITY(${structprefix}Procs);\n";
> +
> +    # Now we write the priority table
> +
> +    print "\nint ${structprefix}ProcsPriority[] = {\n";
> +    for ($id = 0 ; $id <= $#calls ; $id++) {
> +        if (!defined $calls[$id]) {
> +            print "0, /* Unknown proc $id */\n";
> +            next;
> +        }
> +        if ($calls[$id]->{priority}) {
> +            print "1";
> +        } else {
> +            print "0";
> +        }
> +        if ($calls[$id]->{UC_NAME}) {
> +            print ", /* ${procprefix}_PROC_$calls[$id]->{UC_NAME} */";
> +        } else {
> +            print ",";
> +        }
> +        print "\n";
> +    }
> +    print "};\n";
>  }
And so instead of creating a new table here, we should just be writing
the new priority field to the existing table.
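For illustration, a generated entry might then look like this (a
sketch only; the helper and XDR names here are illustrative, not
copied from the real generated file):

{ /* DomainDestroy, which the commit message says must be high priority */
    remoteDispatchDomainDestroyHelper, /* func */
    sizeof(remote_domain_destroy_args), /* arg_len */
    (xdrproc_t)xdr_remote_domain_destroy_args, /* arg_filter */
    0, /* ret_len */
    (xdrproc_t)xdr_void, /* ret_filter */
    true, /* needAuth */
    1 /* priority: high */
},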
> # Bodies for client functions ("remote_client_bodies.h").
> diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
> index 1a49dbb..b8150b7 100644
> --- a/src/rpc/virnetserver.c
> +++ b/src/rpc/virnetserver.c
> @@ -72,9 +72,12 @@ struct _virNetServer {
>      virMutex lock;
>
>      virThreadPoolPtr workers;
> +    virThreadPoolPtr prio_workers;
>
>      bool privileged;
>
> +    virNetServerPriorityProcFunc prio_func;
> +
>      size_t nsignals;
>      virNetServerSignalPtr *signals;
>      int sigread;
> @@ -182,6 +185,7 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
>  {
>      virNetServerPtr srv = opaque;
>      virNetServerJobPtr job;
> +    bool priority = false;
>      int ret;
>
>      VIR_DEBUG("server=%p client=%p message=%p",
> @@ -192,11 +196,25 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
>          return -1;
>      }
>
> +    if (srv->prio_func)
> +        priority = srv->prio_func(&msg->header);
> +
>      job->client = client;
>      job->msg = msg;
>
>      virNetServerLock(srv);
> -    if ((ret = virThreadPoolSendJob(srv->workers, job)) < 0)
> +    ret = virThreadPoolSendJob(srv->workers, priority, job);
> +    if (ret < -1) {
> +        goto cleanup;
> +    } else if (ret == -1) {
> +        /* try placing job into priority pool */
> +        VIR_DEBUG("worker pool full, placing proc %d into priority pool",
> +                  msg->header.proc);
> +        ret = virThreadPoolSendJob(srv->prio_workers, false, job);
> +    }
> +
> +cleanup:
> +    if (ret < 0)
>          VIR_FREE(job);
>      virNetServerUnlock(srv);
>
One thing that concerns me is that using 2 separate thread pools is
inherently racy. E.g., between the time we check for free workers in
the main pool, and place the job in the priority pool, workers in
the main pool may have become free. This can lead to imbalances
where the main pool is temporarily busy, and so a large number of
jobs get queued up in the priority pool.
I don't see why this is a problem. I mean, that's the purpose of the
high prio pool, isn't it? As soon as the main pool contains a free
worker, HPCs are sent to it instead of to the high prio pool. And
because HPCs are short-term calls, the high prio queue will empty
quickly.
I'm wondering if there is any way we could have a single thread pool,
but with some of the workers in the pool being high priority workers.
So the decision about who gets what jobs is taken when the job is
served, rather than when the job is queued. Avoiding the double queues
in this way would prevent the unbalanced situation where the priority
queue is very full but the main queue is empty.
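To make that concrete, here is a minimal sketch of the dequeue policy
(all names hypothetical, not the existing virThreadPool API): every
job carries a priority flag, all jobs go into one shared queue, and a
worker's own flag decides what it may take:

#include <stdbool.h>
#include <stddef.h>

struct job {
    struct job *next;
    bool priority;      /* set from the procedure's priority marker */
    void *data;
};

struct pool {
    struct job *jobs;   /* single shared FIFO, protected by the pool mutex */
};

/* Called by a worker with the pool mutex held. A normal worker takes
 * the head of the queue; a priority worker skips low priority jobs,
 * so it stays available for HPCs. Returns NULL if nothing eligible
 * is queued and the worker should go back to waiting. */
static struct job *pool_take_job(struct pool *p, bool prio_worker)
{
    struct job **prev = &p->jobs;
    while (*prev) {
        if (!prio_worker || (*prev)->priority) {
            struct job *j = *prev;
            *prev = j->next;
            return j;
        }
        prev = &(*prev)->next;
    }
    return NULL;
}

With a single queue there is nothing to rebalance: whichever worker
becomes free first serves the next eligible job, and the reserved
priority workers can never be tied up by a backlog of low priority
calls.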
> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> index 8217591..ad2d249 100644
> --- a/src/util/threadpool.c
> +++ b/src/util/threadpool.c
> @@ -185,27 +185,41 @@ void virThreadPoolFree(virThreadPoolPtr pool)
>      VIR_FREE(pool);
>  }
>
> +/*
> + * @only_if_free - place the job in the pool iff there is
> + *                 at least one free worker
> + *
> + * Return: 0 on success,
> + *         -1 if only_if_free was requested but no worker is free,
> + *         -2 otherwise
> + */
>  int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         bool only_if_free,
>                           void *jobData)
>  {
>      virThreadPoolJobPtr job;
> +    int ret = -2;
>
>      virMutexLock(&pool->mutex);
>      if (pool->quit)
>          goto error;
>
> -    if (pool->freeWorkers == 0 &&
> -        pool->nWorkers < pool->maxWorkers) {
> -        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> -            virReportOOMError();
> -            goto error;
> -        }
> +    if (pool->freeWorkers == 0) {
> +        if (pool->nWorkers < pool->maxWorkers) {
> +            if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> +                virReportOOMError();
> +                goto error;
> +            }
>
> -        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> -                            true,
> -                            virThreadPoolWorker,
> -                            pool) < 0) {
> -            pool->nWorkers--;
> +            if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> +                                true,
> +                                virThreadPoolWorker,
> +                                pool) < 0) {
> +                pool->nWorkers--;
> +                goto error;
> +            }
> +        } else if (only_if_free) {
> +            ret = -1;
>              goto error;
>          }
I don't think this check is correct, because it is only checking
the free workers against the current/max workers. It is not
taking into account the possibility that there are queued jobs
which have not yet been served. So it might look like there is
a free worker, but there is already a pending job which could
consume it.
I don't agree. Currently we allow expanding the pool only when placing
a job into it. While the job is being placed, the pool is locked, so
the freeWorkers variable cannot change. Assume freeWorkers == 0; since
we can't expand the pool, the job will wait on the queue, so we may
decide whether we want to place the job or not. If freeWorkers is not
zero, there can't be any job on the queue, because it would be
immediately taken by a free worker.
What can be a problem, though, is when a job finishes after we have
locked the pool. In this case, freeWorkers will not get increased, so
we may make a bad decision. But I don't see a simple way to solve this.
Regards,
Daniel