[libvirt] [PATCH v4 RESEND 0/4] Support of auto-dump on watchdog event in libvirtd

This patch series adds a new watchdog action `dump' which lets libvirtd can do auto-dump when receiving a watchdog event from qemu guest. In order to make the function work, there must be a watchdog device added to guest, and guest must have a watchdog daemon running, for example, /etc/init.d/watchdog start or auto-started on boot. Changes: v4: - updated threadpool to follow libvirt naming style, use appropriate internals APIs, and hide the struct definitions from the header (by Daniel) - fix an error that qemuDomainObjBeginJobWithDriver() get lost in qemuDomainCoreDump() - use thread pool in libvirtd (qemud worker) v3: - let default auto-dump dir be /var/lib/libvirt/qemu/dump Hu Tao (4): threadpool impl Add a new function doCoreDump Add a watchdog action `dump' Using threadpool API to manage qemud worker cfg.mk | 1 + daemon/libvirtd.c | 172 ++++++---------------------------- daemon/libvirtd.h | 4 + src/Makefile.am | 1 + src/conf/domain_conf.c | 1 + src/conf/domain_conf.h | 1 + src/libvirt_private.syms | 6 + src/qemu/qemu.conf | 5 + src/qemu/qemu_conf.c | 16 +++- src/qemu/qemu_conf.h | 5 + src/qemu/qemu_driver.c | 227 +++++++++++++++++++++++++++++++++----------- src/util/threadpool.c | 235 ++++++++++++++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 49 ++++++++++ 13 files changed, 521 insertions(+), 202 deletions(-) create mode 100644 src/util/threadpool.c create mode 100644 src/util/threadpool.h -- 1.7.3 -- Thanks, Hu Tao

* src/util/threadpool.c, src/util/threadpool.h: Thread pool implementation
* src/Makefile.am: Build thread pool
* src/libvirt_private.syms: Export public functions
---
 cfg.mk                   |    1 +
 src/Makefile.am          |    1 +
 src/libvirt_private.syms |    6 +
 src/util/threadpool.c    |  235 ++++++++++++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h    |   49 ++++++++++
 5 files changed, 292 insertions(+), 0 deletions(-)
 create mode 100644 src/util/threadpool.c
 create mode 100644 src/util/threadpool.h

diff --git a/cfg.mk b/cfg.mk
index 5576ecb..e4ee763 100644
--- a/cfg.mk
+++ b/cfg.mk
@@ -127,6 +127,7 @@ useless_free_options = \
   --name=virStoragePoolObjFree \
   --name=virStoragePoolSourceFree \
   --name=virStorageVolDefFree \
+  --name=virThreadPoolFree \
   --name=xmlFree \
   --name=xmlXPathFreeContext \
   --name=xmlXPathFreeObject
diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..d71c644 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -73,6 +73,7 @@ UTIL_SOURCES = \
 		util/threads.c util/threads.h \
 		util/threads-pthread.h \
 		util/threads-win32.h \
+		util/threadpool.c util/threadpool.h \
 		util/uuid.c util/uuid.h \
 		util/util.c util/util.h \
 		util/xml.c util/xml.h \
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index f251c94..70c68cb 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -861,3 +861,9 @@ virXPathStringLimit;
 virXPathULong;
 virXPathULongHex;
 virXPathULongLong;
+
+
+# threadpool.h
+virThreadPoolNew;
+virThreadPoolFree;
+virThreadPoolSendJob;
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..a5f24c2
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,235 @@
+/*
+ * threadpool.c: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors:
+ *     Hu Tao <hutao@cn.fujitsu.com>
+ *     Daniel P. Berrange <berrange@redhat.com>
+ */
+
+#include <config.h>
+
+#include "threadpool.h"
+#include "memory.h"
+#include "threads.h"
+#include "virterror_internal.h"
+#include "ignore-value.h"
+
+#define VIR_FROM_THIS VIR_FROM_NONE
+
+typedef struct _virThreadPoolJob virThreadPoolJob;
+typedef virThreadPoolJob *virThreadPoolJobPtr;
+
+struct _virThreadPoolJob {
+    virThreadPoolJobPtr next;
+
+    void *data;
+};
+
+typedef struct _virThreadPoolJobList virThreadPoolJobList;
+typedef virThreadPoolJobList *virThreadPoolJobListPtr;
+
+struct _virThreadPoolJobList {
+    virThreadPoolJobPtr head;
+    virThreadPoolJobPtr *tail;
+};
+
+
+struct _virThreadPool {
+    bool quit;
+
+    virThreadPoolJobFunc jobFunc;
+    void *jobOpaque;
+    virThreadPoolJobList jobList;
+
+    virMutex mutex;
+    virCond cond;
+    virCond quit_cond;
+
+    size_t maxWorkers;
+    size_t freeWorkers;
+    size_t nWorkers;
+    virThreadPtr workers;
+};
+
+static void virThreadPoolWorker(void *opaque)
+{
+    virThreadPoolPtr pool = opaque;
+
+    virMutexLock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit &&
+               !pool->jobList.head) {
+            pool->freeWorkers++;
+            if (virCondWait(&pool->cond, &pool->mutex) < 0) {
+                pool->freeWorkers--;
+                goto out;
+            }
+            pool->freeWorkers--;
+        }
+
+        if (pool->quit)
+            break;
+
+        virThreadPoolJobPtr job = pool->jobList.head;
+        pool->jobList.head = pool->jobList.head->next;
+        job->next = NULL;
+        if (pool->jobList.tail == &job->next)
+            pool->jobList.tail = &pool->jobList.head;
+
+        virMutexUnlock(&pool->mutex);
+        (pool->jobFunc)(job->data, pool->jobOpaque);
+        VIR_FREE(job);
+        virMutexLock(&pool->mutex);
+    }
+
+out:
+    pool->nWorkers--;
+    if (pool->nWorkers == 0)
+        virCondSignal(&pool->quit_cond);
+    virMutexUnlock(&pool->mutex);
+}
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+        goto error;
+
+    pool->maxWorkers = maxWorkers;
+    for (i = 0; i < minWorkers; i++) {
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+error:
+    VIR_FREE(pool->workers);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    virMutexDestroy(&pool->mutex);
+    return NULL;
+
+}
+
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0) {
+        virCondBroadcast(&pool->cond);
+        ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+    }
+
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = pool->jobList.head->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    VIR_FREE(pool);
+}
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    if (pool->freeWorkers == 0 &&
+        pool->nWorkers < pool->maxWorkers) {
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    job->data = jobData;
+    job->next = NULL;
+    *pool->jobList.tail = job;
+    pool->jobList.tail = &(*pool->jobList.tail)->next;
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+
+error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..9ff27ec
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,49 @@
+/*
+ * threadpool.h: a generic thread pool implementation
+ *
+ * Copyright (C) 2010 Hu Tao
+ * Copyright (C) 2010 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author:
+ *     Hu Tao <hutao@cn.fujitsu.com>
+ *     Daniel P. Berrange <berrange@redhat.com>
+ */
+
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(3)
+                                                ATTRIBUTE_RETURN_CHECK;
+
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobdata) ATTRIBUTE_NONNULL(1)
+                                        ATTRIBUTE_NONNULL(2)
+                                        ATTRIBUTE_RETURN_CHECK;
+
+#endif
-- 
1.7.3

-- 
Thanks,
Hu Tao

On Thu, Dec 02, 2010 at 03:26:57PM +0800, Hu Tao wrote:
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+        goto error;
+
+    pool->maxWorkers = maxWorkers;
+    for (i = 0; i < minWorkers; i++) {
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+error:
+    VIR_FREE(pool->workers);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    virMutexDestroy(&pool->mutex);
+    return NULL;
+}
This is leaking 'pool' on error. IMHO it is preferable to just call virThreadPoolFree(), otherwise any time we add another field to the virThreadPool struct, we have to remember to update two cleanup paths.
+
+void virThreadPoolFree(virThreadPoolPtr pool)
+{
+    virThreadPoolJobPtr job;
+
+    if (!pool)
+        return;
+
+    virMutexLock(&pool->mutex);
+    pool->quit = true;
+    if (pool->nWorkers > 0) {
+        virCondBroadcast(&pool->cond);
+        ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
+    }
+
+    while ((job = pool->jobList.head)) {
+        pool->jobList.head = pool->jobList.head->next;
+        VIR_FREE(job);
+    }
+
+    VIR_FREE(pool->workers);
+    virMutexUnlock(&pool->mutex);
+    virMutexDestroy(&pool->mutex);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    VIR_FREE(pool);
+}
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobData)
+{
+    virThreadPoolJobPtr job;
+
+    virMutexLock(&pool->mutex);
+    if (pool->quit)
+        goto error;
+
+    if (pool->freeWorkers == 0 &&
+        pool->nWorkers < pool->maxWorkers) {
+        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
+            virReportOOMError();
+            goto error;
+        }
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
Small typo, that check should "!= NULL", rather than "< 0".
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    job->data = jobData;
+    job->next = NULL;
+    *pool->jobList.tail = job;
+    pool->jobList.tail = &(*pool->jobList.tail)->next;
+
+    virCondSignal(&pool->cond);
+    virMutexUnlock(&pool->mutex);
+
+    return 0;
+
+error:
+    virMutexUnlock(&pool->mutex);
+    return -1;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..9ff27ec
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,49 @@
+#ifndef __VIR_THREADPOOL_H__
+#define __VIR_THREADPOOL_H__
+
+#include "threads.h"
There's no need to include threads.h here since no virThread stuff is exposed in the API. Just use internal.h instead.
+
+typedef struct _virThreadPool virThreadPool;
+typedef virThreadPool *virThreadPoolPtr;
+
+typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
+
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(3)
+                                                ATTRIBUTE_RETURN_CHECK;
ATTRIBUTE_RETURN_CHECK doesn't serve any useful purpose when placed on constructors, since the caller will always "use" the return value by assigning the pointer to some variable. The compiler can thus never detect whether you check for null or not, even with this annotation.
+void virThreadPoolFree(virThreadPoolPtr pool);
+
+int virThreadPoolSendJob(virThreadPoolPtr pool,
+                         void *jobdata) ATTRIBUTE_NONNULL(1)
+                                        ATTRIBUTE_NONNULL(2)
+                                        ATTRIBUTE_RETURN_CHECK;
Regards, Daniel

On 12/02/2010 05:28 AM, Daniel P. Berrange wrote:
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque) ATTRIBUTE_NONNULL(3)
+                                                ATTRIBUTE_RETURN_CHECK;
ATTRIBUTE_RETURN_CHECK doesn't serve any useful purpose when placed on constructors, since the caller will always "use" the return value by assigning the pointer to some variable. The compiler can thus never detect whether you check for null or not, even with this annotation.
Good point. However, in looking through gcc's documentation, maybe it's time we introduce a new attribute for constructors:

#define ATTRIBUTE_MALLOC __attribute__((__malloc__))

     The `malloc' attribute is used to tell the compiler that a
     function may be treated as if any non-`NULL' pointer it returns
     cannot alias any other pointer valid when the function returns.
     This will often improve optimization. Standard functions with
     this property include `malloc' and `calloc'. `realloc'-like
     functions have this property as long as the old pointer is never
     referred to (including comparing it to the new pointer) after the
     function returns a non-`NULL' value.

I think that tools like clang might also be able to feed off of the malloc attribute to make decisions about whether NULL-checking needs to be performed on the result, and/or provide better leak detection analysis.

However, that's a separate idea, and doesn't affect this series.

-- 
Eric Blake   eblake@redhat.com    +1-801-349-2682
Libvirt virtualization library http://libvirt.org

On Thu, Dec 02, 2010 at 12:28:17PM +0000, Daniel P. Berrange wrote:
On Thu, Dec 02, 2010 at 03:26:57PM +0800, Hu Tao wrote:
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+        goto error;
+
+    pool->maxWorkers = maxWorkers;
+    for (i = 0; i < minWorkers; i++) {
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+error:
+    VIR_FREE(pool->workers);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    virMutexDestroy(&pool->mutex);
+    return NULL;
+}
This is leaking 'pool' on error. IMHO it is preferable to just call virThreadPoolFree(), otherwise any time we add another field to the virThreadPool struct, we have to remember to update two cleanup paths.
Agreed. Since this is the error cleanup path (the thread pool is not yet created), it doesn't matter if we lock a broken mutex or wait on a broken cond.

-- 
Thanks,
Hu Tao

On Fri, Dec 03, 2010 at 09:46:44AM +0800, Hu Tao wrote:
On Thu, Dec 02, 2010 at 12:28:17PM +0000, Daniel P. Berrange wrote:
On Thu, Dec 02, 2010 at 03:26:57PM +0800, Hu Tao wrote:
+virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
+                                  size_t maxWorkers,
+                                  virThreadPoolJobFunc func,
+                                  void *opaque)
+{
+    virThreadPoolPtr pool;
+    size_t i;
+
+    if (minWorkers > maxWorkers)
+        minWorkers = maxWorkers;
+
+    if (VIR_ALLOC(pool) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    pool->jobList.head = NULL;
+    pool->jobList.tail = &pool->jobList.head;
+
+    pool->jobFunc = func;
+    pool->jobOpaque = opaque;
+
+    if (virMutexInit(&pool->mutex) < 0)
+        goto error;
+    if (virCondInit(&pool->cond) < 0)
+        goto error;
+    if (virCondInit(&pool->quit_cond) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
+        goto error;
+
+    pool->maxWorkers = maxWorkers;
+    for (i = 0; i < minWorkers; i++) {
+        if (virThreadCreate(&pool->workers[i],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            virThreadPoolFree(pool);
+            return NULL;
+        }
+        pool->nWorkers++;
+    }
+
+    return pool;
+
+error:
+    VIR_FREE(pool->workers);
+    ignore_value(virCondDestroy(&pool->quit_cond));
+    ignore_value(virCondDestroy(&pool->cond));
+    virMutexDestroy(&pool->mutex);
+    return NULL;
+}
This is leaking 'pool' on error. IMHO it is preferable to just call virThreadPoolFree(), otherwise any time we add another field to the virThreadPool struct, we have to remember to update two cleanup paths.
Agreed. Since this is the error cleanup path (the thread pool is not yet created), it doesn't matter if we lock a broken mutex or wait on a broken cond.
IMHO it doesn't matter if we call virMutexDestroy on an uninitialized mutex, not least because this code is already calling virCondDestroy on potentially uninitialized conditions. We'll just get an error code back from the destroy function, which is ignored anyway. And failure to initialize a mutex is something I've never encountered in practice.

Regards,
Daniel

On Thu, Dec 02, 2010 at 12:28:17PM +0000, Daniel P. Berrange wrote: <...snip...>
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
Small typo, that check should "!= NULL", rather than "< 0".
Confused. Do you mean virThreadCreate() or VIR_ALLOC() below? But both return int.
+    }
+
+    if (VIR_ALLOC(job) < 0) {
+        virReportOOMError();
+        goto error;
+    }
-- Thanks, Hu Tao

On Fri, Dec 03, 2010 at 10:49:25AM +0800, Hu Tao wrote:
On Thu, Dec 02, 2010 at 12:28:17PM +0000, Daniel P. Berrange wrote: <...snip...>
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
Small typo, that check should "!= NULL", rather than "< 0".
Confused. Do you mean virThreadCreate() or VIR_ALLOC() below? But both return int.
virThreadCreate returns a virThreadPtr object, so you need to check for its pointer being non-NULL. Regards, Daniel

On Mon, Dec 06, 2010 at 10:49:00AM +0000, Daniel P. Berrange wrote:
On Fri, Dec 03, 2010 at 10:49:25AM +0800, Hu Tao wrote:
On Thu, Dec 02, 2010 at 12:28:17PM +0000, Daniel P. Berrange wrote: <...snip...>
+
+        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
+                            true,
+                            virThreadPoolWorker,
+                            pool) < 0) {
+            pool->nWorkers--;
+            goto error;
+        }
Small typo, that check should "!= NULL", rather than "< 0".
Confused. Do you mean virThreadCreate() or VIR_ALLOC() below? But both return int.
virThreadCreate returns a virThreadPtr object, so you need to check for its pointer being non-NULL.
No, virThreadCreate doesn't allocate memory for a virThread object; it takes as its first parameter a virThreadPtr pointing to a virThread object that is allocated by the caller, and returns an int status.

I'd appreciate it if you could help review v5 of this series (link:
https://www.redhat.com/archives/libvir-list/2010-December/msg00181.html)

-- 
Thanks,
Hu Tao

This patch prepares for the next patch.
---
 src/qemu/qemu_driver.c |  132 +++++++++++++++++++++++++++---------------------
 1 files changed, 74 insertions(+), 58 deletions(-)

diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 1a7c1ad..e534195 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -6063,6 +6063,78 @@ cleanup:
     return ret;
 }
 
+static int doCoreDump(struct qemud_driver *driver,
+                      virDomainObjPtr vm,
+                      const char *path,
+                      enum qemud_save_formats compress)
+{
+    int fd = -1;
+    int ret = -1;
+    qemuDomainObjPrivatePtr priv;
+
+    priv = vm->privateData;
+
+    /* Create an empty file with appropriate ownership. */
+    if ((fd = open(path, O_CREAT|O_TRUNC|O_WRONLY, S_IRUSR|S_IWUSR)) < 0) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED,
+                        _("failed to create '%s'"), path);
+        goto cleanup;
+    }
+
+    if (VIR_CLOSE(fd) < 0) {
+        virReportSystemError(errno,
+                             _("unable to save file %s"),
+                             path);
+        goto cleanup;
+    }
+
+    if (driver->securityDriver &&
+        driver->securityDriver->domainSetSavedStateLabel &&
+        driver->securityDriver->domainSetSavedStateLabel(driver->securityDriver,
+                                                         vm, path) == -1)
+        goto cleanup;
+
+    qemuDomainObjEnterMonitorWithDriver(driver, vm);
+    if (compress == QEMUD_SAVE_FORMAT_RAW) {
+        const char *args[] = {
+            "cat",
+            NULL,
+        };
+        ret = qemuMonitorMigrateToFile(priv->mon,
+                                       QEMU_MONITOR_MIGRATE_BACKGROUND,
+                                       args, path, 0);
+    } else {
+        const char *prog = qemudSaveCompressionTypeToString(compress);
+        const char *args[] = {
+            prog,
+            "-c",
+            NULL,
+        };
+        ret = qemuMonitorMigrateToFile(priv->mon,
+                                       QEMU_MONITOR_MIGRATE_BACKGROUND,
+                                       args, path, 0);
+    }
+    qemuDomainObjExitMonitorWithDriver(driver, vm);
+    if (ret < 0)
+        goto cleanup;
+
+    ret = qemuDomainWaitForMigrationComplete(driver, vm);
+
+    if (ret < 0)
+        goto cleanup;
+
+    if (driver->securityDriver &&
+        driver->securityDriver->domainRestoreSavedStateLabel &&
+        driver->securityDriver->domainRestoreSavedStateLabel(driver->securityDriver,
+                                                             vm, path) == -1)
+        goto cleanup;
+
+cleanup:
+    if (ret != 0)
+        unlink(path);
+    return ret;
+}
+
 static enum qemud_save_formats
 getCompressionType(struct qemud_driver *driver)
 {
@@ -6097,13 +6169,10 @@ static int qemudDomainCoreDump(virDomainPtr dom,
     struct qemud_driver *driver = dom->conn->privateData;
     virDomainObjPtr vm;
     int resume = 0, paused = 0;
-    int ret = -1, fd = -1;
+    int ret = -1;
     virDomainEventPtr event = NULL;
-    enum qemud_save_formats compress;
     qemuDomainObjPrivatePtr priv;
 
-    compress = getCompressionType(driver);
-
     qemuDriverLock(driver);
     vm = virDomainFindByUUID(&driver->domains, dom->uuid);
 
@@ -6125,26 +6194,6 @@ static int qemudDomainCoreDump(virDomainPtr dom,
         goto endjob;
     }
 
-    /* Create an empty file with appropriate ownership. */
-    if ((fd = open(path, O_CREAT|O_TRUNC|O_WRONLY, S_IRUSR|S_IWUSR)) < 0) {
-        qemuReportError(VIR_ERR_OPERATION_FAILED,
-                        _("failed to create '%s'"), path);
-        goto endjob;
-    }
-
-    if (VIR_CLOSE(fd) < 0) {
-        virReportSystemError(errno,
-                             _("unable to save file %s"),
-                             path);
-        goto endjob;
-    }
-
-    if (driver->securityDriver &&
-        driver->securityDriver->domainSetSavedStateLabel &&
-        driver->securityDriver->domainSetSavedStateLabel(driver->securityDriver,
-                                                         vm, path) == -1)
-        goto endjob;
-
     /* Migrate will always stop the VM, so the resume condition is
        independent of whether the stop command is issued. */
     resume = (vm->state == VIR_DOMAIN_RUNNING);
@@ -6168,43 +6217,12 @@ static int qemudDomainCoreDump(virDomainPtr dom,
         }
     }
 
-    qemuDomainObjEnterMonitorWithDriver(driver, vm);
-    if (compress == QEMUD_SAVE_FORMAT_RAW) {
-        const char *args[] = {
-            "cat",
-            NULL,
-        };
-        ret = qemuMonitorMigrateToFile(priv->mon,
-                                       QEMU_MONITOR_MIGRATE_BACKGROUND,
-                                       args, path, 0);
-    } else {
-        const char *prog = qemudSaveCompressionTypeToString(compress);
-        const char *args[] = {
-            prog,
-            "-c",
-            NULL,
-        };
-        ret = qemuMonitorMigrateToFile(priv->mon,
-                                       QEMU_MONITOR_MIGRATE_BACKGROUND,
-                                       args, path, 0);
-    }
-    qemuDomainObjExitMonitorWithDriver(driver, vm);
-    if (ret < 0)
-        goto endjob;
-
-    ret = qemuDomainWaitForMigrationComplete(driver, vm);
-
+    ret = doCoreDump(driver, vm, path, getCompressionType(driver));
     if (ret < 0)
         goto endjob;
 
     paused = 1;
 
-    if (driver->securityDriver &&
-        driver->securityDriver->domainRestoreSavedStateLabel &&
-        driver->securityDriver->domainRestoreSavedStateLabel(driver->securityDriver,
-                                                             vm, path) == -1)
-        goto endjob;
-
 endjob:
     if ((ret == 0) && (flags & VIR_DUMP_CRASH)) {
         qemudShutdownVMDaemon(driver, vm, 0);
@@ -6237,8 +6255,6 @@ endjob:
     }
 
 cleanup:
-    if (ret != 0)
-        unlink(path);
     if (vm)
         virDomainObjUnlock(vm);
     if (event)
-- 
1.7.3

-- 
Thanks,
Hu Tao

On Thu, Dec 02, 2010 at 03:30:03PM +0800, Hu Tao wrote:
This patch prepares for the next patch.
---
 src/qemu/qemu_driver.c |  132 +++++++++++++++++++++++++++---------------------
 1 files changed, 74 insertions(+), 58 deletions(-)
ACK, looks fine to me.

Daniel

The `dump' watchdog action lets libvirtd dump the guest when it receives a
watchdog event (which probably means a guest crash).

Currently only qemu is supported.
---
 src/conf/domain_conf.c |    1 +
 src/conf/domain_conf.h |    1 +
 src/qemu/qemu.conf     |    5 +++
 src/qemu/qemu_conf.c   |   16 ++++++++-
 src/qemu/qemu_conf.h   |    5 +++
 src/qemu/qemu_driver.c |   95 ++++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 122 insertions(+), 1 deletions(-)

diff --git a/src/conf/domain_conf.c b/src/conf/domain_conf.c
index 3f14cee..a6cb444 100644
--- a/src/conf/domain_conf.c
+++ b/src/conf/domain_conf.c
@@ -245,6 +245,7 @@ VIR_ENUM_IMPL(virDomainWatchdogAction, VIR_DOMAIN_WATCHDOG_ACTION_LAST,
               "shutdown",
               "poweroff",
               "pause",
+              "dump",
               "none")
 
 VIR_ENUM_IMPL(virDomainVideo, VIR_DOMAIN_VIDEO_TYPE_LAST,
diff --git a/src/conf/domain_conf.h b/src/conf/domain_conf.h
index 899b19f..7f50b79 100644
--- a/src/conf/domain_conf.h
+++ b/src/conf/domain_conf.h
@@ -462,6 +462,7 @@ enum virDomainWatchdogAction {
     VIR_DOMAIN_WATCHDOG_ACTION_SHUTDOWN,
     VIR_DOMAIN_WATCHDOG_ACTION_POWEROFF,
     VIR_DOMAIN_WATCHDOG_ACTION_PAUSE,
+    VIR_DOMAIN_WATCHDOG_ACTION_DUMP,
    VIR_DOMAIN_WATCHDOG_ACTION_NONE,
 
     VIR_DOMAIN_WATCHDOG_ACTION_LAST
diff --git a/src/qemu/qemu.conf b/src/qemu/qemu.conf
index f4f965e..ba41f80 100644
--- a/src/qemu/qemu.conf
+++ b/src/qemu/qemu.conf
@@ -191,6 +191,11 @@
 # save_image_format = "raw"
 # dump_image_format = "raw"
 
+# When a domain is configured to be auto-dumped when libvirtd receives a
+# watchdog event from qemu guest, libvirtd will save dump files in directory
+# specified by auto_dump_path. Default value is /var/lib/libvirt/qemu/dump
+#
+# auto_dump_path = "/var/lib/libvirt/qemu/dump"
 
 # If provided by the host and a hugetlbfs mount point is configured,
 # a guest may request huge page backing. When this mount point is
diff --git a/src/qemu/qemu_conf.c b/src/qemu/qemu_conf.c
index 7cd0603..187e206 100644
--- a/src/qemu/qemu_conf.c
+++ b/src/qemu/qemu_conf.c
@@ -386,6 +386,17 @@ int qemudLoadDriverConfig(struct qemud_driver *driver,
         }
     }
 
+    p = virConfGetValue (conf, "auto_dump_path");
+    CHECK_TYPE ("auto_dump_path", VIR_CONF_STRING);
+    if (p && p->str) {
+        VIR_FREE(driver->autoDumpPath);
+        if (!(driver->autoDumpPath = strdup(p->str))) {
+            virReportOOMError();
+            virConfFree(conf);
+            return -1;
+        }
+    }
+
     p = virConfGetValue (conf, "hugetlbfs_mount");
     CHECK_TYPE ("hugetlbfs_mount", VIR_CONF_STRING);
     if (p && p->str) {
@@ -5374,7 +5385,10 @@ int qemudBuildCommandLine(virConnectPtr conn,
         }
         ADD_ARG(optstr);
 
-        const char *action = virDomainWatchdogActionTypeToString(watchdog->action);
+        int act = watchdog->action;
+        if (act == VIR_DOMAIN_WATCHDOG_ACTION_DUMP)
+            act = VIR_DOMAIN_WATCHDOG_ACTION_PAUSE;
+        const char *action = virDomainWatchdogActionTypeToString(act);
         if (!action) {
             qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                             _("invalid watchdog action"));
diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h
index aba64d6..9bcae88 100644
--- a/src/qemu/qemu_conf.h
+++ b/src/qemu/qemu_conf.h
@@ -41,6 +41,7 @@
 # include "driver.h"
 # include "bitmap.h"
 # include "macvtap.h"
+# include "threadpool.h"
 
 # define qemudDebug(fmt, ...) do {} while(0)
 
@@ -107,6 +108,8 @@ enum qemud_cmd_flags {
 struct qemud_driver {
     virMutex lock;
 
+    virThreadPoolPtr workerPool;
+
     int privileged;
 
     uid_t user;
@@ -174,6 +177,8 @@ struct qemud_driver {
     char *saveImageFormat;
     char *dumpImageFormat;
 
+    char *autoDumpPath;
+
     pciDeviceList *activePciHostdevs;
 
     virBitmapPtr reservedVNCPorts;
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index e534195..bd25d90 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -85,6 +85,7 @@
 #include "files.h"
 #include "fdstream.h"
 #include "configmake.h"
+#include "threadpool.h"
 
 #define VIR_FROM_THIS VIR_FROM_QEMU
 
@@ -137,6 +138,14 @@ struct _qemuDomainObjPrivate {
     int persistentAddrs;
 };
 
+struct watchdogEvent
+{
+    virDomainObjPtr vm;
+    int action;
+};
+
+static void processWatchdogEvent(void *data, void *opaque);
+
 static int qemudShutdown(void);
 
 static void qemuDriverLock(struct qemud_driver *driver)
@@ -1204,6 +1213,17 @@ qemuHandleDomainWatchdog(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
         if (virDomainSaveStatus(driver->caps, driver->stateDir, vm) < 0)
             VIR_WARN("Unable to save status on vm %s after IO error", vm->def->name);
     }
+
+    if (vm->def->watchdog->action == VIR_DOMAIN_WATCHDOG_ACTION_DUMP) {
+        struct watchdogEvent *wdEvent;
+        if (VIR_ALLOC(wdEvent) == 0) {
+            wdEvent->action = VIR_DOMAIN_WATCHDOG_ACTION_DUMP;
+            wdEvent->vm = vm;
+            ignore_value(virThreadPoolSendJob(driver->workerPool, wdEvent));
+        } else
+            virReportOOMError();
+    }
+
     virDomainObjUnlock(vm);
 
     if (watchdogEvent || lifecycleEvent) {
@@ -1786,6 +1806,9 @@ qemudStartup(int privileged) {
         if (virAsprintf(&qemu_driver->snapshotDir,
                         "%s/lib/libvirt/qemu/snapshot", LOCALSTATEDIR) == -1)
             goto out_of_memory;
+        if (virAsprintf(&qemu_driver->autoDumpPath,
+                        "%s/lib/libvirt/qemu/dump", LOCALSTATEDIR) == -1)
+            goto out_of_memory;
     } else {
         uid_t uid = geteuid();
         char *userdir = virGetUserDirectory(uid);
@@ -1814,6 +1837,8 @@ qemudStartup(int privileged) {
             goto out_of_memory;
         if (virAsprintf(&qemu_driver->snapshotDir, "%s/qemu/snapshot", base) == -1)
             goto out_of_memory;
+        if (virAsprintf(&qemu_driver->autoDumpPath, "%s/qemu/dump", base) == -1)
+            goto out_of_memory;
     }
 
     if (virFileMakePath(qemu_driver->stateDir) != 0) {
@@ -1846,6 +1871,12 @@ qemudStartup(int privileged) {
                   qemu_driver->snapshotDir, virStrerror(errno, ebuf, sizeof ebuf));
         goto error;
     }
+    if (virFileMakePath(qemu_driver->autoDumpPath) != 0) {
+        char ebuf[1024];
+        VIR_ERROR(_("Failed to create dump dir '%s': %s"),
+                  qemu_driver->autoDumpPath, virStrerror(errno, ebuf, sizeof ebuf));
+        goto error;
+    }
 
     /* Configuration paths are either ~/.libvirt/qemu/... (session) or
      * /etc/libvirt/qemu/... (system).
@@ -1971,6 +2002,10 @@ qemudStartup(int privileged) {
 
     qemudAutostartConfigs(qemu_driver);
 
+    qemu_driver->workerPool = virThreadPoolNew(0, 1, processWatchdogEvent, NULL);
+    if (!qemu_driver->workerPool)
+        goto error;
+
     if (conn)
         virConnectClose(conn);
 
@@ -2077,6 +2112,7 @@ qemudShutdown(void) {
     VIR_FREE(qemu_driver->cacheDir);
     VIR_FREE(qemu_driver->saveDir);
     VIR_FREE(qemu_driver->snapshotDir);
+    VIR_FREE(qemu_driver->autoDumpPath);
     VIR_FREE(qemu_driver->vncTLSx509certdir);
     VIR_FREE(qemu_driver->vncListen);
     VIR_FREE(qemu_driver->vncPassword);
@@ -2112,6 +2148,7 @@ qemudShutdown(void) {
     qemuDriverUnlock(qemu_driver);
     virMutexDestroy(&qemu_driver->lock);
+    virThreadPoolFree(qemu_driver->workerPool);
     VIR_FREE(qemu_driver);
 
     return 0;
@@ -6263,6 +6300,64 @@ cleanup:
     return ret;
 }
 
+static void processWatchdogEvent(void *data, void *opaque ATTRIBUTE_UNUSED)
+{
+    int ret;
+    struct watchdogEvent *wdEvent = data;
+
+    switch (wdEvent->action) {
+    case VIR_DOMAIN_WATCHDOG_ACTION_DUMP:
+        {
+            char *dumpfile;
+            int i;
+
+            qemuDomainObjPrivatePtr priv = wdEvent->vm->privateData;
+
+            i = virAsprintf(&dumpfile, "%s/%s-%u",
+                            qemu_driver->autoDumpPath,
+                            wdEvent->vm->def->name,
+                            (unsigned int)time(NULL));
+
+            qemuDriverLock(qemu_driver);
+            virDomainObjLock(wdEvent->vm);
+
+            if (qemuDomainObjBeginJobWithDriver(qemu_driver, wdEvent->vm) < 0)
+                break;
+
+            if (!virDomainObjIsActive(wdEvent->vm)) {
+                qemuReportError(VIR_ERR_OPERATION_INVALID,
+                                "%s", _("domain is not running"));
+                break;
+            }
+
+            ret = doCoreDump(qemu_driver,
+                             wdEvent->vm,
+                             dumpfile,
+                             getCompressionType(qemu_driver));
+            if (ret < 0)
+                qemuReportError(VIR_ERR_OPERATION_FAILED,
+                                "%s", _("Dump failed"));
+
+            qemuDomainObjEnterMonitorWithDriver(qemu_driver, wdEvent->vm);
+            ret = qemuMonitorStartCPUs(priv->mon, NULL);
+            qemuDomainObjExitMonitorWithDriver(qemu_driver, wdEvent->vm);
+
+            if (ret < 0)
+                qemuReportError(VIR_ERR_OPERATION_FAILED,
+                                "%s", _("Resuming after dump failed"));
+
+            ignore_value(qemuDomainObjEndJob(wdEvent->vm));
+
+            virDomainObjUnlock(wdEvent->vm);
+            qemuDriverUnlock(qemu_driver);
+
+            VIR_FREE(dumpfile);
+        }
+        break;
+    }
+
+    VIR_FREE(wdEvent);
+}
 
 static int qemudDomainHotplugVcpus(virDomainObjPtr vm, unsigned int nvcpus)
 {
-- 
1.7.3

-- 
Thanks,
Hu Tao

On Thu, Dec 02, 2010 at 03:30:10PM +0800, Hu Tao wrote:
diff --git a/src/qemu/qemu.conf b/src/qemu/qemu.conf
index f4f965e..ba41f80 100644
--- a/src/qemu/qemu.conf
+++ b/src/qemu/qemu.conf
@@ -191,6 +191,11 @@
 # save_image_format = "raw"
 # dump_image_format = "raw"

+# When a domain is configured to be auto-dumped when libvirtd receives a
+# watchdog event from qemu guest, libvirtd will save dump files in directory
+# specified by auto_dump_path. Default value is /var/lib/libvirt/qemu/dump
+#
+# auto_dump_path = "/var/lib/libvirt/qemu/dump"

 # If provided by the host and a hugetlbfs mount point is configured,
 # a guest may request huge page backing.  When this mount point is
Also need to list this new setting in qemu.aug and test_qemu.aug
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index e534195..bd25d90 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -6263,6 +6300,64 @@ cleanup:
     return ret;
 }

+static void processWatchdogEvent(void *data, void *opaque ATTRIBUTE_UNUSED)
+{
+    int ret;
+    struct watchdogEvent *wdEvent = data;
+
+    switch (wdEvent->action) {
+    case VIR_DOMAIN_WATCHDOG_ACTION_DUMP:
+        {
+            char *dumpfile;
+            int i;
+
+            qemuDomainObjPrivatePtr priv = wdEvent->vm->privateData;
+
+            i = virAsprintf(&dumpfile, "%s/%s-%u",
+                            qemu_driver->autoDumpPath,
+                            wdEvent->vm->def->name,
+                            (unsigned int)time(NULL));
+
+            qemuDriverLock(qemu_driver);
+            virDomainObjLock(wdEvent->vm);
+
+            if (qemuDomainObjBeginJobWithDriver(qemu_driver, wdEvent->vm) < 0)
+                break;
+
+            if (!virDomainObjIsActive(wdEvent->vm)) {
+                qemuReportError(VIR_ERR_OPERATION_INVALID,
+                                "%s", _("domain is not running"));
+                break;
+            }
+
+            ret = doCoreDump(qemu_driver,
+                             wdEvent->vm,
+                             dumpfile,
+                             getCompressionType(qemu_driver));
+            if (ret < 0)
+                qemuReportError(VIR_ERR_OPERATION_FAILED,
+                                "%s", _("Dump failed"));
+
+            qemuDomainObjEnterMonitorWithDriver(qemu_driver, wdEvent->vm);
+            ret = qemuMonitorStartCPUs(priv->mon, NULL);
+            qemuDomainObjExitMonitorWithDriver(qemu_driver, wdEvent->vm);
+
+            if (ret < 0)
+                qemuReportError(VIR_ERR_OPERATION_FAILED,
+                                "%s", _("Resuming after dump failed"));
+
+            ignore_value(qemuDomainObjEndJob(wdEvent->vm));
+
+            virDomainObjUnlock(wdEvent->vm);
It isn't safe to ignore the qemuDomainObjEndJob return value, because if the
return value is 0, then the VM object has been free()d. So you need to do

    if (qemuDomainObjEndJob(wdEvent->vm) > 0)
        virDomainObjUnlock(wdEvent->vm);
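The pattern behind this comment is a reference count whose "drop" primitive reports whether the object survived. A minimal sketch of that contract, using hypothetical names rather than libvirt's real virDomainObj implementation:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical refcounted object mirroring the behaviour described
 * above: endJob() drops one reference and frees the object when the
 * count reaches zero, returning the remaining count so the caller
 * knows whether the object still exists. */
typedef struct demoObj {
    int refs;
    int locked;
} demoObj;

/* Returns the remaining reference count; 0 means obj has been freed
 * and must not be touched again (not even to unlock it). */
static int demoObjEndJob(demoObj *obj)
{
    if (--obj->refs == 0) {
        free(obj);
        return 0;
    }
    return obj->refs;
}

/* The safe pattern: only unlock when the object survived EndJob. */
static int finishJob(demoObj *obj)
{
    int survived = demoObjEndJob(obj);
    if (survived > 0)
        obj->locked = 0;   /* stands in for virDomainObjUnlock() */
    return survived;
}
```

Ignoring the return value and unlocking unconditionally would touch freed memory exactly in the last-reference case, which is why the review asks for the conditional.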
+            qemuDriverUnlock(qemu_driver);
+
+            VIR_FREE(dumpfile);
+        }
+        break;
+    }
+
+    VIR_FREE(wdEvent);
+}
I'd prefer it if the 'qemu_driver' was passed in via the 'void *opaque' parameter. Although it is available via the global variable, we aim to avoid using that in the driver code, except for the global startup/shutdown methods.

Regards,
Daniel
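The opaque-pointer plumbing Daniel asks for can be sketched in a few lines: the context a worker needs is captured once at pool creation and handed back to every job, so the callback never reaches for a global. The names below are illustrative, not libvirt's actual threadpool API, and the "pool" runs jobs inline to stay deterministic:

```c
#include <stddef.h>

/* Fake driver context standing in for 'struct qemud_driver'. */
struct fakeDriver {
    int dumpsDone;
};

typedef void (*jobFunc)(void *data, void *opaque);

struct miniPool {
    jobFunc func;
    void *opaque;       /* stored once, passed to every job */
};

static void miniPoolInit(struct miniPool *pool, jobFunc func, void *opaque)
{
    pool->func = func;
    pool->opaque = opaque;
}

/* A real pool would queue this to a worker thread; running it inline
 * keeps the sketch deterministic while showing the same data flow. */
static void miniPoolSendJob(struct miniPool *pool, void *data)
{
    pool->func(data, pool->opaque);
}

static void processEvent(void *data, void *opaque)
{
    struct fakeDriver *driver = opaque;   /* no global variable needed */
    (void)data;                           /* per-job payload unused here */
    driver->dumpsDone++;
}
```

In the patch this corresponds to `virThreadPoolNew(0, 1, processWatchdogEvent, qemu_driver)` instead of passing `NULL`, with `processWatchdogEvent` casting its `opaque` back to the driver pointer.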

---
 daemon/libvirtd.c |  172 +++++++++-------------------------------------------
 daemon/libvirtd.h |    4 +
 2 files changed, 33 insertions(+), 143 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..dbd050a 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
 #include "stream.h"
 #include "hooks.h"
 #include "virtaudit.h"
+#include "threadpool.h"
 #ifdef HAVE_AVAHI
 # include "mdns.h"
 #endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);

 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->auth = sock->auth;
     client->addr = addr;
     client->addrstr = addrstr;
+    client->server = server;
     addrstr = NULL;

     for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
@@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket

     server->clients[server->nclients++] = client;

-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
     return 0;

error:
@@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
     VIR_FREE(client->addrstr);
 }

-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
 {
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;

-static void *qemudWorker(void *data)
-{
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
+    virMutexLock(&client->lock);

-    while (1) {
-        struct qemud_client *client = NULL;
-        struct qemud_client_message *msg;
+    /* Remove our message from dispatch queue while we use it */
+    msg = qemudClientMessageQueueServe(&client->dx);

-        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
-            if (worker->quitRequest ||
-                virCondWait(&server->job, &server->lock) < 0) {
-                virMutexUnlock(&server->lock);
-                return NULL;
-            }
-        }
-        if (worker->quitRequest) {
-            virMutexUnlock(&client->lock);
-            virMutexUnlock(&server->lock);
-            return NULL;
-        }
-        worker->processingCall = 1;
-        virMutexUnlock(&server->lock);
-
-        /* We own a locked client now... */
-        client->refs++;
-
-        /* Remove our message from dispatch queue while we use it */
-        msg = qemudClientMessageQueueServe(&client->dx);
-
-        /* This function drops the lock during dispatch,
-         * and re-acquires it before returning */
-        if (remoteDispatchClientRequest (server, client, msg) < 0) {
-            VIR_FREE(msg);
-            qemudDispatchClientFailure(client);
-            client->refs--;
-            virMutexUnlock(&client->lock);
-            continue;
-        }
-
-        client->refs--;
-        virMutexUnlock(&client->lock);
-
-        virMutexLock(&server->lock);
-        worker->processingCall = 0;
-        virMutexUnlock(&server->lock);
-    }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
-                            struct qemud_worker *worker) {
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    /* We want to join workers, so don't detach them */
-    /*pthread_attr_setdetachstate(&attr, 1);*/
-
-    if (worker->hasThread)
-        return -1;
-
-    worker->server = server;
-    worker->hasThread = 1;
-    worker->quitRequest = 0;
-    worker->processingCall = 0;
-
-    if (pthread_create(&worker->thread,
-                       &attr,
-                       qemudWorker,
-                       worker) != 0) {
-        worker->hasThread = 0;
-        worker->server = NULL;
-        return -1;
+    /* This function drops the lock during dispatch,
+     * and re-acquires it before returning */
+    if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
+        VIR_FREE(msg);
+        qemudDispatchClientFailure(client);
     }

-    return 0;
+    client->refs--;
+    virMutexUnlock(&client->lock);
 }

-
 /*
  * Read data into buffer using wire decoding (plain or TLS)
  *
@@ -1857,8 +1772,11 @@ readmore:
     }

     /* Move completed message to the end of the dispatch queue */
-    if (msg)
+    if (msg) {
+        client->refs++;
         qemudClientMessageQueuePush(&client->dx, msg);
+        ignore_value(virThreadPoolSendJob(server->workerPool, client));
+    }
     client->nrequests++;

     /* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
         client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;

         qemudUpdateClientEvent(client);
-
-        /* Tell one of the workers to get on with it... */
-        virCondSignal(&server->job);
     }
 }
@@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
         return NULL;
     }

-    for (i = 0 ; i < min_workers ; i++) {
-        if (qemudStartWorker(server, &server->workers[i]) < 0)
-            goto cleanup;
-        server->nactiveworkers++;
+    server->workerPool = virThreadPoolNew(min_workers,
+                                          max_workers,
+                                          qemudWorker,
+                                          NULL);
+    if (!server->workerPool) {
+        VIR_ERROR0(_("Failed to create thread pool"));
+        virMutexUnlock(&server->lock);
+        return NULL;
     }

     for (;!server->quitEventThread;) {
@@ -2367,47 +2286,14 @@ static void *qemudRunLoop(void *opaque) {
                 goto reprocess;
             }
         }
-
-        /* If number of active workers exceeds both the min_workers
-         * threshold and the number of clients, then kill some
-         * off */
-        for (i = 0 ; (i < server->nworkers &&
-                      server->nactiveworkers > server->nclients &&
-                      server->nactiveworkers > min_workers) ; i++) {
-
-            if (server->workers[i].hasThread &&
-                !server->workers[i].processingCall) {
-                server->workers[i].quitRequest = 1;
-
-                virCondBroadcast(&server->job);
-                virMutexUnlock(&server->lock);
-                pthread_join(server->workers[i].thread, NULL);
-                virMutexLock(&server->lock);
-                server->workers[i].hasThread = 0;
-                server->nactiveworkers--;
-            }
-        }
-    }
-
-cleanup:
-    for (i = 0 ; i < server->nworkers ; i++) {
-        if (!server->workers[i].hasThread)
-            continue;
-
-        server->workers[i].quitRequest = 1;
-        virCondBroadcast(&server->job);
-
-        virMutexUnlock(&server->lock);
-        pthread_join(server->workers[i].thread, NULL);
-        virMutexLock(&server->lock);
-        server->workers[i].hasThread = 0;
     }
-    VIR_FREE(server->workers);

     for (i = 0; i < server->nclients; i++)
         qemudFreeClient(server->clients[i]);
     server->nclients = 0;
     VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max);

+    virThreadPoolFree(server->workerPool);
+    server->workerPool = NULL;
     virMutexUnlock(&server->lock);
     return NULL;
 }
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..d7e10dc 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
 # include "logging.h"
 # include "threads.h"
 # include "network.h"
+# include "threadpool.h"

 # if WITH_DTRACE
 #  ifndef LIBVIRTD_PROBES_H
@@ -192,6 +193,8 @@ struct qemud_client {
     int magic;

+    struct qemud_server *server;
+
     int fd;
     int watch;
     unsigned int readonly :1;
@@ -283,6 +286,7 @@ struct qemud_server {
     int privileged;

+    virThreadPoolPtr workerPool;
     size_t nworkers;
     size_t nactiveworkers;
     struct qemud_worker *workers;
--
1.7.3

--
Thanks,
Hu Tao

On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote:
---
 daemon/libvirtd.c |  172 +++++++++-------------------------------------------
 daemon/libvirtd.h |    4 +
 2 files changed, 33 insertions(+), 143 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..dbd050a 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
 #include "stream.h"
 #include "hooks.h"
 #include "virtaudit.h"
+#include "threadpool.h"
 #ifdef HAVE_AVAHI
 # include "mdns.h"
 #endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);

 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->auth = sock->auth;
     client->addr = addr;
     client->addrstr = addrstr;
+    client->server = server;
     addrstr = NULL;
This shouldn't be needed, as 'server' should be passed into the worker function via the 'void *opaque' parameter.
     for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
@@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket

     server->clients[server->nclients++] = client;

-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
     return 0;

error:
@@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
     VIR_FREE(client->addrstr);
 }

-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
 {
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;

-static void *qemudWorker(void *data)
-{
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
+    virMutexLock(&client->lock);
It is necessary to hold the lock on 'server' before obtaining a lock on 'client'. The server lock can be released again immediately if no longer needed.
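The rule here is a global lock acquisition order: every path takes the server lock before any client lock, and may drop the server lock immediately afterwards. With one fixed order, two threads can never hold the two locks in opposite orders, which is what rules out deadlock. A minimal sketch of the shape, with the "locks" modeled as plain flags so the ordering itself is visible (the struct and function names are stand-ins, not libvirtd's real ones):

```c
/* Stand-ins for 'struct qemud_server' and 'struct qemud_client'. */
struct demoServer { int locked; };
struct demoClient { int locked; int dispatched; };

static void workerBody(struct demoServer *srv, struct demoClient *client)
{
    srv->locked = 1;              /* server lock first... */
    client->locked = 1;           /* ...only then the client lock */
    srv->locked = 0;              /* server lock released immediately,
                                   * as the comment above allows */

    client->dispatched++;         /* work proceeds under client lock only */

    client->locked = 0;
}
```

In real code the flags would be `virMutex` locks; the point is purely the acquire-server-then-client order, mirrored by every other thread that touches both objects.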
-    while (1) {
-        struct qemud_client *client = NULL;
-        struct qemud_client_message *msg;
+    /* Remove our message from dispatch queue while we use it */
+    msg = qemudClientMessageQueueServe(&client->dx);

-        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
-            if (worker->quitRequest ||
-                virCondWait(&server->job, &server->lock) < 0) {
-                virMutexUnlock(&server->lock);
-                return NULL;
-            }
-        }
-        if (worker->quitRequest) {
-            virMutexUnlock(&client->lock);
-            virMutexUnlock(&server->lock);
-            return NULL;
-        }
-        worker->processingCall = 1;
-        virMutexUnlock(&server->lock);
-
-        /* We own a locked client now... */
-        client->refs++;
-
-        /* Remove our message from dispatch queue while we use it */
-        msg = qemudClientMessageQueueServe(&client->dx);
-
-        /* This function drops the lock during dispatch,
-         * and re-acquires it before returning */
-        if (remoteDispatchClientRequest (server, client, msg) < 0) {
-            VIR_FREE(msg);
-            qemudDispatchClientFailure(client);
-            client->refs--;
-            virMutexUnlock(&client->lock);
-            continue;
-        }
-
-        client->refs--;
-        virMutexUnlock(&client->lock);
-
-        virMutexLock(&server->lock);
-        worker->processingCall = 0;
-        virMutexUnlock(&server->lock);
-    }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
-                            struct qemud_worker *worker) {
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    /* We want to join workers, so don't detach them */
-    /*pthread_attr_setdetachstate(&attr, 1);*/
-
-    if (worker->hasThread)
-        return -1;
-
-    worker->server = server;
-    worker->hasThread = 1;
-    worker->quitRequest = 0;
-    worker->processingCall = 0;
-
-    if (pthread_create(&worker->thread,
-                       &attr,
-                       qemudWorker,
-                       worker) != 0) {
-        worker->hasThread = 0;
-        worker->server = NULL;
-        return -1;
+    /* This function drops the lock during dispatch,
+     * and re-acquires it before returning */
+    if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
+        VIR_FREE(msg);
+        qemudDispatchClientFailure(client);
     }

-    return 0;
+    client->refs--;
+    virMutexUnlock(&client->lock);
 }

-
 /*
  * Read data into buffer using wire decoding (plain or TLS)
  *
@@ -1857,8 +1772,11 @@ readmore:
     }

     /* Move completed message to the end of the dispatch queue */
-    if (msg)
+    if (msg) {
+        client->refs++;
         qemudClientMessageQueuePush(&client->dx, msg);
+        ignore_value(virThreadPoolSendJob(server->workerPool, client));
+    }
     client->nrequests++;

     /* Possibly need to create another receive buffer */
@@ -1870,9 +1788,6 @@ readmore:
         client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;

         qemudUpdateClientEvent(client);
-
-        /* Tell one of the workers to get on with it... */
-        virCondSignal(&server->job);
     }
 }
@@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
         return NULL;
     }

-    for (i = 0 ; i < min_workers ; i++) {
-        if (qemudStartWorker(server, &server->workers[i]) < 0)
-            goto cleanup;
-        server->nactiveworkers++;
+    server->workerPool = virThreadPoolNew(min_workers,
+                                          max_workers,
+                                          qemudWorker,
+                                          NULL);
Should pass 'server' in here, instead of NULL.
+    if (!server->workerPool) {
+        VIR_ERROR0(_("Failed to create thread pool"));
+        virMutexUnlock(&server->lock);
+        return NULL;
     }

     for (;!server->quitEventThread;) {
A small change in that we no longer kill off idle worker threads, but the improved simplicity of the libvirtd code makes this a worthwhile tradeoff. So looks good to me aside from the minor locking bug.

Regards,
Daniel

On Thu, Dec 02, 2010 at 12:42:19PM +0000, Daniel P. Berrange wrote: <...snip...>
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
+static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
 {
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;

-static void *qemudWorker(void *data)
-{
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
+    virMutexLock(&client->lock);
It is necessary to hold the lock on 'server' before obtaining a lock on 'client'. The server lock can be released again immediately if no longer needed.
I guess the reason is to avoid locking a client that is being freed. But client->refs has already been incremented by 1 right before the client goes into the thread pool, which ensures the client cannot be freed. <...snip...>
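Hu Tao's argument can be made concrete: if the reader thread takes an extra reference before queueing the client, the refcount cannot reach zero (and the client cannot be freed) while a job for it is still pending. A sketch with illustrative names only, using a flag instead of an actual free() so the lifecycle can be observed:

```c
/* Stand-in for 'struct qemud_client' with just its lifecycle state. */
typedef struct refClient {
    int refs;
    int freed;   /* test hook: records that the count hit zero */
} refClient;

static int clientUnref(refClient *c)
{
    if (--c->refs == 0) {
        c->freed = 1;   /* a real implementation would free(c) here */
        return 0;
    }
    return c->refs;
}

/* Reader side: grab a reference, then enqueue the client. */
static void enqueueJob(refClient *c)
{
    c->refs++;
    /* ...virThreadPoolSendJob(pool, c) would go here... */
}

/* Worker side: dispatch the message, then drop the queue's reference. */
static void runJob(refClient *c)
{
    /* remoteDispatchClientRequest(...) would run here */
    clientUnref(c);
}
```

Whether this extra reference also makes the server-lock-first rule unnecessary is exactly the open question in the thread; the sketch only shows why a pending job keeps the client alive.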
+    if (!server->workerPool) {
+        VIR_ERROR0(_("Failed to create thread pool"));
+        virMutexUnlock(&server->lock);
+        return NULL;
     }

     for (;!server->quitEventThread;) {
A small change in that we no longer kill off idle worker threads,
The thread pool could be improved to achieve this internally, or to provide an interface for callers to force idle worker threads to be killed off.
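The policy such an internal improvement would apply is small: a worker that wakes up (e.g. from a timed condition-variable wait) with no pending job may exit, as long as that keeps the pool at or above its configured minimum. A sketch of just that decision, with hypothetical names:

```c
#include <stdbool.h>

/* Hypothetical pool bookkeeping a reaping policy would consult. */
struct poolState {
    int nWorkers;     /* threads currently alive */
    int minWorkers;   /* floor configured at pool creation */
    int nPending;     /* jobs waiting in the queue */
};

/* An idle worker may retire only while the queue is empty and the
 * pool stays at or above its minimum size afterwards. */
static bool workerShouldQuit(const struct poolState *st)
{
    return st->nPending == 0 && st->nWorkers > st->minWorkers;
}
```

In a pthread-based pool the worker would reach this check after `pthread_cond_timedwait` on the job condition times out, decrement `nWorkers` under the pool lock, and exit its thread function.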
but the improved simplicity of libvirtd code makes this a worthwhile tradeoff. So looks good to me aside from the minor locking bug.
Regards, Daniel
-- Thanks, Hu Tao
participants (3):
- Daniel P. Berrange
- Eric Blake
- Hu Tao