
On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote:
--- daemon/libvirtd.c | 172 +++++++++-------------------------------------------- daemon/libvirtd.h | 4 + 2 files changed, 33 insertions(+), 143 deletions(-)
diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index 791b3dc..dbd050a 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -67,6 +67,7 @@ #include "stream.h" #include "hooks.h" #include "virtaudit.h" +#include "threadpool.h" #ifdef HAVE_AVAHI # include "mdns.h" #endif @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque); static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque); -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
void qemudClientMessageQueuePush(struct qemud_client_message **queue, @@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket client->auth = sock->auth; client->addr = addr; client->addrstr = addrstr; + client->server = server; addrstr = NULL;
This shouldn't be needed, as 'server' shoudl be passed into the worker function via the 'void *opaque' parameter.
for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) { @@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
server->clients[server->nclients++] = client;
- if (server->nclients > server->nactiveworkers && - server->nactiveworkers < server->nworkers) { - for (i = 0 ; i < server->nworkers ; i++) { - if (!server->workers[i].hasThread) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - return -1; - server->nactiveworkers++; - break; - } - } - } - - return 0;
error: @@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) { VIR_FREE(client->addrstr); }
- -/* Caller must hold server lock */ -static struct qemud_client *qemudPendingJob(struct qemud_server *server) +static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED) { - int i; - for (i = 0 ; i < server->nclients ; i++) { - virMutexLock(&server->clients[i]->lock); - if (server->clients[i]->dx) { - /* Delibrately don't unlock client - caller wants the lock */ - return server->clients[i]; - } - virMutexUnlock(&server->clients[i]->lock); - } - return NULL; -} + struct qemud_client *client = data; + struct qemud_client_message *msg;
-static void *qemudWorker(void *data) -{ - struct qemud_worker *worker = data; - struct qemud_server *server = worker->server; + virMutexLock(&client->lock);
It is neccessary to hold the lock on 'server' before obtaining a lock on 'client'. The server lock can be released again immediately if no longer needed.
- while (1) { - struct qemud_client *client = NULL; - struct qemud_client_message *msg; + /* Remove our message from dispatch queue while we use it */ + msg = qemudClientMessageQueueServe(&client->dx);
- virMutexLock(&server->lock); - while ((client = qemudPendingJob(server)) == NULL) { - if (worker->quitRequest || - virCondWait(&server->job, &server->lock) < 0) { - virMutexUnlock(&server->lock); - return NULL; - } - } - if (worker->quitRequest) { - virMutexUnlock(&client->lock); - virMutexUnlock(&server->lock); - return NULL; - } - worker->processingCall = 1; - virMutexUnlock(&server->lock); - - /* We own a locked client now... */ - client->refs++; - - /* Remove our message from dispatch queue while we use it */ - msg = qemudClientMessageQueueServe(&client->dx); - - /* This function drops the lock during dispatch, - * and re-acquires it before returning */ - if (remoteDispatchClientRequest (server, client, msg) < 0) { - VIR_FREE(msg); - qemudDispatchClientFailure(client); - client->refs--; - virMutexUnlock(&client->lock); - continue; - } - - client->refs--; - virMutexUnlock(&client->lock); - - virMutexLock(&server->lock); - worker->processingCall = 0; - virMutexUnlock(&server->lock); - } -} - -static int qemudStartWorker(struct qemud_server *server, - struct qemud_worker *worker) { - pthread_attr_t attr; - pthread_attr_init(&attr); - /* We want to join workers, so don't detach them */ - /*pthread_attr_setdetachstate(&attr, 1);*/ - - if (worker->hasThread) - return -1; - - worker->server = server; - worker->hasThread = 1; - worker->quitRequest = 0; - worker->processingCall = 0; - - if (pthread_create(&worker->thread, - &attr, - qemudWorker, - worker) != 0) { - worker->hasThread = 0; - worker->server = NULL; - return -1; + /* This function drops the lock during dispatch, + * and re-acquires it before returning */ + if (remoteDispatchClientRequest (client->server, client, msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); }
- return 0; + client->refs--; + virMutexUnlock(&client->lock); }
- /* * Read data into buffer using wire decoding (plain or TLS) * @@ -1857,8 +1772,11 @@ readmore: }
/* Move completed message to the end of the dispatch queue */ - if (msg) + if (msg) { + client->refs++; qemudClientMessageQueuePush(&client->dx, msg); + ignore_value(virThreadPoolSendJob(server->workerPool, client)); + } client->nrequests++;
/* Possibly need to create another receive buffer */ @@ -1870,9 +1788,6 @@ readmore: client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
qemudUpdateClientEvent(client); - - /* Tell one of the workers to get on with it... */ - virCondSignal(&server->job); } } } @@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) { return NULL; }
- for (i = 0 ; i < min_workers ; i++) { - if (qemudStartWorker(server, &server->workers[i]) < 0) - goto cleanup; - server->nactiveworkers++; + server->workerPool = virThreadPoolNew(min_workers, + max_workers, + qemudWorker, + NULL);
Should pass 'server' in here, instead of NULL.
+ if (!server->workerPool) { + VIR_ERROR0(_("Failed to create thread pool")); + virMutexUnlock(&server->lock); + return NULL; }
for (;!server->quitEventThread;) {
A small change in that we no longer kill off idle worker threads, but the improved simplicity of libvirtd code makes this a worthwhile tradeoff. So looks good to me aside from the minor locking bug. Regards, Daniel