---
daemon/libvirtd.c | 187 ++++++++---------------------------------------------
daemon/libvirtd.h | 16 +----
2 files changed, 30 insertions(+), 173 deletions(-)
diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..229c0cc 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
#include "stream.h"
#include "hooks.h"
#include "virtaudit.h"
+#include "threadpool.h"
#ifdef HAVE_AVAHI
# include "mdns.h"
#endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -842,18 +842,10 @@ static struct qemud_server *qemudInitialize(void) {
VIR_FREE(server);
return NULL;
}
- if (virCondInit(&server->job) < 0) {
- VIR_ERROR0(_("cannot initialize condition variable"));
- virMutexDestroy(&server->lock);
- VIR_FREE(server);
- return NULL;
- }
if (virEventInit() < 0) {
VIR_ERROR0(_("Failed to initialize event system"));
virMutexDestroy(&server->lock);
- if (virCondDestroy(&server->job) < 0)
- {}
VIR_FREE(server);
return NULL;
}
@@ -1458,19 +1450,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct
qemud_socket
server->clients[server->nclients++] = client;
- if (server->nclients > server->nactiveworkers &&
- server->nactiveworkers < server->nworkers) {
- for (i = 0 ; i < server->nworkers ; i++) {
- if (!server->workers[i].hasThread) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- return -1;
- server->nactiveworkers++;
- break;
- }
- }
- }
-
-
return 0;
error:
@@ -1534,100 +1513,28 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
VIR_FREE(client->addrstr);
}
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
-{
- int i;
- for (i = 0 ; i < server->nclients ; i++) {
- virMutexLock(&server->clients[i]->lock);
- if (server->clients[i]->dx) {
- /* Delibrately don't unlock client - caller wants the lock */
- return server->clients[i];
- }
- virMutexUnlock(&server->clients[i]->lock);
- }
- return NULL;
-}
-
-static void *qemudWorker(void *data)
+static void qemudWorker(void *data, void *opaque)
{
- struct qemud_worker *worker = data;
- struct qemud_server *server = worker->server;
-
- while (1) {
- struct qemud_client *client = NULL;
- struct qemud_client_message *msg;
-
- virMutexLock(&server->lock);
- while ((client = qemudPendingJob(server)) == NULL) {
- if (worker->quitRequest ||
- virCondWait(&server->job, &server->lock) < 0) {
- virMutexUnlock(&server->lock);
- return NULL;
- }
- }
- if (worker->quitRequest) {
- virMutexUnlock(&client->lock);
- virMutexUnlock(&server->lock);
- return NULL;
- }
- worker->processingCall = 1;
- virMutexUnlock(&server->lock);
-
- /* We own a locked client now... */
- client->refs++;
-
- /* Remove our message from dispatch queue while we use it */
- msg = qemudClientMessageQueueServe(&client->dx);
-
- /* This function drops the lock during dispatch,
- * and re-acquires it before returning */
- if (remoteDispatchClientRequest (server, client, msg) < 0) {
- VIR_FREE(msg);
- qemudDispatchClientFailure(client);
- client->refs--;
- virMutexUnlock(&client->lock);
- continue;
- }
-
- client->refs--;
- virMutexUnlock(&client->lock);
-
- virMutexLock(&server->lock);
- worker->processingCall = 0;
- virMutexUnlock(&server->lock);
- }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
- struct qemud_worker *worker) {
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- /* We want to join workers, so don't detach them */
- /*pthread_attr_setdetachstate(&attr, 1);*/
+ struct qemud_server *server = opaque;
+ struct qemud_client *client = data;
+ struct qemud_client_message *msg;
- if (worker->hasThread)
- return -1;
+ virMutexLock(&client->lock);
- worker->server = server;
- worker->hasThread = 1;
- worker->quitRequest = 0;
- worker->processingCall = 0;
+ /* Remove our message from dispatch queue while we use it */
+ msg = qemudClientMessageQueueServe(&client->dx);
- if (pthread_create(&worker->thread,
- &attr,
- qemudWorker,
- worker) != 0) {
- worker->hasThread = 0;
- worker->server = NULL;
- return -1;
+ /* This function drops the lock during dispatch,
+ * and re-acquires it before returning */
+ if (remoteDispatchClientRequest (server, client, msg) < 0) {
+ VIR_FREE(msg);
+ qemudDispatchClientFailure(client);
}
- return 0;
+ client->refs--;
+ virMutexUnlock(&client->lock);
}
-
/*
* Read data into buffer using wire decoding (plain or TLS)
*
@@ -1857,8 +1764,11 @@ readmore:
}
/* Move completed message to the end of the dispatch queue */
- if (msg)
+ if (msg) {
+ client->refs++;
qemudClientMessageQueuePush(&client->dx, msg);
+ ignore_value(virThreadPoolSendJob(server->workerPool, client));
+ }
client->nrequests++;
/* Possibly need to create another receive buffer */
@@ -1870,9 +1780,6 @@ readmore:
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
qemudUpdateClientEvent(client);
-
- /* Tell one of the workers to get on with it... */
- virCondSignal(&server->job);
}
}
}
@@ -2305,18 +2212,16 @@ static void *qemudRunLoop(void *opaque) {
if (min_workers > max_workers)
max_workers = min_workers;
- server->nworkers = max_workers;
- if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
- VIR_ERROR0(_("Failed to allocate workers"));
+ server->workerPool = virThreadPoolNew(min_workers,
+ max_workers,
+ qemudWorker,
+ server);
+ if (!server->workerPool) {
+ VIR_ERROR0(_("Failed to create thread pool"));
+ virMutexUnlock(&server->lock);
return NULL;
}
- for (i = 0 ; i < min_workers ; i++) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- goto cleanup;
- server->nactiveworkers++;
- }
-
for (;!server->quitEventThread;) {
/* A shutdown timeout is specified, so check
* if any drivers have active state, if not
@@ -2367,47 +2272,14 @@ static void *qemudRunLoop(void *opaque) {
goto reprocess;
}
}
-
- /* If number of active workers exceeds both the min_workers
- * threshold and the number of clients, then kill some
- * off */
- for (i = 0 ; (i < server->nworkers &&
- server->nactiveworkers > server->nclients &&
- server->nactiveworkers > min_workers) ; i++) {
-
- if (server->workers[i].hasThread &&
- !server->workers[i].processingCall) {
- server->workers[i].quitRequest = 1;
-
- virCondBroadcast(&server->job);
- virMutexUnlock(&server->lock);
- pthread_join(server->workers[i].thread, NULL);
- virMutexLock(&server->lock);
- server->workers[i].hasThread = 0;
- server->nactiveworkers--;
- }
- }
}
-
-cleanup:
- for (i = 0 ; i < server->nworkers ; i++) {
- if (!server->workers[i].hasThread)
- continue;
-
- server->workers[i].quitRequest = 1;
- virCondBroadcast(&server->job);
-
- virMutexUnlock(&server->lock);
- pthread_join(server->workers[i].thread, NULL);
- virMutexLock(&server->lock);
- server->workers[i].hasThread = 0;
- }
- VIR_FREE(server->workers);
for (i = 0; i < server->nclients; i++)
qemudFreeClient(server->clients[i]);
server->nclients = 0;
VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max);
+ virThreadPoolFree(server->workerPool);
+ server->workerPool = NULL;
virMutexUnlock(&server->lock);
return NULL;
}
@@ -2475,9 +2347,6 @@ static void qemudCleanup(struct qemud_server *server) {
virStateCleanup();
- if (virCondDestroy(&server->job) < 0) {
- ;
- }
virMutexDestroy(&server->lock);
VIR_FREE(server);
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..e4ee63b 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
# include "logging.h"
# include "threads.h"
# include "network.h"
+# include "threadpool.h"
# if WITH_DTRACE
# ifndef LIBVIRTD_PROBES_H
@@ -266,26 +267,13 @@ struct qemud_socket {
struct qemud_socket *next;
};
-struct qemud_worker {
- pthread_t thread;
- unsigned int hasThread :1;
- unsigned int processingCall :1;
- unsigned int quitRequest :1;
-
- /* back-pointer to our server */
- struct qemud_server *server;
-};
-
/* Main server state */
struct qemud_server {
virMutex lock;
- virCond job;
int privileged;
- size_t nworkers;
- size_t nactiveworkers;
- struct qemud_worker *workers;
+ virThreadPoolPtr workerPool;
size_t nsockets;
struct qemud_socket *sockets;
size_t nclients;
--
1.7.3
--
Thanks,
Hu Tao