On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote:
---
daemon/libvirtd.c | 172 +++++++++--------------------------------------------
daemon/libvirtd.h | 4 +
2 files changed, 33 insertions(+), 143 deletions(-)
diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..dbd050a 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
#include "stream.h"
#include "hooks.h"
#include "virtaudit.h"
+#include "threadpool.h"
#ifdef HAVE_AVAHI
# include "mdns.h"
#endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
client->auth = sock->auth;
client->addr = addr;
client->addrstr = addrstr;
+ client->server = server;
addrstr = NULL;
This shouldn't be needed, as 'server' should be passed into
the worker function via the 'void *opaque' parameter.
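For example (just a sketch, assuming virThreadPoolNew() is given 'server'
as its opaque argument, as noted further down):

  static void qemudWorker(void *data, void *opaque)
  {
      struct qemud_client *client = data;
      struct qemud_server *server = opaque;   /* supplied by virThreadPoolNew() */

and then use 'server' in the remoteDispatchClientRequest() call, so
struct qemud_client doesn't need a 'server' field at all.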
for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
@@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
server->clients[server->nclients++] = client;
- if (server->nclients > server->nactiveworkers &&
- server->nactiveworkers < server->nworkers) {
- for (i = 0 ; i < server->nworkers ; i++) {
- if (!server->workers[i].hasThread) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- return -1;
- server->nactiveworkers++;
- break;
- }
- }
- }
-
-
return 0;
error:
@@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
VIR_FREE(client->addrstr);
}
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
{
- int i;
- for (i = 0 ; i < server->nclients ; i++) {
- virMutexLock(&server->clients[i]->lock);
- if (server->clients[i]->dx) {
- /* Delibrately don't unlock client - caller wants the lock */
- return server->clients[i];
- }
- virMutexUnlock(&server->clients[i]->lock);
- }
- return NULL;
-}
+ struct qemud_client *client = data;
+ struct qemud_client_message *msg;
-static void *qemudWorker(void *data)
-{
- struct qemud_worker *worker = data;
- struct qemud_server *server = worker->server;
+ virMutexLock(&client->lock);
It is necessary to hold the lock on 'server' before obtaining a
lock on 'client'. The server lock can be released again immediately
if it is no longer needed.
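e.g. something like this at the top of qemudWorker (sketch only, with
'server' obtained from the opaque argument as suggested above):

  virMutexLock(&server->lock);
  virMutexLock(&client->lock);
  virMutexUnlock(&server->lock);  /* only held long enough to safely grab the client lock */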
- while (1) {
- struct qemud_client *client = NULL;
- struct qemud_client_message *msg;
+ /* Remove our message from dispatch queue while we use it */
+ msg = qemudClientMessageQueueServe(&client->dx);
- virMutexLock(&server->lock);
- while ((client = qemudPendingJob(server)) == NULL) {
- if (worker->quitRequest ||
- virCondWait(&server->job, &server->lock) < 0) {
- virMutexUnlock(&server->lock);
- return NULL;
- }
- }
- if (worker->quitRequest) {
- virMutexUnlock(&client->lock);
- virMutexUnlock(&server->lock);
- return NULL;
- }
- worker->processingCall = 1;
- virMutexUnlock(&server->lock);
-
- /* We own a locked client now... */
- client->refs++;
-
- /* Remove our message from dispatch queue while we use it */
- msg = qemudClientMessageQueueServe(&client->dx);
-
- /* This function drops the lock during dispatch,
- * and re-acquires it before returning */
- if (remoteDispatchClientRequest (server, client, msg) < 0) {
- VIR_FREE(msg);
- qemudDispatchClientFailure(client);
- client->refs--;
- virMutexUnlock(&client->lock);
- continue;
- }
-
- client->refs--;
- virMutexUnlock(&client->lock);
-
- virMutexLock(&server->lock);
- worker->processingCall = 0;
- virMutexUnlock(&server->lock);
- }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
- struct qemud_worker *worker) {
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- /* We want to join workers, so don't detach them */
- /*pthread_attr_setdetachstate(&attr, 1);*/
-
- if (worker->hasThread)
- return -1;
-
- worker->server = server;
- worker->hasThread = 1;
- worker->quitRequest = 0;
- worker->processingCall = 0;
-
- if (pthread_create(&worker->thread,
- &attr,
- qemudWorker,
- worker) != 0) {
- worker->hasThread = 0;
- worker->server = NULL;
- return -1;
+ /* This function drops the lock during dispatch,
+ * and re-acquires it before returning */
+ if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
+ VIR_FREE(msg);
+ qemudDispatchClientFailure(client);
}
- return 0;
+ client->refs--;
+ virMutexUnlock(&client->lock);
}
-
/*
* Read data into buffer using wire decoding (plain or TLS)
*
@@ -1857,8 +1772,11 @@ readmore:
}
/* Move completed message to the end of the dispatch queue */
- if (msg)
+ if (msg) {
+ client->refs++;
qemudClientMessageQueuePush(&client->dx, msg);
+ ignore_value(virThreadPoolSendJob(server->workerPool, client));
+ }
client->nrequests++;
/* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
qemudUpdateClientEvent(client);
-
- /* Tell one of the workers to get on with it... */
- virCondSignal(&server->job);
}
}
}
@@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
return NULL;
}
- for (i = 0 ; i < min_workers ; i++) {
- if (qemudStartWorker(server, &server->workers[i]) < 0)
- goto cleanup;
- server->nactiveworkers++;
+ server->workerPool = virThreadPoolNew(min_workers,
+ max_workers,
+ qemudWorker,
+ NULL);
Should pass 'server' in here, instead of NULL.
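i.e. something like:

  server->workerPool = virThreadPoolNew(min_workers,
                                        max_workers,
                                        qemudWorker,
                                        server);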
+ if (!server->workerPool) {
+ VIR_ERROR0(_("Failed to create thread pool"));
+ virMutexUnlock(&server->lock);
+ return NULL;
}
for (;!server->quitEventThread;) {
One small behavioural change is that we no longer kill off idle worker
threads, but the improved simplicity of the libvirtd code makes this a
worthwhile tradeoff. So this looks good to me aside from the minor locking bug.
Regards,
Daniel