[libvirt] [PATCH 00/10] Get ready for migration events

QEMU will soon get support for migration events, which will finally allow us
to get rid of polling query-migrate every 50ms. However, we first need to be
able to wait for all events related to migration (migration status changes,
block job events, async abort requests) at once. This series prepares the
infrastructure that will allow us to do that (patches 1 and 2) and then makes
use of it for block job events (patch 4) and abort requests (patch 10).
Patch 8 fixes a bug in migration recovery after libvirtd restarts. The rest
of the patches are mostly cleanups, bugfixes, or other small changes required
by the 5 main patches.

Jiri Denemark (10):
  util: Introduce thread queues
  thread: Create thread local condition for every thread
  qemu: Introduce qemuBlockJobUpdate
  qemu: Properly report failed migration
  qemu: Use thread queues for synchronous block jobs
  qemu: Don't mess up with disk->mirrorState
  Pass domain object to private data formatter/parser
  qemu: Make qemuMigrationCancelDriveMirror usable without async job
  qemu: Cancel disk mirrors after libvirtd restart
  qemu: Use thread queues for asyncAbort

 po/POTFILES.in            |   2 +-
 src/Makefile.am           |   2 +
 src/conf/domain_conf.c    |   4 +-
 src/conf/domain_conf.h    |   6 +-
 src/libvirt_private.syms  |  17 ++
 src/libxl/libxl_domain.c  |  10 +-
 src/lxc/lxc_domain.c      |  12 +-
 src/qemu/qemu_blockjob.c  | 177 +++++----------------
 src/qemu/qemu_blockjob.h  |  15 +-
 src/qemu/qemu_domain.c    |  97 +++++++++---
 src/qemu/qemu_domain.h    |   8 +-
 src/qemu/qemu_driver.c    |  24 ++-
 src/qemu/qemu_migration.c | 392 +++++++++++++++++++++++++++++-----------------
 src/qemu/qemu_migration.h |   3 +
 src/qemu/qemu_process.c   |  17 +-
 src/util/virthread.c      |  21 ++-
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 +++++++
 18 files changed, 841 insertions(+), 363 deletions(-)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h

-- 
2.4.1

Our usage of pthread conditions does not allow a single thread to wait for
several events from different sources. This is because the condition is
bound to the source of the event. We can invert the usage by giving each
thread its own condition and providing APIs for registering this thread
condition with several sources. Each of the sources can then signal the
thread condition.

Thread queues also allow several threads to be registered with a single
event source, which can then wake up either all waiting threads or just
the first one.

Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h

diff --git a/po/POTFILES.in b/po/POTFILES.in
index bb0f6e1..edfd1cc 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -222,6 +222,7 @@ src/util/virstoragefile.c
 src/util/virstring.c
 src/util/virsysinfo.c
 src/util/virthreadjob.c
+src/util/virthreadqueue.c
 src/util/virtime.c
 src/util/virtpm.c
 src/util/virtypedparam.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 579421d..c746ecd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -162,6 +162,7 @@ UTIL_SOURCES = \
 		util/virthread.c util/virthread.h \
 		util/virthreadjob.c util/virthreadjob.h \
 		util/virthreadpool.c util/virthreadpool.h \
+		util/virthreadqueue.c util/virthreadqueue.h \
 		util/virtime.h util/virtime.c \
 		util/virtpm.h util/virtpm.c \
 		util/virtypedparam.c util/virtypedparam.h \
@@ -2169,6 +2170,7 @@ libvirt_setuid_rpc_client_la_SOURCES = \
 		util/virtime.c \
 		util/virthread.c \
 		util/virthreadjob.c \
+		util/virthreadqueue.c \
 		util/virtypedparam.c \
 		util/viruri.c \
 		util/virutil.c \
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 6a95fb9..cb41d5c 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -2242,6 +2242,21 @@ virThreadPoolNewFull;
 virThreadPoolSendJob;
 
 
+# util/virthreadqueue.h
+virThreadCondInit;
+virThreadCondInvalidate;
+virThreadCondNew;
+virThreadCondWait;
+virThreadCondWaitUntil;
+virThreadQueueBroadcast;
+virThreadQueueFree;
+virThreadQueueIsEmpty;
+virThreadQueueNew;
+virThreadQueueRegister;
+virThreadQueueSignal;
+virThreadQueueUnregister;
+
+
 # util/virtime.h
 virTimeFieldsNow;
 virTimeFieldsNowRaw;
diff --git a/src/util/virthreadqueue.c b/src/util/virthreadqueue.c
new file mode 100644
index 0000000..c9908cf
--- /dev/null
+++ b/src/util/virthreadqueue.c
@@ -0,0 +1,343 @@
+/*
+ * virthreadqueue.c: code for managing queues of threads waiting for several
+ *                   conditions
+ *
+ * Copyright (C) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: Jiri Denemark <jdenemar@redhat.com>
+ */
+
+#include <config.h>
+
+#include "internal.h"
+#include "viralloc.h"
+#include "virerror.h"
+#include "virlog.h"
+#include "virobject.h"
+#include "virthread.h"
+#include "virthreadqueue.h"
+
+#define VIR_FROM_THIS VIR_FROM_THREAD
+
+VIR_LOG_INIT("util.threadqueue");
+
+
+typedef struct _virThreadQueueItem virThreadQueueItem;
+typedef virThreadQueueItem *virThreadQueueItemPtr;
+
+struct _virThreadQueue {
+    virThreadQueueItemPtr head;
+    virThreadQueueItemPtr tail;
+};
+
+struct _virThreadQueueItem {
+    virThreadQueueItemPtr prev;
+    virThreadQueueItemPtr next;
+    virThreadCondPtr cond;
+};
+
+struct _virThreadCond {
+    virObject parent;
+
+    bool valid;
+    virCond cond;
+};
+
+static virClassPtr virThreadCondClass;
+static virThreadLocal virThreadQueueCond;
+
+static void virThreadCondDispose(void *obj);
+
+
+static int
+virThreadQueueOnceInit(void)
+{
+    if (virThreadLocalInit(&virThreadQueueCond, NULL) < 0)
+        return -1;
+
+    virThreadCondClass = virClassNew(virClassForObject(),
+                                     "virThreadCond",
+                                     sizeof(virThreadCond),
+                                     virThreadCondDispose);
+    if (!virThreadCondClass)
+        return -1;
+
+    return 0;
+}
+
+VIR_ONCE_GLOBAL_INIT(virThreadQueue)
+
+
+virThreadCondPtr
+virThreadCondNew(void)
+{
+    virThreadCondPtr cond;
+
+    if (virThreadQueueInitialize() < 0 ||
+        !(cond = virObjectNew(virThreadCondClass)))
+        return NULL;
+
+    if (virCondInit(&cond->cond) < 0) {
+        virObjectUnref(cond);
+        return NULL;
+    }
+
+    cond->valid = true;
+    return cond;
+}
+
+
+static void
+virThreadCondDispose(void *obj)
+{
+    virThreadCondPtr cond = obj;
+
+    virCondDestroy(&cond->cond);
+}
+
+
+void
+virThreadCondInit(virThreadCondPtr cond)
+{
+    if (virThreadLocalSet(&virThreadQueueCond, (void *) cond) < 0) {
+        virReportSystemError(errno,
+                             _("cannot set per-thread condition for %llu"),
+                             virThreadSelfID());
+        virObjectUnref(cond);
+    }
+}
+
+
+static virThreadCondPtr
+virThreadCondGet(void)
+{
+    virThreadCondPtr cond;
+
+    if (virThreadQueueInitialize() < 0)
+        return NULL;
+
+    if (!(cond = virThreadLocalGet(&virThreadQueueCond))) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("uninitialized per-thread condition for %llu"),
+                       virThreadSelfID());
+        return NULL;
+    }
+
+    return cond;
+}
+
+
+void
+virThreadCondInvalidate(void)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return;
+
+    cond->valid = false;
+    virObjectUnref(cond);
+}
+
+
+int
+virThreadCondWait(virMutexPtr mtx)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virCondWait(&cond->cond, mtx) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("failed to wait for thread condition"));
+        return -1;
+    }
+    return 0;
+}
+
+
+int
+virThreadCondWaitUntil(virMutexPtr mtx,
+                       unsigned long long whenms)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virCondWaitUntil(&cond->cond, mtx, whenms) < 0 &&
+        errno != ETIMEDOUT) {
+        virReportSystemError(errno, "%s",
+                             _("failed to wait for thread condition"));
+        return -1;
+    }
+    return 0;
+}
+
+
+static virThreadQueueItemPtr
+virThreadQueueFind(virThreadQueuePtr queue,
+                   virThreadCondPtr cond)
+{
+    virThreadQueueItemPtr item;
+
+    for (item = queue->head; item; item = item->next) {
+        if (item->cond == cond)
+            return item;
+    }
+
+    return NULL;
+}
+
+
+static int
+virThreadQueueAdd(virThreadQueuePtr queue,
+                  virThreadCondPtr cond)
+{
+    virThreadQueueItemPtr item;
+
+    if (VIR_ALLOC(item) < 0)
+        return -1;
+
+    virObjectRef(cond);
+    item->cond = cond;
+
+    if (queue->tail)
+        queue->tail->next = item;
+    else
+        queue->head = item;
+    item->prev = queue->tail;
+    queue->tail = item;
+
+    return 0;
+}
+
+
+static void
+virThreadQueueRemove(virThreadQueuePtr queue,
+                     virThreadQueueItemPtr item)
+{
+    if (item->prev)
+        item->prev->next = item->next;
+    else
+        queue->head = item->next;
+
+    if (item->next)
+        item->next->prev = item->prev;
+    else
+        queue->tail = item->prev;
+
+    virObjectUnref(item->cond);
+    VIR_FREE(item);
+}
+
+
+static void
+virThreadQueueRemoveInvalid(virThreadQueuePtr queue)
+{
+    virThreadQueueItemPtr item;
+
+    item = queue->head;
+    while (item) {
+        virThreadQueueItemPtr next = item->next;
+
+        if (!item->cond->valid)
+            virThreadQueueRemove(queue, item);
+        item = next;
+    }
+}
+
+
+virThreadQueuePtr
+virThreadQueueNew(void)
+{
+    virThreadQueuePtr queue;
+
+    if (VIR_ALLOC(queue) < 0)
+        return NULL;
+
+    return queue;
+}
+
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
+    VIR_FREE(queue);
+}
+
+
+bool
+virThreadQueueIsEmpty(virThreadQueuePtr queue)
+{
+    virThreadQueueRemoveInvalid(queue);
+    return !queue->head;
+}
+
+
+void
+virThreadQueueSignal(virThreadQueuePtr queue)
+{
+    virThreadQueueRemoveInvalid(queue);
+    if (queue->head)
+        virCondSignal(&queue->head->cond->cond);
+}
+
+
+void
+virThreadQueueBroadcast(virThreadQueuePtr queue)
+{
+    virThreadQueueItemPtr item;
+
+    virThreadQueueRemoveInvalid(queue);
+    for (item = queue->head; item; item = item->next)
+        virCondSignal(&item->cond->cond);
+}
+
+
+int
+virThreadQueueRegister(virThreadQueuePtr queue)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virThreadQueueFind(queue, cond))
+        return 0;
+
+    return virThreadQueueAdd(queue, cond);
+}
+
+
+void
+virThreadQueueUnregister(virThreadQueuePtr queue)
+{
+    virThreadCondPtr cond;
+    virThreadQueueItemPtr item;
+
+    if (!(cond = virThreadCondGet()) ||
+        !(item = virThreadQueueFind(queue, cond)))
+        return;
+
+    virThreadQueueRemove(queue, item);
+}
diff --git a/src/util/virthreadqueue.h b/src/util/virthreadqueue.h
new file mode 100644
index 0000000..6ae8d9e
--- /dev/null
+++ b/src/util/virthreadqueue.h
@@ -0,0 +1,54 @@
+/*
+ * virthreadqueue.h: APIs for managing queues of threads waiting for several
+ *                   conditions
+ *
+ * Copyright (C) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: Jiri Denemark <jdenemar@redhat.com>
+ */
+
+#ifndef __VIR_THREAD_QUEUE_H__
+# define __VIR_THREAD_QUEUE_H__
+
+# include "virthread.h"
+
+
+typedef struct _virThreadCond virThreadCond;
+typedef virThreadCond *virThreadCondPtr;
+
+virThreadCondPtr virThreadCondNew(void);
+void virThreadCondInit(virThreadCondPtr cond);
+void virThreadCondInvalidate(void);
+int virThreadCondWait(virMutexPtr mtx)
+    ATTRIBUTE_RETURN_CHECK;
+int virThreadCondWaitUntil(virMutexPtr mtx,
+                           unsigned long long whenms)
+    ATTRIBUTE_RETURN_CHECK;
+
+typedef struct _virThreadQueue virThreadQueue;
+typedef virThreadQueue *virThreadQueuePtr;
+
+virThreadQueuePtr virThreadQueueNew(void);
+void virThreadQueueFree(virThreadQueuePtr queue);
+
+bool virThreadQueueIsEmpty(virThreadQueuePtr queue);
+void virThreadQueueSignal(virThreadQueuePtr queue);
+void virThreadQueueBroadcast(virThreadQueuePtr queue);
+int virThreadQueueRegister(virThreadQueuePtr queue);
+void virThreadQueueUnregister(virThreadQueuePtr queue);
+
+#endif
-- 
2.4.1
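To make the intended usage concrete, here is a minimal sketch of one thread
waiting for events coming from two independent sources at once. It is not
part of the series; the mutex, queues and flags are invented for
illustration, and the waiting thread is assumed to have a per-thread
condition set up via virThreadCondNew()/virThreadCondInit():

    static virMutex lock;            /* hypothetical; protects the flags */
    static virThreadQueuePtr queueA; /* signaled by event source A */
    static virThreadQueuePtr queueB; /* signaled by event source B */
    static bool eventA, eventB;      /* set by the sources under @lock */

    static int
    waitForEitherEvent(void)
    {
        int ret = -1;

        virMutexLock(&lock);

        /* register the calling thread's condition with both sources */
        if (virThreadQueueRegister(queueA) < 0 ||
            virThreadQueueRegister(queueB) < 0)
            goto cleanup;

        /* a single wait covers both sources, because each of them
         * signals the same per-thread condition */
        while (!eventA && !eventB) {
            if (virThreadCondWait(&lock) < 0)
                goto cleanup;
        }
        ret = 0;

     cleanup:
        virThreadQueueUnregister(queueA);
        virThreadQueueUnregister(queueB);
        virMutexUnlock(&lock);
        return ret;
    }

A source would then set its flag under the same lock and call
virThreadQueueSignal() or virThreadQueueBroadcast() on its queue to wake the
first or all registered threads, respectively.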

On Fri, May 22, 2015 at 00:42:34 +0200, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h
ACK, I don't see anything wrong with this but let's see how it's used.

Peter

On Fri, May 22, 2015 at 12:42:34AM +0200, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
I'm not really convinced this abstraction is necessary / appropriate. I can
see what you mean about the problems with migration having access to several
different virConds that it needs to wait on, but AFAICT, all the condition
variables are ultimately associated with a single guest domain. So it seems
the problem could have been solved much more simply by just having a single
virCond in the qemuDomainObjPrivate struct.

Moving to a centralized single condition var per thread feels like it is
really breaking encapsulation of the APIs across the codebase.

Regards,
Daniel

-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|

On Fri, May 22, 2015 at 13:09:17 +0100, Daniel P. Berrange wrote:
On Fri, May 22, 2015 at 12:42:34AM +0200, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
I'm not really convinced this abstraction is necessary / appropriate. I can see what you mean about the problems with migration having access to several different virConds that it needs to wait on, but AFAICT, all the condition variables are ultimately associated with a single guest domain. So it seems the problem could have been solved much more simply by just having a single virCond in the qemuDomainObjPrivate struct.
Moving to a centralized single condition var per thread feels like it is really breaking encapsulation of the APIs across the codebase.
Because we may want to wake up a thread which we know nothing about, that is,
we have no idea what job (if any) it is doing or even on which domain.

Currently, you can't gracefully kill libvirtd when it is, e.g., migrating a
domain to another host. Apart from a bug which stops the main event loop
first (which I already solved in another branch of mine), libvirtd would
only stop once migration completes or is aborted manually. You have to force
kill it if you don't want to wait. Using a thread-local condition allows us
to wake up any thread and ask it to finish the job asap, possibly canceling
it.

Jirka

On Fri, May 22, 2015 at 14:28:05 +0200, Jiri Denemark wrote:
On Fri, May 22, 2015 at 13:09:17 +0100, Daniel P. Berrange wrote:
On Fri, May 22, 2015 at 12:42:34AM +0200, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
I'm not really convinced this abstraction is neccessary / appropriate. I can see what you mean about the problems with migration having access to several different virCond's that it needs to wait on, but AFAICT, all the condition variables are ultimately associated with a single guest domain. So it seems the problem could have been solved much more simply by just having a single virCond in the qemuDomainObjPrivate struct.
Moving to a centralized single condition var per thread feels like it is really breaking encapsulation of the APIs across the codebase.
Because we may want to wake up a thread which we know nothing about, that is, we have no idea what job (if any) it is doing or even on which domain. Currently, you can't gracefully kill libvirtd when it is, e.g., migrating a domain to another host. Apart from a bug which stops the main event loop first (which I already solved in another branch of mine), libvirtd would only stop once migration completes or is aborted manually. You have to force kill it if you don't want to wait. Using a thread-local condition allows us to wake up any thread and ask it to finish the job asap, possibly canceling it.
Actually, thinking about this a bit more, I could implement it with a
per-domain condition. Any thread working on a domain would just register the
domain's condition with the thread's pool in case the pool wants to wake up
its threads. I think this would even be a bit nicer than the approach I
implemented.

Although, I'd go with a condition stored directly in virDomainObj rather
than inside qemuDomainObjPrivate, since this is something all drivers could
(and should) use if they want to use conditions.

Jirka
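For illustration, the per-domain variant could look roughly like this. This
is only a sketch of the idea being discussed; the field placement and helper
names are invented here, not taken from any patch:

    struct _virDomainObj {
        virObjectLockable parent;  /* parent.lock is the per-domain lock */
        virCond cond;              /* signaled on any event for this domain */
        /* ... existing members ... */
    };

    /* wait for any event on @vm; called with the domain lock held */
    int
    virDomainObjWait(virDomainObjPtr vm)
    {
        if (virCondWait(&vm->cond, &vm->parent.lock) < 0) {
            virReportSystemError(errno, "%s",
                                 _("failed to wait for domain condition"));
            return -1;
        }
        return 0;
    }

    /* every event source for the domain just broadcasts on it */
    void
    virDomainObjBroadcast(virDomainObjPtr vm)
    {
        virCondBroadcast(&vm->cond);
    }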

On Mon, May 25, 2015 at 04:04:37PM +0200, Jiri Denemark wrote:
On Fri, May 22, 2015 at 14:28:05 +0200, Jiri Denemark wrote:
On Fri, May 22, 2015 at 13:09:17 +0100, Daniel P. Berrange wrote:
On Fri, May 22, 2015 at 12:42:34AM +0200, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
I'm not really convinced this abstraction is neccessary / appropriate. I can see what you mean about the problems with migration having access to several different virCond's that it needs to wait on, but AFAICT, all the condition variables are ultimately associated with a single guest domain. So it seems the problem could have been solved much more simply by just having a single virCond in the qemuDomainObjPrivate struct.
Moving to a centralized single condition var per thread feels like it is really breaking encapsulation of the APIs across the codebase.
Because we may want to wake up a thread which we know nothing about, that is, we have no idea what job (if any) it is doing or even on which domain. Currently, you can't gracefully kill libvirtd when it is, e.g., migrating a domain to another host. Apart from a bug which stops the main event loop first (which I already solved in another branch of mine), libvirtd would only stop once migration completes or is aborted manually. You have to force kill it if you don't want to wait. Using a thread-local condition allows us to wake up any thread and ask it to finish the job asap, possibly canceling it.
Actually, thinking about this a bit more, I could implement it with a per-domain condition. Any thread working on a domain would just register the domain's condition with the thread's pool in case the pool wants to wake up its threads. I think this would even be a bit nicer than the approach I implemented. Although, I'd go with a condition stored directly in virDomainObj rather than inside qemuDomainObjPrivate, since this is something all drivers could (and should) use if they want to use conditions.
That sounds a bit more appealing to me.

Regards,
Daniel

-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|

On 05/21/2015 06:42 PM, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h
Ran the series through Coverity and came back with this gem
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
(3) Event cond_true:    Condition "queue->head", taking true branch
(6) Event loop_begin:   Jumped back to beginning of loop
(7) Event cond_true:    Condition "queue->head", taking true branch

283        while (queue->head)

(4) Event freed_arg:    "virThreadQueueRemove" frees "queue->head".  [details]
(5) Event loop:         Jumping back to the beginning of the loop
(8) Event deref_arg:    Calling "virThreadQueueRemove" dereferences freed pointer "queue->head".  [details]

284            virThreadQueueRemove(queue, queue->head);
+    VIR_FREE(queue);
+}
+
+
Where the link to [details] has:

230    static void
231    virThreadQueueRemove(virThreadQueuePtr queue,
232                         virThreadQueueItemPtr item)
233    {

(1) Event cond_true:    Condition "item->prev", taking true branch
(1) Event deref_parm:   Directly dereferencing parameter "item".
Also see events:        [freed_arg]

234        if (item->prev)

(2) Event if_fallthrough:    Falling through to end of if statement

235            item->prev->next = item->next;
236        else

(3) Event if_end:    End of if statement

237            queue->head = item->next;
238

(4) Event cond_true:    Condition "item->next", taking true branch

239        if (item->next)

(5) Event if_fallthrough:    Falling through to end of if statement

240            item->next->prev = item->prev;
241        else

(6) Event if_end:    End of if statement

242            queue->tail = item->prev;
243
244        virObjectUnref(item->cond);

(7) Event freed_arg:    "virFree" frees parameter "item".  [details]

245        VIR_FREE(item);
246    }
247

John

On Fri, May 22, 2015 at 10:16:26 -0400, John Ferlan wrote:
On 05/21/2015 06:42 PM, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h
Ran the series through Coverity and came back with this gem
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
(3) Event cond_true:    Condition "queue->head", taking true branch
(6) Event loop_begin:   Jumped back to beginning of loop
(7) Event cond_true:    Condition "queue->head", taking true branch
283 while (queue->head)
(4) Event freed_arg:    "virThreadQueueRemove" frees "queue->head".  [details]
(5) Event loop:         Jumping back to the beginning of the loop
(8) Event deref_arg:    Calling "virThreadQueueRemove" dereferences freed pointer "queue->head".  [details]
284 virThreadQueueRemove(queue, queue->head);
False positive. If virThreadQueueRemove frees queue->head, it also updates queue->head to point to the next item first.

Jirka

On 05/22/2015 10:31 AM, Jiri Denemark wrote:
On Fri, May 22, 2015 at 10:16:26 -0400, John Ferlan wrote:
On 05/21/2015 06:42 PM, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h
Ran the series through Coverity and came back with this gem
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
(3) Event cond_true:    Condition "queue->head", taking true branch
(6) Event loop_begin:   Jumped back to beginning of loop
(7) Event cond_true:    Condition "queue->head", taking true branch
283 while (queue->head)
(4) Event freed_arg:    "virThreadQueueRemove" frees "queue->head".  [details]
(5) Event loop:         Jumping back to the beginning of the loop
(8) Event deref_arg:    Calling "virThreadQueueRemove" dereferences freed pointer "queue->head".  [details]
284 virThreadQueueRemove(queue, queue->head);
False positive. If virThreadQueueRemove frees queue->head, it also updates queue->head to point to the next item first.
I understand and looked at the code, but I think this is a case where pass by reference and pass by value make a difference. Upon return, what causes queue->head to be evaluated again? The call passes queue->head by value and removes it from queue. Upon return nothing causes queue->head to be evaluated again, thus wouldn't we enter the loop the second time with the same address?

John

On Fri, May 22, 2015 at 13:48:16 -0400, John Ferlan wrote:
On 05/22/2015 10:31 AM, Jiri Denemark wrote:
On Fri, May 22, 2015 at 10:16:26 -0400, John Ferlan wrote:
On 05/21/2015 06:42 PM, Jiri Denemark wrote:
Our usage of pthread conditions does not allow a single thread to wait for several events from different sources. This is because the condition is bound to the source of the event. We can invert the usage by giving each thread its own condition and providing APIs for registering this thread condition with several sources. Each of the sources can then signal the thread condition.
Thread queues also allow several threads to be registered with a single event source, which can then wake up either all waiting threads or just the first one.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h
Ran the series through Coverity and came back with this gem
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
(3) Event cond_true:    Condition "queue->head", taking true branch
(6) Event loop_begin:   Jumped back to beginning of loop
(7) Event cond_true:    Condition "queue->head", taking true branch
283 while (queue->head)
(4) Event freed_arg:    "virThreadQueueRemove" frees "queue->head".  [details]
(5) Event loop:         Jumping back to the beginning of the loop
(8) Event deref_arg:    Calling "virThreadQueueRemove" dereferences freed pointer "queue->head".  [details]
284 virThreadQueueRemove(queue, queue->head);
False positive. If virThreadQueueRemove frees queue->head, it also updates queue->head to point to the next item first.
I understand and looked at the code, but I think this is a case where pass by reference and pass by value make a difference. Upon return, what causes queue->head to be evaluated again? The call passes queue->head by value and removes it from queue. Upon return nothing causes queue->head to be evaluated again, thus wouldn't we enter the loop the second time with the same address?
This would be a serious bug in the compiler. The function also gets the queue pointer, which means the compiler must not assume queue->head remained unchanged across the call.

Jirka
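A standalone model of the disputed loop (hypothetical names) shows why the
report is a false positive: the callee updates the caller-visible head
through the list pointer before freeing the node, and the while condition
re-reads it on every iteration:

    #include <stdlib.h>

    struct node { struct node *next; };
    struct list { struct node *head; };

    static void
    remove_head(struct list *l)    /* models virThreadQueueRemove */
    {
        struct node *n = l->head;

        l->head = n->next;         /* head is updated through @l ... */
        free(n);                   /* ... before the node is freed */
    }

    static void
    free_list(struct list *l)      /* models virThreadQueueFree */
    {
        while (l->head)            /* l->head is re-loaded each iteration */
            remove_head(l);
    }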

So that any code can call virThreadQueueRegister whenever it needs to wait
for some event. The thread condition will be automatically invalidated (and
thus ignored by virThreadQueue{Signal,Broadcast}) whenever its thread exits
to avoid deadlocks or crashes.

Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/util/virthread.c | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/util/virthread.c b/src/util/virthread.c
index 6c49515..8e2e230 100644
--- a/src/util/virthread.c
+++ b/src/util/virthread.c
@@ -30,7 +30,9 @@
 #endif
 
 #include "viralloc.h"
+#include "virobject.h"
 #include "virthreadjob.h"
+#include "virthreadqueue.h"
 
 
 /* Nothing special required for pthreads */
@@ -187,6 +189,7 @@ struct virThreadArgs {
     virThreadFunc func;
     const char *funcName;
     bool worker;
+    virThreadCondPtr cond;
    void *opaque;
 };
 
@@ -198,6 +201,8 @@ static void *virThreadHelper(void *data)
     /* Free args early, rather than tying it up during the entire thread. */
     VIR_FREE(args);
 
+    virThreadCondInit(local.cond);
+
     if (local.worker)
         virThreadJobSetWorker(local.funcName);
     else
@@ -208,6 +213,8 @@ static void *virThreadHelper(void *data)
     if (!local.worker)
         virThreadJobClear(0);
 
+    virThreadCondInvalidate();
+
     return NULL;
 }
 
@@ -218,7 +225,7 @@ int virThreadCreateFull(virThreadPtr thread,
                         bool worker,
                         void *opaque)
 {
-    struct virThreadArgs *args;
+    struct virThreadArgs *args = NULL;
     pthread_attr_t attr;
     int ret = -1;
     int err;
@@ -234,22 +241,28 @@ int virThreadCreateFull(virThreadPtr thread,
     args->funcName = funcName;
     args->worker = worker;
     args->opaque = opaque;
+    if (!(args->cond = virThreadCondNew()))
+        goto cleanup;
 
     if (!joinable)
         pthread_attr_setdetachstate(&attr, 1);
 
     err = pthread_create(&thread->thread, &attr, virThreadHelper, args);
-    if (err != 0) {
-        VIR_FREE(args);
+    if (err != 0)
         goto cleanup;
-    }
 
     /* New thread owns 'args' in success case, so don't free */
+    args = NULL;
     ret = 0;
+
 cleanup:
     pthread_attr_destroy(&attr);
     if (ret < 0)
         errno = err;
+    if (args) {
+        virObjectUnref(args->cond);
+        VIR_FREE(args);
+    }
     return ret;
 }
 
-- 
2.4.1
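With this patch, threads spawned through virThreadCreateFull get their
condition automatically. Threads created by other means (e.g. the main
thread) would presumably have to do the same setup by hand; a sketch of
that sequence, assuming nothing beyond the APIs introduced in patch 1:

    virThreadCondPtr cond;

    if (!(cond = virThreadCondNew()))
        return -1;
    virThreadCondInit(cond);     /* stores the condition in thread-local
                                  * storage; unrefs it itself on failure */

    /* ... the thread works, registering with queues as needed ... */

    virThreadCondInvalidate();   /* before the thread exits, so queues drop
                                  * the stale entry instead of signaling a
                                  * destroyed condition */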

On Fri, May 22, 2015 at 00:42:35 +0200, Jiri Denemark wrote:
So that any code can call virThreadQueueRegister whenever it needs to wait for some event. The thread condition will be automatically invalidated (and thus ignored by virThreadQueue{Signal,Broadcast}) whenever its thread exits to avoid deadlocks or crashes.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/util/virthread.c | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
ACK, Peter

The wrapper is useful for calling qemuBlockJobEventProcess with the event
details stored in disk's privateData, which is the most likely usage of
qemuBlockJobEventProcess.

Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/libvirt_private.syms |  2 ++
 src/qemu/qemu_blockjob.c | 37 +++++++++++++++++++++++++++++--------
 src/qemu/qemu_blockjob.h |  3 +++
 3 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index cb41d5c..7279cdf 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -265,6 +265,8 @@ virDomainDiskInsert;
 virDomainDiskInsertPreAlloced;
 virDomainDiskIoTypeFromString;
 virDomainDiskIoTypeToString;
+virDomainDiskMirrorStateTypeFromString;
+virDomainDiskMirrorStateTypeToString;
 virDomainDiskPathByName;
 virDomainDiskRemove;
 virDomainDiskRemoveByName;
diff --git a/src/qemu/qemu_blockjob.c b/src/qemu/qemu_blockjob.c
index 098a43a..605c2a5 100644
--- a/src/qemu/qemu_blockjob.c
+++ b/src/qemu/qemu_blockjob.c
@@ -38,6 +38,27 @@
 
 VIR_LOG_INIT("qemu.qemu_blockjob");
 
+
+int
+qemuBlockJobUpdate(virQEMUDriverPtr driver,
+                   virDomainObjPtr vm,
+                   virDomainDiskDefPtr disk)
+{
+    qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
+    int ret;
+
+    if ((ret = diskPriv->blockJobStatus) == -1)
+        return -1;
+
+    qemuBlockJobEventProcess(driver, vm, disk,
+                             diskPriv->blockJobType,
+                             diskPriv->blockJobStatus);
+    diskPriv->blockJobStatus = -1;
+
+    return ret;
+}
+
+
 /**
  * qemuBlockJobEventProcess:
  * @driver: qemu driver
@@ -49,8 +70,6 @@ VIR_LOG_INIT("qemu.qemu_blockjob");
  * Update disk's mirror state in response to a block job event
  * from QEMU. For mirror state's that must survive libvirt
  * restart, also update the domain's status XML.
- *
- * Returns 0 on success, -1 otherwise.
  */
 void
 qemuBlockJobEventProcess(virQEMUDriverPtr driver,
@@ -67,6 +86,12 @@ qemuBlockJobEventProcess(virQEMUDriverPtr driver,
     bool save = false;
     qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
+    VIR_DEBUG("disk=%s, mirrorState=%s, type=%d, status=%d",
+              disk->dst,
+              NULLSTR(virDomainDiskMirrorStateTypeToString(disk->mirrorState)),
+              type,
+              status);
+
     /* Have to generate two variants of the event for old vs. new
      * client callbacks */
     if (type == VIR_DOMAIN_BLOCK_JOB_TYPE_COMMIT &&
@@ -218,9 +243,7 @@ qemuBlockJobSyncEnd(virQEMUDriverPtr driver,
     if (diskPriv->blockJobSync && diskPriv->blockJobStatus != -1) {
         if (ret_status)
             *ret_status = diskPriv->blockJobStatus;
-        qemuBlockJobEventProcess(driver, vm, disk,
-                                 diskPriv->blockJobType,
-                                 diskPriv->blockJobStatus);
+        qemuBlockJobUpdate(driver, vm, disk);
         diskPriv->blockJobStatus = -1;
     }
     diskPriv->blockJobSync = false;
@@ -300,9 +323,7 @@ qemuBlockJobSyncWaitWithTimeout(virQEMUDriverPtr driver,
 
     if (ret_status)
         *ret_status = diskPriv->blockJobStatus;
-    qemuBlockJobEventProcess(driver, vm, disk,
-                             diskPriv->blockJobType,
-                             diskPriv->blockJobStatus);
+    qemuBlockJobUpdate(driver, vm, disk);
     diskPriv->blockJobStatus = -1;
 
     return 0;
diff --git a/src/qemu/qemu_blockjob.h b/src/qemu/qemu_blockjob.h
index ba372a2..81e893e 100644
--- a/src/qemu/qemu_blockjob.h
+++ b/src/qemu/qemu_blockjob.h
@@ -25,6 +25,9 @@
 # include "internal.h"
 # include "qemu_conf.h"
 
+int qemuBlockJobUpdate(virQEMUDriverPtr driver,
+                       virDomainObjPtr vm,
+                       virDomainDiskDefPtr disk);
 void qemuBlockJobEventProcess(virQEMUDriverPtr driver,
                               virDomainObjPtr vm,
                               virDomainDiskDefPtr disk,
-- 
2.4.1
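The net effect is that a caller which knows an event may be pending can
consume it in a single call; a sketch of the intended pattern (later patches
in the series use it in wait loops like this):

    int status = qemuBlockJobUpdate(driver, vm, disk);

    if (status == -1) {
        /* no event was pending; nothing was processed */
    } else if (status == VIR_DOMAIN_BLOCK_JOB_FAILED) {
        /* the stored event was processed and the job failed */
    }

The stored status is reset to -1 once processed, so calling the function
repeatedly is safe.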

On Fri, May 22, 2015 at 00:42:36 +0200, Jiri Denemark wrote:
The wrapper is useful for calling qemuBlockJobEventProcess with the event details stored in disk's privateData, which is the most likely usage of qemuBlockJobEventProcess.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/libvirt_private.syms |  2 ++
 src/qemu/qemu_blockjob.c | 37 +++++++++++++++++++++++++++++--------
 src/qemu/qemu_blockjob.h |  3 +++
 3 files changed, 34 insertions(+), 8 deletions(-)
ACK, Peter

Because we are polling we may detect some errors after we asked QEMU for
migration status even though they occurred before. If this happens and QEMU
reports migration completed successfully, we would happily report the
migration succeeded even though we should have cancelled it because of the
other error.

In practice it is not a big issue now but it will become a much bigger issue
once the check for storage migration status is moved inside the loop in
qemuMigrationWaitForCompletion.

Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/qemu/qemu_migration.c | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index f7432e8..b2c4559 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -2459,7 +2459,7 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver,
 
     jobInfo->type = VIR_DOMAIN_JOB_UNBOUNDED;
 
-    while (1) {
+    while (jobInfo->type == VIR_DOMAIN_JOB_UNBOUNDED) {
         /* Poll every 50ms for progress & to allow cancellation */
         struct timespec ts = { .tv_sec = 0, .tv_nsec = 50 * 1000 * 1000ull };
 
@@ -2481,31 +2481,28 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver,
             break;
         }
 
-        if (jobInfo->type != VIR_DOMAIN_JOB_UNBOUNDED)
-            break;
-
-        virObjectUnlock(vm);
-
-        nanosleep(&ts, NULL);
-
-        virObjectLock(vm);
+        if (jobInfo->type == VIR_DOMAIN_JOB_UNBOUNDED) {
+            virObjectUnlock(vm);
+            nanosleep(&ts, NULL);
+            virObjectLock(vm);
+        }
     }
 
-    if (jobInfo->type == VIR_DOMAIN_JOB_COMPLETED) {
-        qemuDomainJobInfoUpdateDowntime(jobInfo);
-        VIR_FREE(priv->job.completed);
-        if (VIR_ALLOC(priv->job.completed) == 0)
-            *priv->job.completed = *jobInfo;
-        return 0;
-    } else if (jobInfo->type == VIR_DOMAIN_JOB_UNBOUNDED) {
+    if (jobInfo->type == VIR_DOMAIN_JOB_UNBOUNDED) {
         /* The migration was aborted by us rather than QEMU itself so let's
         * update the job type and notify the caller to send migrate_cancel.
         */
         jobInfo->type = VIR_DOMAIN_JOB_FAILED;
         return -2;
-    } else {
+    } else if (jobInfo->type != VIR_DOMAIN_JOB_COMPLETED) {
         return -1;
     }
+
+    qemuDomainJobInfoUpdateDowntime(jobInfo);
+    VIR_FREE(priv->job.completed);
+    if (VIR_ALLOC(priv->job.completed) == 0)
+        *priv->job.completed = *jobInfo;
+    return 0;
 }
-- 
2.4.1

On Fri, May 22, 2015 at 00:42:37 +0200, Jiri Denemark wrote:
Because we are polling we may detect some errors after we asked QEMU for migration status even though they occurred before. If this happens and QEMU reports migration completed successfully, we would happily report the migration succeeded even though we should have cancelled it because of the other error.
In practice it is not a big issue now but it will become a much bigger issue once the check for storage migration status is moved inside the loop in qemuMigrationWaitForCompletion.
Signed-off-by: Jiri Denemark <jdenemar@redhat.com>
---
 src/qemu/qemu_migration.c | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)
ACK, Peter

By switching block jobs to use thread queues and thread conditions, we can drop some pretty complicated code in NBD storage migration. Moreover, we are getting ready for migration events (to replace our 50ms polling on query-migrate). The ultimate goal is to have a single loop waiting (using virThreadCondWait) for any migration related event (changed status of a migration, disk mirror events, internal abort requests, ...). This patch makes NBD storage migration ready for this: first we call a QMP command to start or cancel drive mirror on all disks we are interested in and then we wait for a single condition which is signaled on any event related to any of the mirrors. Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- po/POTFILES.in | 1 - src/qemu/qemu_blockjob.c | 139 ++------------------- src/qemu/qemu_blockjob.h | 12 +- src/qemu/qemu_domain.c | 5 +- src/qemu/qemu_domain.h | 4 +- src/qemu/qemu_driver.c | 24 ++-- src/qemu/qemu_migration.c | 299 ++++++++++++++++++++++++++-------------------- src/qemu/qemu_process.c | 9 +- 8 files changed, 201 insertions(+), 292 deletions(-) diff --git a/po/POTFILES.in b/po/POTFILES.in index edfd1cc..9108ccf 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -112,7 +112,6 @@ src/parallels/parallels_utils.h src/parallels/parallels_storage.c src/phyp/phyp_driver.c src/qemu/qemu_agent.c -src/qemu/qemu_blockjob.c src/qemu/qemu_capabilities.c src/qemu/qemu_cgroup.c src/qemu/qemu_command.c diff --git a/src/qemu/qemu_blockjob.c b/src/qemu/qemu_blockjob.c index 605c2a5..e97f449 100644 --- a/src/qemu/qemu_blockjob.c +++ b/src/qemu/qemu_blockjob.c @@ -204,19 +204,17 @@ qemuBlockJobEventProcess(virQEMUDriverPtr driver, * * During a synchronous block job, a block job event for @disk * will not be processed asynchronously. Instead, it will be - * processed only when qemuBlockJobSyncWait* or - * qemuBlockJobSyncEnd is called. + * processed only when qemuBlockJobUpdate or qemuBlockJobSyncEnd + * is called. */ void qemuBlockJobSyncBegin(virDomainDiskDefPtr disk) { qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - if (diskPriv->blockJobSync) - VIR_WARN("Disk %s already has synchronous block job", - disk->dst); - - diskPriv->blockJobSync = true; + VIR_DEBUG("disk=%s", disk->dst); + virThreadQueueRegister(diskPriv->blockJobQueue); + diskPriv->blockJobStatus = -1; } @@ -225,135 +223,16 @@ qemuBlockJobSyncBegin(virDomainDiskDefPtr disk) * @driver: qemu driver * @vm: domain * @disk: domain disk - * @ret_status: pointer to virConnectDomainEventBlockJobStatus * * End a synchronous block job for @disk. Any pending block job event - * for the disk is processed, and its status is recorded in the - * virConnectDomainEventBlockJobStatus field pointed to by - * @ret_status. + * for the disk is processed. 
*/ void qemuBlockJobSyncEnd(virQEMUDriverPtr driver, virDomainObjPtr vm, - virDomainDiskDefPtr disk, - virConnectDomainEventBlockJobStatus *ret_status) + virDomainDiskDefPtr disk) { - qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - - if (diskPriv->blockJobSync && diskPriv->blockJobStatus != -1) { - if (ret_status) - *ret_status = diskPriv->blockJobStatus; - qemuBlockJobUpdate(driver, vm, disk); - diskPriv->blockJobStatus = -1; - } - diskPriv->blockJobSync = false; -} - - -/** - * qemuBlockJobSyncWaitWithTimeout: - * @driver: qemu driver - * @vm: domain - * @disk: domain disk - * @timeout: timeout in milliseconds - * @ret_status: pointer to virConnectDomainEventBlockJobStatus - * - * Wait up to @timeout milliseconds for a block job event for @disk. - * If an event is received it is processed, and its status is recorded - * in the virConnectDomainEventBlockJobStatus field pointed to by - * @ret_status. - * - * If @timeout is not 0, @vm will be unlocked while waiting for the event. - * - * Returns 0 if an event was received or the timeout expired, - * -1 otherwise. - */ -int -qemuBlockJobSyncWaitWithTimeout(virQEMUDriverPtr driver, - virDomainObjPtr vm, - virDomainDiskDefPtr disk, - unsigned long long timeout, - virConnectDomainEventBlockJobStatus *ret_status) -{ - qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - - if (!diskPriv->blockJobSync) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("No current synchronous block job")); - return -1; - } - - while (diskPriv->blockJobSync && diskPriv->blockJobStatus == -1) { - int r; - - if (!virDomainObjIsActive(vm)) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("guest unexpectedly quit")); - diskPriv->blockJobSync = false; - return -1; - } - - if (timeout == (unsigned long long)-1) { - r = virCondWait(&diskPriv->blockJobSyncCond, &vm->parent.lock); - } else if (timeout) { - unsigned long long now; - if (virTimeMillisNow(&now) < 0) { - virReportSystemError(errno, "%s", - _("Unable to get current time")); - return -1; - } - r = virCondWaitUntil(&diskPriv->blockJobSyncCond, - &vm->parent.lock, - now + timeout); - if (r < 0 && errno == ETIMEDOUT) - return 0; - } else { - errno = ETIMEDOUT; - return 0; - } - - if (r < 0) { - diskPriv->blockJobSync = false; - virReportSystemError(errno, "%s", - _("Unable to wait on block job sync " - "condition")); - return -1; - } - } - - if (ret_status) - *ret_status = diskPriv->blockJobStatus; + VIR_DEBUG("disk=%s", disk->dst); qemuBlockJobUpdate(driver, vm, disk); - diskPriv->blockJobStatus = -1; - - return 0; -} - - -/** - * qemuBlockJobSyncWait: - * @driver: qemu driver - * @vm: domain - * @disk: domain disk - * @ret_status: pointer to virConnectDomainEventBlockJobStatus - * - * Wait for a block job event for @disk. If an event is received it - * is processed, and its status is recorded in the - * virConnectDomainEventBlockJobStatus field pointed to by - * @ret_status. - * - * @vm will be unlocked while waiting for the event. - * - * Returns 0 if an event was received, - * -1 otherwise. 
- */ -int -qemuBlockJobSyncWait(virQEMUDriverPtr driver, - virDomainObjPtr vm, - virDomainDiskDefPtr disk, - virConnectDomainEventBlockJobStatus *ret_status) -{ - return qemuBlockJobSyncWaitWithTimeout(driver, vm, disk, - (unsigned long long)-1, - ret_status); + virThreadQueueUnregister(QEMU_DOMAIN_DISK_PRIVATE(disk)->blockJobQueue); } diff --git a/src/qemu/qemu_blockjob.h b/src/qemu/qemu_blockjob.h index 81e893e..775ce95 100644 --- a/src/qemu/qemu_blockjob.h +++ b/src/qemu/qemu_blockjob.h @@ -37,16 +37,6 @@ void qemuBlockJobEventProcess(virQEMUDriverPtr driver, void qemuBlockJobSyncBegin(virDomainDiskDefPtr disk); void qemuBlockJobSyncEnd(virQEMUDriverPtr driver, virDomainObjPtr vm, - virDomainDiskDefPtr disk, - virConnectDomainEventBlockJobStatus *ret_status); -int qemuBlockJobSyncWaitWithTimeout(virQEMUDriverPtr driver, - virDomainObjPtr vm, - virDomainDiskDefPtr disk, - unsigned long long timeout, - virConnectDomainEventBlockJobStatus *ret_status); -int qemuBlockJobSyncWait(virQEMUDriverPtr driver, - virDomainObjPtr vm, - virDomainDiskDefPtr disk, - virConnectDomainEventBlockJobStatus *ret_status); + virDomainDiskDefPtr disk); #endif /* __QEMU_BLOCKJOB_H__ */ diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c index db8554b..c25d5a5 100644 --- a/src/qemu/qemu_domain.c +++ b/src/qemu/qemu_domain.c @@ -441,8 +441,7 @@ qemuDomainDiskPrivateNew(void) if (!(priv = virObjectNew(qemuDomainDiskPrivateClass))) return NULL; - if (virCondInit(&priv->blockJobSyncCond) < 0) { - virReportSystemError(errno, "%s", _("Failed to initialize condition")); + if (!(priv->blockJobQueue = virThreadQueueNew())) { virObjectUnref(priv); return NULL; } @@ -455,7 +454,7 @@ qemuDomainDiskPrivateDispose(void *obj) { qemuDomainDiskPrivatePtr priv = obj; - virCondDestroy(&priv->blockJobSyncCond); + virThreadQueueFree(priv->blockJobQueue); } diff --git a/src/qemu/qemu_domain.h b/src/qemu/qemu_domain.h index a6df199..2117a3d 100644 --- a/src/qemu/qemu_domain.h +++ b/src/qemu/qemu_domain.h @@ -35,6 +35,7 @@ # include "qemu_capabilities.h" # include "virchrdev.h" # include "virobject.h" +# include "virthreadqueue.h" # define QEMU_DOMAIN_FORMAT_LIVE_FLAGS \ (VIR_DOMAIN_XML_SECURE | \ @@ -214,10 +215,9 @@ struct _qemuDomainDiskPrivate { bool blockjob; /* for some synchronous block jobs, we need to notify the owner */ - virCond blockJobSyncCond; + virThreadQueuePtr blockJobQueue; int blockJobType; /* type of the block job from the event */ int blockJobStatus; /* status of the finished block job */ - bool blockJobSync; /* the block job needs synchronized termination */ bool migrating; /* the disk is being migrated */ }; diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index aa0acde..bce631f 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -16679,10 +16679,8 @@ qemuDomainBlockJobAbort(virDomainPtr dom, goto endjob; } - if (modern && !async) { - /* prepare state for event delivery */ + if (modern && !async) qemuBlockJobSyncBegin(disk); - } if (pivot) { if ((ret = qemuDomainBlockPivot(driver, vm, device, disk)) < 0) @@ -16730,21 +16728,21 @@ qemuDomainBlockJobAbort(virDomainPtr dom, VIR_DOMAIN_BLOCK_JOB_TYPE_PULL, VIR_DOMAIN_BLOCK_JOB_CANCELED); } else { - virConnectDomainEventBlockJobStatus status = -1; - if (qemuBlockJobSyncWait(driver, vm, disk, &status) < 0) { - ret = -1; - } else if (status == VIR_DOMAIN_BLOCK_JOB_FAILED) { - virReportError(VIR_ERR_OPERATION_FAILED, - _("failed to terminate block job on disk '%s'"), - disk->dst); - ret = -1; + qemuDomainDiskPrivatePtr diskPriv = 
QEMU_DOMAIN_DISK_PRIVATE(disk); + qemuBlockJobUpdate(driver, vm, disk); + while (diskPriv->blockjob) { + if (virThreadCondWait(&vm->parent.lock) < 0) { + ret = -1; + goto endjob; + } + qemuBlockJobUpdate(driver, vm, disk); } } } endjob: - if (disk && QEMU_DOMAIN_DISK_PRIVATE(disk)->blockJobSync) - qemuBlockJobSyncEnd(driver, vm, disk, NULL); + if (disk) + qemuBlockJobSyncEnd(driver, vm, disk); qemuDomainObjEndJob(driver, vm); cleanup: diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index b2c4559..f866ac5 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -1720,7 +1720,7 @@ qemuMigrationStopNBDServer(virQEMUDriverPtr driver, /** - * qemuMigrationCheckDriveMirror: + * qemuMigrationDriveMirrorReady: * @driver: qemu driver * @vm: domain * @@ -1733,111 +1733,148 @@ qemuMigrationStopNBDServer(virQEMUDriverPtr driver, * -1 on error. */ static int -qemuMigrationCheckDriveMirror(virQEMUDriverPtr driver, +qemuMigrationDriveMirrorReady(virQEMUDriverPtr driver, virDomainObjPtr vm) { size_t i; - int ret = 1; + size_t notReady = 0; + int status; for (i = 0; i < vm->def->ndisks; i++) { virDomainDiskDefPtr disk = vm->def->disks[i]; qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - if (!diskPriv->migrating || !diskPriv->blockJobSync) + if (!diskPriv->migrating) continue; - /* process any pending event */ - if (qemuBlockJobSyncWaitWithTimeout(driver, vm, disk, - 0ull, NULL) < 0) - return -1; - - switch (disk->mirrorState) { - case VIR_DOMAIN_DISK_MIRROR_STATE_NONE: - ret = 0; - break; - case VIR_DOMAIN_DISK_MIRROR_STATE_ABORT: + status = qemuBlockJobUpdate(driver, vm, disk); + if (status == VIR_DOMAIN_BLOCK_JOB_FAILED) { virReportError(VIR_ERR_OPERATION_FAILED, _("migration of disk %s failed"), disk->dst); return -1; } + + if (disk->mirrorState != VIR_DOMAIN_DISK_MIRROR_STATE_READY) + notReady++; } - return ret; + if (notReady) { + VIR_DEBUG("Waiting for %zu disk mirrors to get ready", notReady); + return 0; + } else { + VIR_DEBUG("All disk mirrors are ready"); + return 1; + } } -/** - * qemuMigrationCancelOneDriveMirror: - * @driver: qemu driver - * @vm: domain +/* + * If @failed is not NULL, the function will report an error and set @failed + * to true in case a block job fails. This way we can properly abort migration + * in case some block job failed once all memory has already been transferred. * - * Cancel all drive-mirrors started by qemuMigrationDriveMirror. - * Any pending block job events for the mirrored disks will be - * processed. - * - * Returns 0 on success, -1 otherwise. + * Returns 1 if all mirrors are gone, + * 0 if some mirrors are still active, + * -1 on error. 
*/ static int -qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, +qemuMigrationDriveMirrorCancelled(virQEMUDriverPtr driver, virDomainObjPtr vm, - virDomainDiskDefPtr disk) + bool *failed) { - qemuDomainObjPrivatePtr priv = vm->privateData; - char *diskAlias = NULL; - int ret = -1; + size_t i; + size_t active = 0; + int status; - /* No need to cancel if mirror already aborted */ - if (disk->mirrorState == VIR_DOMAIN_DISK_MIRROR_STATE_ABORT) { - ret = 0; - } else { - virConnectDomainEventBlockJobStatus status = -1; + for (i = 0; i < vm->def->ndisks; i++) { + virDomainDiskDefPtr disk = vm->def->disks[i]; + qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - if (virAsprintf(&diskAlias, "%s%s", - QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) - goto cleanup; + if (!diskPriv->migrating) + continue; - if (qemuDomainObjEnterMonitorAsync(driver, vm, - QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) - goto endjob; - ret = qemuMonitorBlockJobCancel(priv->mon, diskAlias, true); - if (qemuDomainObjExitMonitor(driver, vm) < 0) - goto endjob; - - if (ret < 0) { - virDomainBlockJobInfo info; - - /* block-job-cancel can fail if QEMU simultaneously - * aborted the job; probe for it again to detect this */ - if (qemuMonitorBlockJobInfo(priv->mon, diskAlias, - &info, NULL) == 0) { - ret = 0; - } else { + status = qemuBlockJobUpdate(driver, vm, disk); + switch (status) { + case VIR_DOMAIN_BLOCK_JOB_FAILED: + if (failed) { virReportError(VIR_ERR_OPERATION_FAILED, - _("could not cancel migration of disk %s"), + _("migration of disk %s failed"), disk->dst); + *failed = true; } + /* fallthrough */ + case VIR_DOMAIN_BLOCK_JOB_CANCELED: + case VIR_DOMAIN_BLOCK_JOB_COMPLETED: + qemuBlockJobSyncEnd(driver, vm, disk); + diskPriv->migrating = false; + break; - goto endjob; + default: + active++; } + } - /* Mirror may become ready before cancellation takes - * effect; loop if we get that event first */ - do { - ret = qemuBlockJobSyncWait(driver, vm, disk, &status); - if (ret < 0) { - VIR_WARN("Unable to wait for block job on %s to cancel", - diskAlias); - goto endjob; - } - } while (status == VIR_DOMAIN_BLOCK_JOB_READY); + if (active) { + VIR_DEBUG("Waiting for %zu disk mirrors to finish", active); + return 0; + } else { + if (failed && *failed) + VIR_DEBUG("All disk mirrors are gone; some of them failed"); + else + VIR_DEBUG("All disk mirrors are gone"); + return 1; } +} + + +/* + * Returns 0 on success, + * 1 when job is already completed or it failed and failNoJob is false, + * -1 on error or when job failed and failNoJob is true. 
+ */ +static int +qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, + virDomainObjPtr vm, + virDomainDiskDefPtr disk, + bool failNoJob) +{ + qemuDomainObjPrivatePtr priv = vm->privateData; + char *diskAlias = NULL; + int ret = -1; + int status; + int rv; + + status = qemuBlockJobUpdate(driver, vm, disk); + switch (status) { + case VIR_DOMAIN_BLOCK_JOB_FAILED: + case VIR_DOMAIN_BLOCK_JOB_CANCELED: + if (failNoJob) { + virReportError(VIR_ERR_OPERATION_FAILED, + _("migration of disk %s failed"), + disk->dst); + return -1; + } + return 1; + + case VIR_DOMAIN_BLOCK_JOB_COMPLETED: + return 1; + } + + if (virAsprintf(&diskAlias, "%s%s", + QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) + return -1; + + if (qemuDomainObjEnterMonitorAsync(driver, vm, + QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) + goto cleanup; + + rv = qemuMonitorBlockJobCancel(priv->mon, diskAlias, true); - endjob: - qemuBlockJobSyncEnd(driver, vm, disk, NULL); + if (qemuDomainObjExitMonitor(driver, vm) < 0 || rv < 0) + goto cleanup; - if (disk->mirrorState == VIR_DOMAIN_DISK_MIRROR_STATE_ABORT) - disk->mirrorState = VIR_DOMAIN_DISK_MIRROR_STATE_NONE; + ret = 0; cleanup: VIR_FREE(diskAlias); @@ -1849,6 +1886,7 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, * qemuMigrationCancelDriveMirror: * @driver: qemu driver * @vm: domain + * @check: if true report an error when some of the mirrors fails * * Cancel all drive-mirrors started by qemuMigrationDriveMirror. * Any pending block job events for the affected disks will be @@ -1858,28 +1896,48 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, */ static int qemuMigrationCancelDriveMirror(virQEMUDriverPtr driver, - virDomainObjPtr vm) + virDomainObjPtr vm, + bool check) { virErrorPtr err = NULL; - int ret = 0; + int ret = -1; size_t i; + int rv; + bool failed = false; + bool *failedPtr = check ? &failed : NULL; + + VIR_DEBUG("Cancelling drive mirrors for domain %s", vm->def->name); for (i = 0; i < vm->def->ndisks; i++) { virDomainDiskDefPtr disk = vm->def->disks[i]; qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - if (!diskPriv->migrating || !diskPriv->blockJobSync) + if (!diskPriv->migrating) continue; - if (qemuMigrationCancelOneDriveMirror(driver, vm, disk) < 0) { - ret = -1; - if (!err) - err = virSaveLastError(); + rv = qemuMigrationCancelOneDriveMirror(driver, vm, disk, check); + if (rv != 0) { + if (rv < 0) { + if (!err) + err = virSaveLastError(); + failed = true; + } + qemuBlockJobSyncEnd(driver, vm, disk); + diskPriv->migrating = false; } + } - diskPriv->migrating = false; + while ((rv = qemuMigrationDriveMirrorCancelled(driver, vm, + failedPtr)) != 1) { + if (failed && !err) + err = virSaveLastError(); + if (rv < 0 || virThreadCondWait(&vm->parent.lock) < 0) + goto cleanup; } + ret = failed ? 
-1 : 0; + + cleanup: if (err) { virSetError(err); virFreeError(err); @@ -1924,6 +1982,9 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, char *nbd_dest = NULL; char *hoststr = NULL; unsigned int mirror_flags = VIR_DOMAIN_BLOCK_REBASE_REUSE_EXT; + int rv; + + VIR_DEBUG("Starting drive mirrors for domain %s", vm->def->name); /* steal NBD port and thus prevent its propagation back to destination */ port = mig->nbd->port; @@ -1950,60 +2011,46 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, !virDomainDiskGetSource(disk)) continue; - VIR_FREE(diskAlias); - VIR_FREE(nbd_dest); if ((virAsprintf(&diskAlias, "%s%s", QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) || (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s", hoststr, port, diskAlias) < 0)) goto cleanup; + if (qemuDomainObjEnterMonitorAsync(driver, vm, + QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) + goto cleanup; + qemuBlockJobSyncBegin(disk); - - if (qemuDomainObjEnterMonitorAsync(driver, vm, - QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) { - qemuBlockJobSyncEnd(driver, vm, disk, NULL); - goto cleanup; - } - mon_ret = qemuMonitorDriveMirror(priv->mon, diskAlias, nbd_dest, NULL, speed, 0, 0, mirror_flags); + VIR_FREE(diskAlias); + VIR_FREE(nbd_dest); if (qemuDomainObjExitMonitor(driver, vm) < 0 || mon_ret < 0) { - qemuBlockJobSyncEnd(driver, vm, disk, NULL); + qemuBlockJobSyncEnd(driver, vm, disk); goto cleanup; } diskPriv->migrating = true; } - /* Wait for each disk to become ready in turn, but check the status - * for *all* mirrors to determine if any have aborted. */ - for (i = 0; i < vm->def->ndisks; i++) { - virDomainDiskDefPtr disk = vm->def->disks[i]; - qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - - if (!diskPriv->migrating) - continue; - - while (disk->mirrorState != VIR_DOMAIN_DISK_MIRROR_STATE_READY) { - /* The following check should be race free as long as the variable - * is set only with domain object locked. And here we have the - * domain object locked too. */ - if (priv->job.asyncAbort) { - priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED; - virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"), - qemuDomainAsyncJobTypeToString(priv->job.asyncJob), - _("canceled by client")); - goto cleanup; - } - - if (qemuBlockJobSyncWaitWithTimeout(driver, vm, disk, - 500ull, NULL) < 0) - goto cleanup; - - if (qemuMigrationCheckDriveMirror(driver, vm) < 0) - goto cleanup; + while ((rv = qemuMigrationDriveMirrorReady(driver, vm)) != 1) { + unsigned long long now; + + if (rv < 0) + goto cleanup; + + if (priv->job.asyncAbort) { + priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED; + virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"), + qemuDomainAsyncJobTypeToString(priv->job.asyncJob), + _("canceled by client")); + goto cleanup; } + + if (virTimeMillisNow(&now) < 0 || + virThreadCondWaitUntil(&vm->parent.lock, now + 500) < 0) + goto cleanup; } /* Okay, all disks are ready. 
Modify migrate_flags */ @@ -2436,7 +2483,8 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver, virDomainObjPtr vm, qemuDomainAsyncJob asyncJob, virConnectPtr dconn, - bool abort_on_error) + bool abort_on_error, + bool storage) { qemuDomainObjPrivatePtr priv = vm->privateData; qemuDomainJobInfoPtr jobInfo = priv->job.current; @@ -2466,6 +2514,10 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver, if (qemuMigrationUpdateJobStatus(driver, vm, job, asyncJob) == -1) break; + if (storage && + qemuMigrationDriveMirrorReady(driver, vm) < 0) + break; + /* cancel migration if disk I/O error is emitted while migrating */ if (abort_on_error && virDomainObjGetState(vm, &pauseReason) == VIR_DOMAIN_PAUSED && @@ -3541,7 +3593,7 @@ qemuMigrationConfirmPhase(virQEMUDriverPtr driver, virErrorPtr orig_err = virSaveLastError(); /* cancel any outstanding NBD jobs */ - qemuMigrationCancelDriveMirror(driver, vm); + qemuMigrationCancelDriveMirror(driver, vm, false); virSetError(orig_err); virFreeError(orig_err); @@ -4083,20 +4135,12 @@ qemuMigrationRun(virQEMUDriverPtr driver, rc = qemuMigrationWaitForCompletion(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT, - dconn, abort_on_error); + dconn, abort_on_error, !!mig->nbd); if (rc == -2) goto cancel; else if (rc == -1) goto cleanup; - /* Confirm state of drive mirrors */ - if (mig->nbd) { - if (qemuMigrationCheckDriveMirror(driver, vm) != 1) { - ret = -1; - goto cancel; - } - } - /* When migration completed, QEMU will have paused the * CPUs for us, but unless we're using the JSON monitor * we won't have been notified of this, so might still @@ -4120,7 +4164,7 @@ qemuMigrationRun(virQEMUDriverPtr driver, /* cancel any outstanding NBD jobs */ if (mig && mig->nbd) { - if (qemuMigrationCancelDriveMirror(driver, vm) < 0) + if (qemuMigrationCancelDriveMirror(driver, vm, !!ret) < 0) ret = -1; } @@ -5574,7 +5618,8 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, if (rc < 0) goto cleanup; - rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob, NULL, false); + rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob, + NULL, false, false); if (rc < 0) { if (rc == -2) { diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index 9c5d0f4..b66502c 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -1000,11 +1000,11 @@ qemuProcessHandleBlockJob(qemuMonitorPtr mon ATTRIBUTE_UNUSED, goto error; diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); - if (diskPriv->blockJobSync) { + if (!virThreadQueueIsEmpty(diskPriv->blockJobQueue)) { diskPriv->blockJobType = type; diskPriv->blockJobStatus = status; - /* We have an SYNC API waiting for this event, dispatch it back */ - virCondSignal(&diskPriv->blockJobSyncCond); + /* We have a SYNC API waiting for this event, dispatch it back */ + virThreadQueueBroadcast(diskPriv->blockJobQueue); } else { /* there is no waiting SYNC API, dispatch the update to a thread */ if (VIR_ALLOC(processEvent) < 0) @@ -5060,8 +5060,7 @@ void qemuProcessStop(virQEMUDriverPtr driver, for (i = 0; i < vm->def->ndisks; i++) { qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(vm->def->disks[i]); - if (diskPriv->blockJobSync && diskPriv->blockJobStatus == -1) - virCondSignal(&diskPriv->blockJobSyncCond); + virThreadQueueBroadcast(diskPriv->blockJobQueue); } if ((logfile = qemuDomainCreateLog(driver, vm, true)) < 0) { -- 2.4.1
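
The pattern these hunks establish deserves a summary. A thread waiting for block jobs no longer sleeps on a per-disk condition; it re-checks the state of all mirrors and then waits on its own thread condition, which the block job event handler wakes through the disk's thread queue. Recombining the hunks above into one condensed sketch (the registration of the waiter on diskPriv->blockJobQueue is assumed to happen in qemuBlockJobSyncBegin):

    /* waiting thread, domain object locked */
    while ((rv = qemuMigrationDriveMirrorReady(driver, vm)) != 1) {
        unsigned long long now;

        if (rv < 0)
            goto cleanup;
        /* wake up at least every 500ms to notice abort requests */
        if (virTimeMillisNow(&now) < 0 ||
            virThreadCondWaitUntil(&vm->parent.lock, now + 500) < 0)
            goto cleanup;
    }

    /* event source, qemuProcessHandleBlockJob, same lock held */
    diskPriv->blockJobType = type;
    diskPriv->blockJobStatus = status;
    virThreadQueueBroadcast(diskPriv->blockJobQueue);

Since every thread owns its condition, the same loop can later also be woken by migration events or abort requests without introducing any extra conditions.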

This patch reverts commit 76c61cdca20c106960af033e5d0f5da70177af0f. VIR_DOMAIN_DISK_MIRROR_STATE_ABORT says we asked for a block job to be aborted rather than saying it was aborted. Let's just use VIR_DOMAIN_DISK_MIRROR_STATE_NONE consistently whenever a block job finishes, since no caller depends on VIR_DOMAIN_DISK_MIRROR_STATE_ABORT anymore to check whether a block job failed or was cancelled. Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- src/qemu/qemu_blockjob.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/qemu/qemu_blockjob.c b/src/qemu/qemu_blockjob.c index e97f449..b69b524 100644 --- a/src/qemu/qemu_blockjob.c +++ b/src/qemu/qemu_blockjob.c @@ -164,8 +164,7 @@ qemuBlockJobEventProcess(virQEMUDriverPtr driver, case VIR_DOMAIN_BLOCK_JOB_CANCELED: virStorageSourceFree(disk->mirror); disk->mirror = NULL; - disk->mirrorState = status == VIR_DOMAIN_BLOCK_JOB_FAILED ? - VIR_DOMAIN_DISK_MIRROR_STATE_ABORT : VIR_DOMAIN_DISK_MIRROR_STATE_NONE; + disk->mirrorState = VIR_DOMAIN_DISK_MIRROR_STATE_NONE; disk->mirrorJob = VIR_DOMAIN_BLOCK_JOB_TYPE_UNKNOWN; save = true; diskPriv->blockjob = false; -- 2.4.1
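
With this revert, code that needs to distinguish a failed mirror from a cancelled one looks at the block job status delivered with the event, as qemuMigrationDriveMirrorCancelled does above, rather than at disk->mirrorState; roughly:

    switch (qemuBlockJobUpdate(driver, vm, disk)) {
    case VIR_DOMAIN_BLOCK_JOB_FAILED:    /* mirror failed, report an error */
    case VIR_DOMAIN_BLOCK_JOB_CANCELED:  /* mirror was aborted */
    case VIR_DOMAIN_BLOCK_JOB_COMPLETED: /* mirror finished */
        /* in all three cases the mirror is gone and mirrorState
         * is VIR_DOMAIN_DISK_MIRROR_STATE_NONE */
        break;
    }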

So that they can format private data (e.g., disk private data) stored elsewhere in the domain object. Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- src/conf/domain_conf.c | 4 ++-- src/conf/domain_conf.h | 6 ++++-- src/libxl/libxl_domain.c | 10 ++++++---- src/lxc/lxc_domain.c | 12 ++++++++---- src/qemu/qemu_domain.c | 10 ++++++---- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/conf/domain_conf.c b/src/conf/domain_conf.c index 631f9eb..bb7ca17 100644 --- a/src/conf/domain_conf.c +++ b/src/conf/domain_conf.c @@ -15853,7 +15853,7 @@ virDomainObjParseXML(xmlDocPtr xml, VIR_FREE(nodes); if (xmlopt->privateData.parse && - ((xmlopt->privateData.parse)(ctxt, obj->privateData)) < 0) + xmlopt->privateData.parse(ctxt, obj) < 0) goto error; return obj; @@ -21777,7 +21777,7 @@ virDomainObjFormat(virDomainXMLOptionPtr xmlopt, } if (xmlopt->privateData.format && - ((xmlopt->privateData.format)(&buf, obj->privateData)) < 0) + xmlopt->privateData.format(&buf, obj) < 0) goto error; if (virDomainDefFormatInternal(obj->def, flags, &buf) < 0) diff --git a/src/conf/domain_conf.h b/src/conf/domain_conf.h index 0fcf52e..c750c0f 100644 --- a/src/conf/domain_conf.h +++ b/src/conf/domain_conf.h @@ -2343,8 +2343,10 @@ typedef virDomainXMLOption *virDomainXMLOptionPtr; typedef void *(*virDomainXMLPrivateDataAllocFunc)(void); typedef void (*virDomainXMLPrivateDataFreeFunc)(void *); typedef virObjectPtr (*virDomainXMLPrivateDataNewFunc)(void); -typedef int (*virDomainXMLPrivateDataFormatFunc)(virBufferPtr, void *); -typedef int (*virDomainXMLPrivateDataParseFunc)(xmlXPathContextPtr, void *); +typedef int (*virDomainXMLPrivateDataFormatFunc)(virBufferPtr, + virDomainObjPtr); +typedef int (*virDomainXMLPrivateDataParseFunc)(xmlXPathContextPtr, + virDomainObjPtr); /* Called once after everything else has been parsed, for adjusting * overall domain defaults. 
*/ diff --git a/src/libxl/libxl_domain.c b/src/libxl/libxl_domain.c index 5f5f8e5..83d5e77 100644 --- a/src/libxl/libxl_domain.c +++ b/src/libxl/libxl_domain.c @@ -223,9 +223,10 @@ libxlDomainObjPrivateFree(void *data) } static int -libxlDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, void *data) +libxlDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, + virDomainObjPtr vm) { - libxlDomainObjPrivatePtr priv = data; + libxlDomainObjPrivatePtr priv = vm->privateData; priv->lockState = virXPathString("string(./lockstate)", ctxt); @@ -233,9 +234,10 @@ libxlDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, void *data) } static int -libxlDomainObjPrivateXMLFormat(virBufferPtr buf, void *data) +libxlDomainObjPrivateXMLFormat(virBufferPtr buf, + virDomainObjPtr vm) { - libxlDomainObjPrivatePtr priv = data; + libxlDomainObjPrivatePtr priv = vm->privateData; if (priv->lockState) virBufferAsprintf(buf, "<lockstate>%s</lockstate>\n", priv->lockState); diff --git a/src/lxc/lxc_domain.c b/src/lxc/lxc_domain.c index c2180cb..70606f3 100644 --- a/src/lxc/lxc_domain.c +++ b/src/lxc/lxc_domain.c @@ -51,9 +51,11 @@ static void virLXCDomainObjPrivateFree(void *data) } -static int virLXCDomainObjPrivateXMLFormat(virBufferPtr buf, void *data) +static int +virLXCDomainObjPrivateXMLFormat(virBufferPtr buf, + virDomainObjPtr vm) { - virLXCDomainObjPrivatePtr priv = data; + virLXCDomainObjPrivatePtr priv = vm->privateData; virBufferAsprintf(buf, "<init pid='%llu'/>\n", (unsigned long long)priv->initpid); @@ -61,9 +63,11 @@ static int virLXCDomainObjPrivateXMLFormat(virBufferPtr buf, void *data) return 0; } -static int virLXCDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, void *data) +static int +virLXCDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, + virDomainObjPtr vm) { - virLXCDomainObjPrivatePtr priv = data; + virLXCDomainObjPrivatePtr priv = vm->privateData; unsigned long long thepid; if (virXPathULongLong("string(./init[1]/@pid)", ctxt, &thepid) < 0) { diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c index c25d5a5..8724cf0 100644 --- a/src/qemu/qemu_domain.c +++ b/src/qemu/qemu_domain.c @@ -525,9 +525,10 @@ qemuDomainObjPrivateFree(void *data) static int -qemuDomainObjPrivateXMLFormat(virBufferPtr buf, void *data) +qemuDomainObjPrivateXMLFormat(virBufferPtr buf, + virDomainObjPtr vm) { - qemuDomainObjPrivatePtr priv = data; + qemuDomainObjPrivatePtr priv = vm->privateData; const char *monitorpath; qemuDomainJob job; @@ -614,9 +615,10 @@ qemuDomainObjPrivateXMLFormat(virBufferPtr buf, void *data) } static int -qemuDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, void *data) +qemuDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, + virDomainObjPtr vm) { - qemuDomainObjPrivatePtr priv = data; + qemuDomainObjPrivatePtr priv = vm->privateData; char *monitorpath; char *tmp; int n; -- 2.4.1
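
To illustrate the new signature, a driver's formatter can now reach per-device private data through the domain object; a hypothetical example (the my* names are made up, the rest follows the conventions used in the diffs above):

    static int
    myDomainObjPrivateXMLFormat(virBufferPtr buf,
                                virDomainObjPtr vm)
    {
        size_t i;

        for (i = 0; i < vm->def->ndisks; i++) {
            virDomainDiskDefPtr disk = vm->def->disks[i];

            /* per-disk private data was unreachable when the callback
             * only got vm->privateData as an opaque pointer */
            virBufferAsprintf(buf, "<disk dev='%s'/>\n", disk->dst);
        }
        return 0;
    }

This is exactly what the qemu driver needs later in the series to remember which disks are being migrated.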

We don't have an async job when reconnecting to existing domains after libvirtd restart. Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- src/qemu/qemu_migration.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index f866ac5..cd08c88 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -1837,7 +1837,8 @@ static int qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, virDomainObjPtr vm, virDomainDiskDefPtr disk, - bool failNoJob) + bool failNoJob, + qemuDomainAsyncJob asyncJob) { qemuDomainObjPrivatePtr priv = vm->privateData; char *diskAlias = NULL; @@ -1865,8 +1866,7 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) return -1; - if (qemuDomainObjEnterMonitorAsync(driver, vm, - QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) + if (qemuDomainObjEnterMonitorAsync(driver, vm, asyncJob) < 0) goto cleanup; rv = qemuMonitorBlockJobCancel(priv->mon, diskAlias, true); @@ -1897,7 +1897,8 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver, static int qemuMigrationCancelDriveMirror(virQEMUDriverPtr driver, virDomainObjPtr vm, - bool check) + bool check, + qemuDomainAsyncJob asyncJob) { virErrorPtr err = NULL; int ret = -1; @@ -1915,7 +1916,8 @@ qemuMigrationCancelDriveMirror(virQEMUDriverPtr driver, if (!diskPriv->migrating) continue; - rv = qemuMigrationCancelOneDriveMirror(driver, vm, disk, check); + rv = qemuMigrationCancelOneDriveMirror(driver, vm, disk, + check, asyncJob); if (rv != 0) { if (rv < 0) { if (!err) @@ -3593,7 +3595,8 @@ qemuMigrationConfirmPhase(virQEMUDriverPtr driver, virErrorPtr orig_err = virSaveLastError(); /* cancel any outstanding NBD jobs */ - qemuMigrationCancelDriveMirror(driver, vm, false); + qemuMigrationCancelDriveMirror(driver, vm, false, + QEMU_ASYNC_JOB_MIGRATION_OUT); virSetError(orig_err); virFreeError(orig_err); @@ -4164,7 +4167,8 @@ qemuMigrationRun(virQEMUDriverPtr driver, /* cancel any outstanding NBD jobs */ if (mig && mig->nbd) { - if (qemuMigrationCancelDriveMirror(driver, vm, !!ret) < 0) + if (qemuMigrationCancelDriveMirror(driver, vm, !!ret, + QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) ret = -1; } -- 2.4.1
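
With the explicit asyncJob argument the same code works both inside the migration job and from contexts that hold no async job at all. A caller on the reconnect path can now do (as the next patch will):

    /* no async job is active here, so enter the monitor with
     * QEMU_ASYNC_JOB_NONE instead of QEMU_ASYNC_JOB_MIGRATION_OUT */
    qemuMigrationCancelDriveMirror(driver, vm, false, QEMU_ASYNC_JOB_NONE);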

When libvirtd is restarted during migration, we properly cancel the ongoing migration (unless it managed to almost finish before the restart). But if we were also migrating storage using NBD, we would completely forget about the running disk mirrors. Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- src/qemu/qemu_domain.c | 45 +++++++++++++++++++++++++++++- src/qemu/qemu_migration.c | 61 +++++++++++++++++++++++++++++++++++++++++++++++ src/qemu/qemu_migration.h | 3 +++ src/qemu/qemu_process.c | 8 +------ 4 files changed, 109 insertions(+), 8 deletions(-) diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c index 8724cf0..35cfe20 100644 --- a/src/qemu/qemu_domain.c +++ b/src/qemu/qemu_domain.c @@ -592,7 +592,27 @@ qemuDomainObjPrivateXMLFormat(virBufferPtr buf, qemuDomainAsyncJobPhaseToString( priv->job.asyncJob, priv->job.phase)); } - virBufferAddLit(buf, "/>\n"); + if (priv->job.asyncJob != QEMU_ASYNC_JOB_MIGRATION_OUT) { + virBufferAddLit(buf, "/>\n"); + } else { + size_t i; + virDomainDiskDefPtr disk; + qemuDomainDiskPrivatePtr diskPriv; + + virBufferAddLit(buf, ">\n"); + virBufferAdjustIndent(buf, 2); + + for (i = 0; i < vm->def->ndisks; i++) { + disk = vm->def->disks[i]; + diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); + virBufferAsprintf(buf, "<disk dev='%s' migrating='%s'/>\n", + disk->dst, + diskPriv->migrating ? "yes" : "no"); + } + + virBufferAdjustIndent(buf, -2); + virBufferAddLit(buf, "</job>\n"); + } } priv->job.active = job; @@ -750,6 +770,29 @@ qemuDomainObjPrivateXMLParse(xmlXPathContextPtr ctxt, } } + if ((n = virXPathNodeSet("./job[1]/disk[@migrating='yes']", + ctxt, &nodes)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("failed to parse list of disks marked for migration")); + goto error; + } + if (n > 0) { + if (priv->job.asyncJob != QEMU_ASYNC_JOB_MIGRATION_OUT) { + VIR_WARN("Found disks marked for migration but we were not " + "migrating"); + n = 0; + } + for (i = 0; i < n; i++) { + char *dst = virXMLPropString(nodes[i], "dev"); + virDomainDiskDefPtr disk; + + if (dst && (disk = virDomainDiskByName(vm->def, dst, false))) + QEMU_DOMAIN_DISK_PRIVATE(disk)->migrating = true; + VIR_FREE(dst); + } + } + VIR_FREE(nodes); + priv->fakeReboot = virXPathBoolean("boolean(./fakereboot)", ctxt) == 1; if ((n = virXPathNodeSet("./devices/device", ctxt, &nodes)) < 0) { diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index cd08c88..1db6979 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -1985,6 +1985,7 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, char *hoststr = NULL; unsigned int mirror_flags = VIR_DOMAIN_BLOCK_REBASE_REUSE_EXT; int rv; + virQEMUDriverConfigPtr cfg = virQEMUDriverGetConfig(driver); VIR_DEBUG("Starting drive mirrors for domain %s", vm->def->name); @@ -2034,6 +2035,11 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, goto cleanup; } diskPriv->migrating = true; + + if (virDomainSaveStatus(driver->xmlopt, cfg->stateDir, vm) < 0) { + VIR_WARN("Failed to save status on vm %s", vm->def->name); + goto cleanup; + } } while ((rv = qemuMigrationDriveMirrorReady(driver, vm)) != 1) { @@ -2061,6 +2067,7 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, ret = 0; cleanup: + virObjectUnref(cfg); VIR_FREE(diskAlias); VIR_FREE(nbd_dest); VIR_FREE(hoststr); @@ -5682,6 +5689,60 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, return ret; } + +int +qemuMigrationCancel(virQEMUDriverPtr driver, + virDomainObjPtr vm) +{ + qemuDomainObjPrivatePtr priv = vm->privateData; + size_t i; + + 
VIR_DEBUG("Canceling unfinished outgoing migration of domain %s", + vm->def->name); + + qemuDomainObjEnterMonitor(driver, vm); + ignore_value(qemuMonitorMigrateCancel(priv->mon)); + if (qemuDomainObjExitMonitor(driver, vm) < 0) + return -1; + + for (i = 0; i < vm->def->ndisks; i++) { + virDomainDiskDefPtr disk = vm->def->disks[i]; + qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk); + virDomainBlockJobInfo info; + char *diskAlias; + int rc; + + if (!diskPriv->migrating) + continue; + + if (virAsprintf(&diskAlias, QEMU_DRIVE_HOST_PREFIX "%s", + disk->info.alias) < 0) + return -1; + + VIR_DEBUG("Checking drive mirror state for disk %s (%s)", + disk->dst, diskAlias); + + qemuBlockJobSyncBegin(disk); + qemuDomainObjEnterMonitor(driver, vm); + rc = qemuMonitorBlockJobInfo(priv->mon, diskAlias, &info, NULL); + VIR_FREE(diskAlias); + if (qemuDomainObjExitMonitor(driver, vm) < 0) + return -1; + + if (rc) { + VIR_DEBUG("Drive mirror on disk %s is still running", disk->dst); + } else { + VIR_DEBUG("Drive mirror on disk %s is gone", disk->dst); + qemuBlockJobSyncEnd(driver, vm, disk); + diskPriv->migrating = false; + } + } + + qemuMigrationCancelDriveMirror(driver, vm, false, QEMU_ASYNC_JOB_NONE); + return 0; +} + + int qemuMigrationJobStart(virQEMUDriverPtr driver, virDomainObjPtr vm, diff --git a/src/qemu/qemu_migration.h b/src/qemu/qemu_migration.h index 1726455..e47bde5 100644 --- a/src/qemu/qemu_migration.h +++ b/src/qemu/qemu_migration.h @@ -177,4 +177,7 @@ int qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5) ATTRIBUTE_RETURN_CHECK; +int qemuMigrationCancel(virQEMUDriverPtr driver, + virDomainObjPtr vm); + #endif /* __QEMU_MIGRATION_H__ */ diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index b66502c..e4afcf9 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -3354,8 +3354,6 @@ qemuProcessRecoverMigration(virQEMUDriverPtr driver, virDomainState state, int reason) { - qemuDomainObjPrivatePtr priv = vm->privateData; - if (job == QEMU_ASYNC_JOB_MIGRATION_IN) { switch (phase) { case QEMU_MIGRATION_PHASE_NONE: @@ -3409,11 +3407,7 @@ qemuProcessRecoverMigration(virQEMUDriverPtr driver, case QEMU_MIGRATION_PHASE_PERFORM3: /* migration is still in progress, let's cancel it and resume the * domain */ - VIR_DEBUG("Canceling unfinished outgoing migration of domain %s", - vm->def->name); - qemuDomainObjEnterMonitor(driver, vm); - ignore_value(qemuMonitorMigrateCancel(priv->mon)); - if (qemuDomainObjExitMonitor(driver, vm) < 0) + if (qemuMigrationCancel(driver, vm) < 0) return -1; /* resume the domain but only if it was paused as a result of * migration */ -- 2.4.1

To avoid polling for asyncAbort flag changes. Any thread which enters an async job is automatically registered to be woken up whenever asyncAbort is set (as long as the thread is waiting for its thread condition). Signed-off-by: Jiri Denemark <jdenemar@redhat.com> --- src/qemu/qemu_domain.c | 37 +++++++++++++++++++++---------------- src/qemu/qemu_domain.h | 4 +++- src/qemu/qemu_migration.c | 11 ++++------- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c index 35cfe20..68811aa 100644 --- a/src/qemu/qemu_domain.c +++ b/src/qemu/qemu_domain.c @@ -130,16 +130,25 @@ void qemuDomainEventQueue(virQEMUDriverPtr driver, } +static void +qemuDomainObjFreeJob(qemuDomainObjPrivatePtr priv) +{ + VIR_FREE(priv->job.current); + VIR_FREE(priv->job.completed); + virCondDestroy(&priv->job.cond); + virCondDestroy(&priv->job.asyncCond); + virThreadQueueFree(priv->job.abortQueue); +} + static int qemuDomainObjInitJob(qemuDomainObjPrivatePtr priv) { memset(&priv->job, 0, sizeof(priv->job)); - if (virCondInit(&priv->job.cond) < 0) - return -1; - - if (virCondInit(&priv->job.asyncCond) < 0) { - virCondDestroy(&priv->job.cond); + if (virCondInit(&priv->job.cond) < 0 || + virCondInit(&priv->job.asyncCond) < 0 || + !(priv->job.abortQueue = virThreadQueueNew())) { + qemuDomainObjFreeJob(priv); return -1; } @@ -169,7 +178,8 @@ qemuDomainObjResetAsyncJob(qemuDomainObjPrivatePtr priv) job->phase = 0; job->mask = QEMU_JOB_DEFAULT_MASK; job->dump_memory_only = false; - job->asyncAbort = false; + virThreadQueueUnregister(job->abortQueue); + job->abortJob = false; VIR_FREE(job->current); } @@ -190,15 +200,6 @@ qemuDomainObjRestoreJob(virDomainObjPtr obj, qemuDomainObjResetAsyncJob(priv); } -static void -qemuDomainObjFreeJob(qemuDomainObjPrivatePtr priv) -{ - VIR_FREE(priv->job.current); - VIR_FREE(priv->job.completed); - virCondDestroy(&priv->job.cond); - virCondDestroy(&priv->job.asyncCond); -} - static bool qemuDomainTrackJob(qemuDomainJob job) { @@ -1344,6 +1345,7 @@ qemuDomainObjSetJobPhase(virQEMUDriverPtr driver, priv->job.phase = phase; priv->job.asyncOwner = me; + virThreadQueueRegister(priv->job.abortQueue); qemuDomainObjSaveJob(driver, obj); } @@ -1383,6 +1385,7 @@ qemuDomainObjReleaseAsyncJob(virDomainObjPtr obj) qemuDomainAsyncJobTypeToString(priv->job.asyncJob), priv->job.asyncOwner); } + virThreadQueueUnregister(priv->job.abortQueue); priv->job.asyncOwner = 0; } @@ -1482,6 +1485,7 @@ qemuDomainObjBeginJobInternal(virQEMUDriverPtr driver, priv->job.asyncOwnerAPI = virThreadJobGet(); priv->job.asyncStarted = now; priv->job.current->started = now; + virThreadQueueRegister(priv->job.abortQueue); } if (qemuDomainTrackJob(job)) @@ -1652,7 +1656,8 @@ qemuDomainObjAbortAsyncJob(virDomainObjPtr obj) qemuDomainAsyncJobTypeToString(priv->job.asyncJob), obj, obj->def->name); - priv->job.asyncAbort = true; + priv->job.abortJob = true; + virThreadQueueBroadcast(priv->job.abortQueue); } /* diff --git a/src/qemu/qemu_domain.h b/src/qemu/qemu_domain.h index 2117a3d..12c784b 100644 --- a/src/qemu/qemu_domain.h +++ b/src/qemu/qemu_domain.h @@ -136,7 +136,9 @@ struct qemuDomainJobObj { bool dump_memory_only; /* use dump-guest-memory to do dump */ qemuDomainJobInfoPtr current; /* async job progress data */ qemuDomainJobInfoPtr completed; /* statistics data of a recently completed job */ - bool asyncAbort; /* abort of async job requested */ + + virThreadQueuePtr abortQueue; /* threads listening to abortJob */ + bool abortJob; /* abort of the job requested */ }; typedef 
void (*qemuDomainCleanupCallback)(virQEMUDriverPtr driver, diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 1db6979..43ecea5 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -2043,12 +2043,10 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, } while ((rv = qemuMigrationDriveMirrorReady(driver, vm)) != 1) { - unsigned long long now; - if (rv < 0) goto cleanup; - if (priv->job.asyncAbort) { + if (priv->job.abortJob) { priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED; virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"), qemuDomainAsyncJobTypeToString(priv->job.asyncJob), @@ -2056,8 +2054,7 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, goto cleanup; } - if (virTimeMillisNow(&now) < 0 || - virThreadCondWaitUntil(&vm->parent.lock, now + 500) < 0) + if (virThreadCondWait(&vm->parent.lock) < 0) goto cleanup; } @@ -4053,10 +4050,10 @@ qemuMigrationRun(virQEMUDriverPtr driver, QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) goto cleanup; - if (priv->job.asyncAbort) { + if (priv->job.abortJob) { /* explicitly do this *after* we entered the monitor, * as this is a critical section so we are guaranteed - * priv->job.asyncAbort will not change */ + * priv->job.abortJob will not change */ ignore_value(qemuDomainObjExitMonitor(driver, vm)); priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED; virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"), -- 2.4.1
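
Putting the pieces together, the abort path now works without polling; condensed (done stands for whatever completion condition the job checks):

    /* job owner: registered on priv->job.abortQueue when the async job
     * started, it sleeps on its own condition between checks */
    while (!priv->job.abortJob && !done)
        if (virThreadCondWait(&vm->parent.lock) < 0)
            break;

    /* aborting thread, in qemuDomainObjAbortAsyncJob() */
    priv->job.abortJob = true;
    virThreadQueueBroadcast(priv->job.abortQueue);

A waiter blocked in virThreadCondWait() wakes up immediately, sees abortJob set, and cancels, instead of re-checking the flag every 500ms as the drive mirror loop did before this patch.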