The libvirtd.conf file has three parameters
max_clients
min_workers
max_workers
When the daemon starts up it spawns min_workers threads. It
accepts connections from upto max_clients. I never implemented
the logic to auto-spawn more threads upto max_workers though.
This patch addresses that. Upon accept()ing a client connection
if the number of clients is greater than the number of active
worker threads, we spawn another worker, unless we've hit the
max workers limit. If the number of clients is greater than
the max_workers, this means some clients may have to wait for
other clients requests to finish before eing processed. No
great problem
This also fixes a shutdown problem. We were marking the threads
as detached, but still calling pthread_join() on them. This gives
an error on Linux, but just hangs on Solaris while it tries to
join a thread that has no intention of exiting.
So during shutdown we set a 'quit' flag on the worker, and then
broadcast a signal to wake it up from its condition variable
sleep. Upon wakup it notices the quit flag and exits, allowing
us to join & cleanup.
qemud.c | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
qemud.h | 13 +++++++
2 files changed, 106 insertions(+), 16 deletions(-)
Daniel
diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1248,6 +1248,20 @@ static int qemudDispatchServer(struct qe
server->clients[server->nclients++] = client;
+ if (server->nclients > server->nactiveworkers &&
+ server->nactiveworkers < server->nworkers) {
+ int i;
+ for (i = 0 ; i < server->nworkers ; i++) {
+ if (!server->workers[i].active) {
+ if (qemudStartWorker(server, &server->workers[i]) < 0)
+ return -1;
+ server->nactiveworkers++;
+ break;
+ }
+ }
+ }
+
+
return 0;
cleanup:
@@ -1303,19 +1317,28 @@ static struct qemud_client *qemudPending
static void *qemudWorker(void *data)
{
- struct qemud_server *server = data;
+ struct qemud_worker *worker = data;
+ struct qemud_server *server = worker->server;
while (1) {
struct qemud_client *client = NULL;
struct qemud_client_message *reply;
virMutexLock(&server->lock);
- while ((client = qemudPendingJob(server)) == NULL) {
+ while (((client = qemudPendingJob(server)) == NULL) &&
+ !worker->quit) {
if (virCondWait(&server->job, &server->lock) < 0) {
virMutexUnlock(&server->lock);
return NULL;
}
}
+ if (worker->quit) {
+ if (client)
+ virMutexUnlock(&client->lock);
+ virMutexUnlock(&server->lock);
+ return NULL;
+ }
+ worker->processing = 1;
virMutexUnlock(&server->lock);
/* We own a locked client now... */
@@ -1342,9 +1365,40 @@ static void *qemudWorker(void *data)
client->refs--;
virMutexUnlock(&client->lock);
+
+ virMutexLock(&server->lock);
+ worker->processing = 0;
+ virMutexUnlock(&server->lock);
}
}
+static int qemudStartWorker(struct qemud_server *server,
+ struct qemud_worker *worker) {
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ /* We want to join workers, so don't detach them */
+ /*pthread_attr_setdetachstate(&attr, 1);*/
+
+ if (worker->active)
+ return -1;
+
+ worker->server = server;
+ worker->active = 1;
+ worker->quit = 0;
+ worker->processing = 0;
+
+ if (pthread_create(&worker->thread,
+ &attr,
+ qemudWorker,
+ worker) != 0) {
+ worker->active = 0;
+ worker->server = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
/*
* Read data into buffer using wire decoding (plain or TLS)
@@ -1888,21 +1942,19 @@ static int qemudRunLoop(struct qemud_ser
virMutexLock(&server->lock);
- server->nworkers = min_workers;
+ if (min_workers > max_workers)
+ max_workers = min_workers;
+
+ server->nworkers = max_workers;
if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
VIR_ERROR0(_("Failed to allocate workers"));
return -1;
}
- for (i = 0 ; i < server->nworkers ; i++) {
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, 1);
-
- pthread_create(&server->workers[i],
- &attr,
- qemudWorker,
- server);
+ for (i = 0 ; i < min_workers ; i++) {
+ if (qemudStartWorker(server, &server->workers[i]) < 0)
+ goto cleanup;
+ server->nactiveworkers++;
}
for (;;) {
@@ -1948,6 +2000,26 @@ static int qemudRunLoop(struct qemud_ser
}
}
+ /* If number of active workers exceeds both the min_workers
+ * threshold and the number of clients, then kill some
+ * off */
+ for (i = 0 ; (i < server->nworkers &&
+ server->nactiveworkers > server->nclients &&
+ server->nactiveworkers > min_workers) ; i++) {
+
+ if (server->workers[i].active &&
+ !server->workers[i].processing) {
+ server->workers[i].quit = 1;
+
+ virCondBroadcast(&server->job);
+ virMutexUnlock(&server->lock);
+ pthread_join(server->workers[i].thread, NULL);
+ virMutexLock(&server->lock);
+ server->workers[i].active = 0;
+ server->nactiveworkers--;
+ }
+ }
+
/* Unregister any timeout that's active, since we
* just had an event processed
*/
@@ -1963,11 +2035,18 @@ static int qemudRunLoop(struct qemud_ser
}
}
+cleanup:
for (i = 0 ; i < server->nworkers ; i++) {
- pthread_t thread = server->workers[i];
+ if (!server->workers[i].active)
+ continue;
+
+ server->workers[i].quit = 1;
+ virCondBroadcast(&server->job);
+
virMutexUnlock(&server->lock);
- pthread_join(thread, NULL);
+ pthread_join(server->workers[i].thread, NULL);
virMutexLock(&server->lock);
+ server->workers[i].active = 0;
}
VIR_FREE(server->workers);
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -159,13 +159,24 @@ struct qemud_socket {
struct qemud_socket *next;
};
+struct qemud_worker {
+ pthread_t thread;
+ int active :1;
+ int processing :1;
+ int quit : 1;
+
+ /* back-pointer to our server */
+ struct qemud_server *server;
+};
+
/* Main server state */
struct qemud_server {
virMutex lock;
virCond job;
int nworkers;
- pthread_t *workers;
+ int nactiveworkers;
+ struct qemud_worker *workers;
int nsockets;
struct qemud_socket *sockets;
int nclients;
--
|: Red Hat, Engineering, London -o-
http://people.redhat.com/berrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org -o-
http://ovirt.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|