On Tue, Aug 16, 2011 at 06:39:10PM +0200, Michal Privoznik wrote:
This patch annotates APIs with low or high priority.
In low set MUST be all APIs which might eventually access monitor
(and thus block indefinitely). Other APIs may be marked as high
priority. However, some must be (e.g. domainDestroy).
For high priority calls (HPC), there is new thread pool created.
HPC tries to land in usual thread pool firstly. The condition here
is it contains at least one free worker. As soon as it doesn't,
HPCs are placed into the new pool. Therefore, only those APIs which
are guaranteed to end in reasonable small amount of time can be marked
as HPC.
The size of this HPC pool is static, because HPC are expected to end
quickly, therefore jobs assigned to this pool will be served quickly.
It can be configured in libvirtd.conf via prio_workers variable.
Default is set to 5.
To mark API with low or high priority, append priority:{low|high} to
it's comment in src/remote/remote_protocol.x. This is similar to
autogen|skipgen.
diff --git a/daemon/remote.c b/daemon/remote.c
index 939044c..f6c6f35 100644
--- a/daemon/remote.c
+++ b/daemon/remote.c
@@ -464,6 +464,32 @@ int remoteClientInitHook(virNetServerPtr srv ATTRIBUTE_UNUSED,
return 0;
}
+int remoteGetProcPriority(virNetMessageHeaderPtr hdr)
+{
+ u_int prog = hdr->prog;
+ int proc = hdr->proc;
+ int *table = NULL;
+ size_t max = 0;
+
+ switch (prog) {
+ case REMOTE_PROGRAM:
+ table = remoteProcsPriority;
+ max = remoteNProcs;
+ break;
+ case QEMU_PROGRAM:
+ table = qemuProcsPriority;
+ max = qemuNProcs;
+ break;
+ }
+
+ if (!table || !max)
+ return 0;
+ if (proc >= max)
+ return 0;
+
+ return table[proc];
+}
I don't think this is the right way provide the priority information.
We already have a extensible table for information about procedures
in virnetserverprogram.h
struct _virNetServerProgramProc {
virNetServerProgramDispatchFunc func;
size_t arg_len;
xdrproc_t arg_filter;
size_t ret_len;
xdrproc_t ret_filter;
bool needAuth;
};
We should be adding 'unsigned int priority' to the struct.
diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl
index 0d344e8..a8f9fc6 100755
--- a/src/rpc/gendispatch.pl
+++ b/src/rpc/gendispatch.pl
@@ -954,6 +970,28 @@ elsif ($opt_b) {
}
print "};\n";
print "size_t ${structprefix}NProcs =
ARRAY_CARDINALITY(${structprefix}Procs);\n";
+
+ # Now we write priority table
+
+ print "\nint ${structprefix}ProcsPriority[] = {\n";
+ for ($id = 0 ; $id <= $#calls ; $id++) {
+ if (!defined $calls[$id]) {
+ print "0, /* Unkown proc $id */\n";
+ next;
+ }
+ if ($calls[$id]->{priority}) {
+ print "1";
+ } else {
+ print "0";
+ }
+ if ($calls[$id]->{UC_NAME}) {
+ print ", /* ${procprefix}_PROC_$calls[$id]->{UC_NAME} */";
+ } else {
+ print ",";
+ }
+ print "\n";
+ }
+ print "};\n";
}
And so instead of creating a new table here, we should just be writing
the new priority field to the existing table.
# Bodies for client functions ("remote_client_bodies.h").
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 1a49dbb..b8150b7 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -72,9 +72,12 @@ struct _virNetServer {
virMutex lock;
virThreadPoolPtr workers;
+ virThreadPoolPtr prio_workers;
bool privileged;
+ virNetServerPriorityProcFunc prio_func;
+
size_t nsignals;
virNetServerSignalPtr *signals;
int sigread;
@@ -182,6 +185,7 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr
client,
{
virNetServerPtr srv = opaque;
virNetServerJobPtr job;
+ bool priority = false;
int ret;
VIR_DEBUG("server=%p client=%p message=%p",
@@ -192,11 +196,25 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr
client,
return -1;
}
+ if (srv->prio_func)
+ priority = srv->prio_func(&msg->header);
+
job->client = client;
job->msg = msg;
virNetServerLock(srv);
- if ((ret = virThreadPoolSendJob(srv->workers, job)) < 0)
+ ret = virThreadPoolSendJob(srv->workers, priority, job);
+ if (ret < -1) {
+ goto cleanup;
+ } else if (ret == -1) {
+ /* try placing job into priority pool */
+ VIR_DEBUG("worker pool full, placing proc %d into priority pool",
+ msg->header.proc);
+ ret = virThreadPoolSendJob(srv->prio_workers, false, job);
+ }
+
+cleanup:
+ if (ret < 0)
VIR_FREE(job);
virNetServerUnlock(srv);
One thing that concerns me is that using 2 separate thread pools is
inherantly racy. eg, between the time we check for free workers in
the main pool, and place the job in the priority pool, workers in
the main pool may have become free. This can lead to imbalances
where the main pool is temporarily busy, and so a large number of
jobs get queued up in the priority pool.
I'm wondering if there is anyway we could have a single thread pool,
but some of the workers in the pool are high priority workers. So
the decision about who gets what jobs is taken when the job is served,
rather than when the job is queued. Avoiding the double queues in this
way would avoid getting into an unbalanced situation, where the priority
queue is very full but the main queue is now empty.
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
index 8217591..ad2d249 100644
--- a/src/util/threadpool.c
+++ b/src/util/threadpool.c
@@ -185,27 +185,41 @@ void virThreadPoolFree(virThreadPoolPtr pool)
VIR_FREE(pool);
}
+/*
+ * @only_if_free - place job in pool iff there is
+ * a free worker(s).
+ *
+ * Return: 0 on success,
+ * -1 if no free worker available but requested
+ * -2 otherwise
+ */
int virThreadPoolSendJob(virThreadPoolPtr pool,
+ bool only_if_free,
void *jobData)
{
virThreadPoolJobPtr job;
+ int ret = -2;
virMutexLock(&pool->mutex);
if (pool->quit)
goto error;
- if (pool->freeWorkers == 0 &&
- pool->nWorkers < pool->maxWorkers) {
- if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
- virReportOOMError();
- goto error;
- }
+ if (pool->freeWorkers == 0) {
+ if (pool->nWorkers < pool->maxWorkers) {
+ if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+ virReportOOMError();
+ goto error;
+ }
- if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
- true,
- virThreadPoolWorker,
- pool) < 0) {
- pool->nWorkers--;
+ if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+ true,
+ virThreadPoolWorker,
+ pool) < 0) {
+ pool->nWorkers--;
+ goto error;
+ }
+ } else if (only_if_free) {
+ ret = -1;
goto error;
}
I don't think this check is correct, because it is only checking
the free workers, against the current/max workers. It is not
taking into account the possibility that there are queued jobs
which have not yet been served. So it might look like there is
a free worker, but there is already a pending job which could
consume it.
Regards,
Daniel
--
|:
http://berrange.com -o-
http://www.flickr.com/photos/dberrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|:
http://entangle-photo.org -o-
http://live.gnome.org/gtk-vnc :|