---
daemon/libvirtd.c | 172 +++++++++--------------------------------------------
daemon/libvirtd.h | 2 +
2 files changed, 31 insertions(+), 143 deletions(-)
diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..effa45f 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
#include "stream.h"
#include "hooks.h"
#include "virtaudit.h"
+#include "threadpool.h"
#ifdef HAVE_AVAHI
# include "mdns.h"
#endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1458,19 +1458,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
server->clients[server->nclients++] = client;
- if (server->nclients > server->nactiveworkers &&
- server->nactiveworkers < server->nworkers) {
- for (i = 0 ; i < server->nworkers ; i++) {
- if (!server->workers[i].hasThread) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- return -1;
- server->nactiveworkers++;
- break;
- }
- }
- }
-
-
return 0;
error:
@@ -1534,100 +1521,28 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
VIR_FREE(client->addrstr);
}
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
-{
- int i;
- for (i = 0 ; i < server->nclients ; i++) {
- virMutexLock(&server->clients[i]->lock);
- if (server->clients[i]->dx) {
- /* Delibrately don't unlock client - caller wants the lock */
- return server->clients[i];
- }
- virMutexUnlock(&server->clients[i]->lock);
- }
- return NULL;
-}
-
-static void *qemudWorker(void *data)
+static void qemudWorker(void *data, void *opaque)
{
- struct qemud_worker *worker = data;
- struct qemud_server *server = worker->server;
-
- while (1) {
- struct qemud_client *client = NULL;
- struct qemud_client_message *msg;
-
- virMutexLock(&server->lock);
- while ((client = qemudPendingJob(server)) == NULL) {
- if (worker->quitRequest ||
- virCondWait(&server->job, &server->lock) < 0) {
- virMutexUnlock(&server->lock);
- return NULL;
- }
- }
- if (worker->quitRequest) {
- virMutexUnlock(&client->lock);
- virMutexUnlock(&server->lock);
- return NULL;
- }
- worker->processingCall = 1;
- virMutexUnlock(&server->lock);
-
- /* We own a locked client now... */
- client->refs++;
-
- /* Remove our message from dispatch queue while we use it */
- msg = qemudClientMessageQueueServe(&client->dx);
-
- /* This function drops the lock during dispatch,
- * and re-acquires it before returning */
- if (remoteDispatchClientRequest (server, client, msg) < 0) {
- VIR_FREE(msg);
- qemudDispatchClientFailure(client);
- client->refs--;
- virMutexUnlock(&client->lock);
- continue;
- }
-
- client->refs--;
- virMutexUnlock(&client->lock);
-
- virMutexLock(&server->lock);
- worker->processingCall = 0;
- virMutexUnlock(&server->lock);
- }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
- struct qemud_worker *worker) {
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- /* We want to join workers, so don't detach them */
- /*pthread_attr_setdetachstate(&attr, 1);*/
+ struct qemud_server *server = opaque;
+ struct qemud_client *client = data;
+ struct qemud_client_message *msg;
- if (worker->hasThread)
- return -1;
+ virMutexLock(&client->lock);
- worker->server = server;
- worker->hasThread = 1;
- worker->quitRequest = 0;
- worker->processingCall = 0;
+ /* Remove our message from dispatch queue while we use it */
+ msg = qemudClientMessageQueueServe(&client->dx);
- if (pthread_create(&worker->thread,
- &attr,
- qemudWorker,
- worker) != 0) {
- worker->hasThread = 0;
- worker->server = NULL;
- return -1;
+ /* This function drops the lock during dispatch,
+ * and re-acquires it before returning */
+ if (remoteDispatchClientRequest (server, client, msg) < 0) {
+ VIR_FREE(msg);
+ qemudDispatchClientFailure(client);
}
- return 0;
+ client->refs--;
+ virMutexUnlock(&client->lock);
}
-
/*
* Read data into buffer using wire decoding (plain or TLS)
*
@@ -1857,8 +1772,11 @@ readmore:
}
/* Move completed message to the end of the dispatch queue */
- if (msg)
+ if (msg) {
+ client->refs++;
qemudClientMessageQueuePush(&client->dx, msg);
+ ignore_value(virThreadPoolSendJob(server->workerPool, client));
+ }
client->nrequests++;
/* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
qemudUpdateClientEvent(client);
-
- /* Tell one of the workers to get on with it... */
- virCondSignal(&server->job);
}
}
}
@@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
return NULL;
}
- for (i = 0 ; i < min_workers ; i++) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- goto cleanup;
- server->nactiveworkers++;
+ server->workerPool = virThreadPoolNew(min_workers,
+ max_workers,
+ qemudWorker,
+ server);
+ if (!server->workerPool) {
+ VIR_ERROR0(_("Failed to create thread pool"));
+ virMutexUnlock(&server->lock);
+ return NULL;
}
for (;!server->quitEventThread;) {
@@ -2367,47 +2286,14 @@ static void *qemudRunLoop(void *opaque) {
goto reprocess;
}
}
-
- /* If number of active workers exceeds both the min_workers
- * threshold and the number of clients, then kill some
- * off */
- for (i = 0 ; (i < server->nworkers &&
- server->nactiveworkers > server->nclients &&
- server->nactiveworkers > min_workers) ; i++) {
-
- if (server->workers[i].hasThread &&
- !server->workers[i].processingCall) {
- server->workers[i].quitRequest = 1;
-
- virCondBroadcast(&server->job);
- virMutexUnlock(&server->lock);
- pthread_join(server->workers[i].thread, NULL);
- virMutexLock(&server->lock);
- server->workers[i].hasThread = 0;
- server->nactiveworkers--;
- }
- }
- }
-
-cleanup:
- for (i = 0 ; i < server->nworkers ; i++) {
- if (!server->workers[i].hasThread)
- continue;
-
- server->workers[i].quitRequest = 1;
- virCondBroadcast(&server->job);
-
- virMutexUnlock(&server->lock);
- pthread_join(server->workers[i].thread, NULL);
- virMutexLock(&server->lock);
- server->workers[i].hasThread = 0;
}
- VIR_FREE(server->workers);
for (i = 0; i < server->nclients; i++)
qemudFreeClient(server->clients[i]);
server->nclients = 0;
VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max);
+ virThreadPoolFree(server->workerPool);
+ server->workerPool = NULL;
virMutexUnlock(&server->lock);
return NULL;
}
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..dfd54cd 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
# include "logging.h"
# include "threads.h"
# include "network.h"
+# include "threadpool.h"
# if WITH_DTRACE
# ifndef LIBVIRTD_PROBES_H
@@ -283,6 +284,7 @@ struct qemud_server {
int privileged;
+ virThreadPoolPtr workerPool;
size_t nworkers;
size_t nactiveworkers;
struct qemud_worker *workers;
--
1.7.3
--
Thanks,
Hu Tao