[libvirt PATCH v2 0/7] qemu: introduce a per-VM event loop thread

This series changes the way we manage the QEMU monitor and QEMU agent, such that all I/O is processed by a dedicated event loop thread. Many times in the past years people are reported issues where long running monitor event callbacks block the main libvirtd event loop for an unacceptably long period of time. In the best case, this delays other work being completed, but in bad cases it leads to mgmt app failures when keepalive times trigger a client disconnect. With this series, when we spawn QEMU, we also spawn a dedicated thread running a GMainLoop instance. Then QEMU monitor and QEMU agent UNIX sockets are switched to use GMainContext for events instead of the traditional libvirt event loop APIs. We kill off the event thread when we see EOF on the QEMU monitor during shutdown. The cost of this approach is one extra thread per VM, which incurs a new OS process and a new stack allocation. The QEMU driver already delegates some QMP event handling to a thread pool for certain types of event. This was a previous hack to mitigate the impact on the main event loop. It is likely that we can remove this thread pool from the QEMU driver & rely on the per-VM event threads to do all the work. This will, however, require careful analysis of each handler we pushed into the thread pool to make sure its work doesn't have a dependency on the event loop running in parallel. This is one step towards eliminating the need to have the libvirt event loop registered when using the embedded QEMU driver. A further step is using a thread to dispatch the lifecycle events, since that currently relies on a zero second timer being registered with the event loop. Changed in v2: - Fixed race accessing free'd memory causing crash - Fixed unused variables - Merged first acked patches Daniel P. Berrangé (7): src: introduce an abstraction for running event loops qemu: start/stop an event loop thread for domains qemu: start/stop an event thread for QMP probing tests: start/stop an event thread for QEMU monitor/agent tests qemu: convert monitor to use the per-VM event loop qemu: fix variable naming in agent code qemu: convert agent to use the per-VM event loop po/POTFILES.in | 1 + src/libvirt_private.syms | 5 + src/qemu/qemu_agent.c | 600 ++++++++++++++++++----------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_domain.c | 33 ++ src/qemu/qemu_domain.h | 6 + src/qemu/qemu_monitor.c | 145 ++++----- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 43 ++- src/qemu/qemu_process.h | 2 + src/util/Makefile.inc.am | 2 + src/util/vireventthread.c | 190 +++++++++++ src/util/vireventthread.h | 31 ++ tests/qemumonitortestutils.c | 14 + 14 files changed, 700 insertions(+), 376 deletions(-) create mode 100644 src/util/vireventthread.c create mode 100644 src/util/vireventthread.h -- 2.24.1

We want a way to easily run a private GMainContext in a thread, with correct synchronization between startup and shutdown of the thread. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- po/POTFILES.in | 1 + src/libvirt_private.syms | 5 + src/util/Makefile.inc.am | 2 + src/util/vireventthread.c | 190 ++++++++++++++++++++++++++++++++++++++ src/util/vireventthread.h | 31 +++++++ 5 files changed, 229 insertions(+) create mode 100644 src/util/vireventthread.c create mode 100644 src/util/vireventthread.h diff --git a/po/POTFILES.in b/po/POTFILES.in index 982f2ebc36..413bcd1049 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -242,6 +242,7 @@ @SRCDIR@/src/util/virerror.c @SRCDIR@/src/util/virerror.h @SRCDIR@/src/util/virevent.c +@SRCDIR@/src/util/vireventthread.c @SRCDIR@/src/util/virfcp.c @SRCDIR@/src/util/virfdstream.c @SRCDIR@/src/util/virfile.c diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 07dee6d841..b04ff52de7 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1990,6 +1990,11 @@ virEventGLibRegister; virEventGLibRunOnce; +# util/vireventthread.h +virEventThreadGetContext; +virEventThreadNew; + + # util/virfcp.h virFCIsCapableRport; virFCReadRportValue; diff --git a/src/util/Makefile.inc.am b/src/util/Makefile.inc.am index ddb3b43c5f..1a01b79cb8 100644 --- a/src/util/Makefile.inc.am +++ b/src/util/Makefile.inc.am @@ -63,6 +63,8 @@ UTIL_SOURCES = \ util/vireventglib.h \ util/vireventglibwatch.c \ util/vireventglibwatch.h \ + util/vireventthread.c \ + util/vireventthread.h \ util/virfcp.c \ util/virfcp.h \ util/virfdstream.c \ diff --git a/src/util/vireventthread.c b/src/util/vireventthread.c new file mode 100644 index 0000000000..cf865925eb --- /dev/null +++ b/src/util/vireventthread.c @@ -0,0 +1,190 @@ +/* + * vireventthread.c: thread running a dedicated GMainLoop + * + * Copyright (C) 2020 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + */ + +#include <config.h> + +#include "vireventthread.h" +#include "virthread.h" +#include "virerror.h" + +struct _virEventThread { + GObject parent; + + GThread *thread; + GMainContext *context; + GMainLoop *loop; +}; + +G_DEFINE_TYPE(virEventThread, vir_event_thread, G_TYPE_OBJECT) + +#define VIR_FROM_THIS VIR_FROM_EVENT + +static void +vir_event_thread_finalize(GObject *object) +{ + virEventThread *evt = VIR_EVENT_THREAD(object); + + if (evt->thread) { + g_main_loop_quit(evt->loop); + g_thread_unref(evt->thread); + } + + g_main_loop_unref(evt->loop); + g_main_context_unref(evt->context); + + G_OBJECT_CLASS(vir_event_thread_parent_class)->finalize(object); +} + + +static void +vir_event_thread_init(virEventThread *evt) +{ + evt->context = g_main_context_new(); + evt->loop = g_main_loop_new(evt->context, FALSE); +} + + +static void +vir_event_thread_class_init(virEventThreadClass *klass) +{ + GObjectClass *obj = G_OBJECT_CLASS(klass); + + obj->finalize = vir_event_thread_finalize; +} + + +typedef struct { + GCond cond; + GMutex lock; + bool running; + + GMainContext *context; + GMainLoop *loop; +} virEventThreadData; + + +static void +virEventThreadDataFree(virEventThreadData *data) +{ + g_main_loop_unref(data->loop); + g_main_context_unref(data->context); + + g_mutex_clear(&data->lock); + g_cond_clear(&data->cond); + + g_free(data); +} + + +static gboolean +virEventThreadNotify(void *opaque) +{ + virEventThreadData *data = opaque; + + g_mutex_lock(&data->lock); + data->running = TRUE; + g_mutex_unlock(&data->lock); + g_cond_signal(&data->cond); + + return G_SOURCE_REMOVE; +} + + +static void * +virEventThreadWorker(void *opaque) +{ + virEventThreadData *data = opaque; + g_autoptr(GSource) running = g_idle_source_new(); + + g_source_set_callback(running, virEventThreadNotify, data, NULL); + + g_source_attach(running, data->context); + + g_main_loop_run(data->loop); + + virEventThreadDataFree(data); + + return NULL; +} + + +static int +virEventThreadStart(virEventThread *evt, const char *name) +{ + g_autoptr(GError) gerr = NULL; + g_autofree char *thname = NULL; + size_t maxname = virThreadMaxName(); + virEventThreadData *data; + + if (maxname) + thname = g_strndup(name, maxname); + else + thname = g_strdup(name); + + if (evt->thread) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Event thread is already running")); + return -1; + } + + data = g_new0(virEventThreadData, 1); + data->loop = g_main_loop_ref(evt->loop); + data->context = g_main_context_ref(evt->context); + g_mutex_init(&data->lock); + g_cond_init(&data->cond); + + evt->thread = g_thread_try_new(thname, + virEventThreadWorker, + data, + &gerr); + if (!evt->thread) { + virEventThreadDataFree(data); + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to start event thread: %s"), + gerr->message); + return -1; + } + + g_mutex_lock(&data->lock); + while (!data->running) + g_cond_wait(&data->cond, &data->lock); + g_mutex_unlock(&data->lock); + + return 0; +} + + +virEventThread * +virEventThreadNew(const char *name) +{ + g_autoptr(virEventThread) evt = VIR_EVENT_THREAD(g_object_new(VIR_TYPE_EVENT_THREAD, NULL)); + + if (virEventThreadStart(evt, name) < 0) + return NULL; + + return g_steal_pointer(&evt); +} + + +GMainContext * +virEventThreadGetContext(virEventThread *evt) +{ + return evt->context; +} diff --git a/src/util/vireventthread.h b/src/util/vireventthread.h new file mode 100644 index 0000000000..5826c25cf4 --- /dev/null +++ b/src/util/vireventthread.h @@ -0,0 +1,31 @@ +/* + * vireventthread.h: thread running a dedicated GMainLoop + * + * Copyright (C) 2020 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "internal.h" +#include <glib-object.h> + +#define VIR_TYPE_EVENT_THREAD vir_event_thread_get_type() +G_DECLARE_FINAL_TYPE(virEventThread, vir_event_thread, VIR, EVENT_THREAD, GObject); + +virEventThread *virEventThreadNew(const char *name); + +GMainContext *virEventThreadGetContext(virEventThread *evt); -- 2.24.1

The event loop thread will be responsible for handling any per-domain I/O operations, most notably the QEMU monitor and agent sockets. We start this event loop when launching QEMU, but stopping the event loop is a little more complicated. The obvious idea is to stop it in qemuProcessStop(), but if we do that we risk loosing the final events from the QEMU monitor, as they might not have been read by the event thread at the time we tell the thread to stop. The solution is to delay shutdown of the event thread until we have seen EOF from the QEMU monitor, and thus we know there are no further events to process. Note that this assumes that we don't have events to process from the QEMU agent. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_domain.c | 33 +++++++++++++++++++++++++++++++++ src/qemu/qemu_domain.h | 6 ++++++ src/qemu/qemu_process.c | 21 +++++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c index 33c2158eb5..58455692dd 100644 --- a/src/qemu/qemu_domain.c +++ b/src/qemu/qemu_domain.c @@ -2150,6 +2150,33 @@ dbusVMStateHashFree(void *opaque) } +int +qemuDomainObjStartWorker(virDomainObjPtr dom) +{ + qemuDomainObjPrivatePtr priv = dom->privateData; + + if (!priv->eventThread) { + g_autofree char *threadName = g_strdup_printf("vm-%s", dom->def->name); + if (!(priv->eventThread = virEventThreadNew(threadName))) + return -1; + } + + return 0; +} + + +void +qemuDomainObjStopWorker(virDomainObjPtr dom) +{ + qemuDomainObjPrivatePtr priv = dom->privateData; + + if (priv->eventThread) { + g_object_unref(priv->eventThread); + priv->eventThread = NULL; + } +} + + static void * qemuDomainObjPrivateAlloc(void *opaque) { @@ -2289,6 +2316,12 @@ qemuDomainObjPrivateFree(void *data) virHashFree(priv->blockjobs); virHashFree(priv->dbusVMStates); + /* This should never be non-NULL if we get here, but just in case... */ + if (priv->eventThread) { + VIR_ERROR(_("Unexpected event thread still active during domain deletion")); + g_object_unref(priv->eventThread); + } + VIR_FREE(priv); } diff --git a/src/qemu/qemu_domain.h b/src/qemu/qemu_domain.h index 476056c73f..c17d3bc6e2 100644 --- a/src/qemu/qemu_domain.h +++ b/src/qemu/qemu_domain.h @@ -40,6 +40,7 @@ #include "logging/log_manager.h" #include "virdomainmomentobjlist.h" #include "virenum.h" +#include "vireventthread.h" #define QEMU_DOMAIN_FORMAT_LIVE_FLAGS \ (VIR_DOMAIN_XML_SECURE) @@ -300,6 +301,8 @@ struct _qemuDomainObjPrivate { virBitmapPtr namespaces; + virEventThread *eventThread; + qemuMonitorPtr mon; virDomainChrSourceDefPtr monConfig; bool monError; @@ -630,6 +633,9 @@ struct _qemuDomainXmlNsDef { char **capsdel; }; +int qemuDomainObjStartWorker(virDomainObjPtr dom); +void qemuDomainObjStopWorker(virDomainObjPtr dom); + virDomainObjPtr qemuDomainObjFromDomain(virDomainPtr domain); qemuDomainSaveCookiePtr qemuDomainSaveCookieNew(virDomainObjPtr vm); diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index bec822a2ae..7f55f15027 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -320,6 +320,9 @@ qemuProcessHandleMonitorEOF(qemuMonitorPtr mon, qemuDomainDestroyNamespace(driver, vm); cleanup: + /* Now we got EOF we're not expecting more I/O, so we + * can finally kill the event thread */ + qemuDomainObjStopWorker(vm); virObjectUnlock(vm); } @@ -6908,6 +6911,9 @@ qemuProcessLaunch(virConnectPtr conn, if (rv == -1) /* The VM failed to start */ goto cleanup; + if (qemuDomainObjStartWorker(vm) < 0) + goto cleanup; + VIR_DEBUG("Waiting for monitor to show up"); if (qemuProcessWaitForMonitor(driver, vm, asyncJob, logCtxt) < 0) goto cleanup; @@ -7390,6 +7396,18 @@ void qemuProcessStop(virQEMUDriverPtr driver, priv->monConfig = NULL; } + /* + * We cannot stop the event thread at this time. When + * we are in this code, we may not yet have processed the + * STOP event or EOF from the monitor. So the event loop + * may have pending input that we need to process still. + * The qemuProcessHandleMonitorEOF method will kill + * the event thread because at that point we don't + * expect any more I/O from the QEMU monitor. We are + * assuming we don't need to get any more events from the + * QEMU agent at that time. + */ + /* Remove the master key */ qemuDomainMasterKeyRemove(priv); @@ -7981,6 +7999,9 @@ qemuProcessReconnect(void *opaque) virQEMUCapsGet(priv->qemuCaps, QEMU_CAPS_CHARDEV_FD_PASS)) retry = false; + if (qemuDomainObjStartWorker(obj) < 0) + goto error; + VIR_DEBUG("Reconnect monitor to def=%p name='%s' retry=%d", obj, obj->def->name, retry); -- 2.24.1

In common with regular QEMU guests, the QMP probing will need an event loop for handling monitor I/O operations. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_process.c | 15 +++++++++++++++ src/qemu/qemu_process.h | 2 ++ 2 files changed, 17 insertions(+) diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index 7f55f15027..36111c8882 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -8373,6 +8373,9 @@ qemuProcessQMPFree(qemuProcessQMPPtr proc) return; qemuProcessQMPStop(proc); + + g_object_unref(proc->eventThread); + VIR_FREE(proc->binary); VIR_FREE(proc->libDir); VIR_FREE(proc->uniqDir); @@ -8404,6 +8407,8 @@ qemuProcessQMPNew(const char *binary, { qemuProcessQMPPtr ret = NULL; qemuProcessQMPPtr proc = NULL; + const char *threadSuffix; + g_autofree char *threadName = NULL; VIR_DEBUG("exec=%s, libDir=%s, runUid=%u, runGid=%u, forceTCG=%d", binary, libDir, runUid, runGid, forceTCG); @@ -8418,6 +8423,16 @@ qemuProcessQMPNew(const char *binary, proc->runGid = runGid; proc->forceTCG = forceTCG; + threadSuffix = strrchr(binary, '-'); + if (threadSuffix) + threadSuffix++; + else + threadSuffix = binary; + threadName = g_strdup_printf("qmp-%s", threadSuffix); + + if (!(proc->eventThread = virEventThreadNew(threadName))) + goto cleanup; + ret = g_steal_pointer(&proc); cleanup: diff --git a/src/qemu/qemu_process.h b/src/qemu/qemu_process.h index 9af9f967fd..3077d3ef9e 100644 --- a/src/qemu/qemu_process.h +++ b/src/qemu/qemu_process.h @@ -24,6 +24,7 @@ #include "qemu_conf.h" #include "qemu_domain.h" #include "virstoragefile.h" +#include "vireventthread.h" int qemuProcessPrepareMonitorChr(virDomainChrSourceDefPtr monConfig, const char *domainDir); @@ -217,6 +218,7 @@ struct _qemuProcessQMP { char *monpath; char *pidfile; char *uniqDir; + virEventThread *eventThread; virCommandPtr cmd; qemuMonitorPtr mon; pid_t pid; -- 2.24.1

Tests which are using the QEMU monitor / agent need to have an event thread running a private GMainContext. There is already a thread running the main libvirt event loop but this can't be eliminated yet as it is used for more than just the monitor client I/O. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- tests/qemumonitortestutils.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c index 00f5b49439..a30851aa63 100644 --- a/tests/qemumonitortestutils.c +++ b/tests/qemumonitortestutils.c @@ -36,6 +36,7 @@ #include "virlog.h" #include "virerror.h" #include "virstring.h" +#include "vireventthread.h" #define VIR_FROM_THIS VIR_FROM_NONE @@ -66,6 +67,8 @@ struct _qemuMonitorTest { virNetSocketPtr server; virNetSocketPtr client; + virEventThread *eventThread; + qemuMonitorPtr mon; qemuAgentPtr agent; @@ -389,6 +392,8 @@ qemuMonitorTestFree(qemuMonitorTestPtr test) qemuAgentClose(test->agent); } + g_object_unref(test->eventThread); + virObjectUnref(test->vm); if (test->started) @@ -1138,6 +1143,7 @@ qemuMonitorCommonTestInit(qemuMonitorTestPtr test) "}" /* We skip the normal handshake reply of "{\"execute\":\"qmp_capabilities\"}" */ + qemuMonitorTestPtr qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt, virDomainObjPtr vm, @@ -1153,6 +1159,9 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt, if (!(test = qemuMonitorCommonTestNew(xmlopt, vm, &src))) goto error; + if (!(test->eventThread = virEventThreadNew("mon-test"))) + goto error; + test->qapischema = schema; if (!(test->mon = qemuMonitorOpen(test->vm, &src, @@ -1391,6 +1400,9 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt) if (!(test = qemuMonitorCommonTestNew(xmlopt, NULL, &src))) goto error; + if (!(test->eventThread = virEventThreadNew("agent-test"))) + goto error; + if (!(test->agent = qemuAgentOpen(test->vm, &src, &qemuMonitorTestAgentCallbacks))) -- 2.24.1

This converts the QEMU monitor APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs. A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_monitor.c | 145 ++++++++++++++++------------------- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 6 +- tests/qemumonitortestutils.c | 1 + 4 files changed, 71 insertions(+), 84 deletions(-) diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index e54d28b6cc..ba865cc95d 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -24,6 +24,7 @@ #include <poll.h> #include <unistd.h> #include <fcntl.h> +#include <gio/gio.h> #include "qemu_monitor.h" #include "qemu_monitor_text.h" @@ -72,12 +73,9 @@ struct _qemuMonitor { int fd; - /* Represents the watch number to be used for updating and - * unregistering the monitor @fd for events in the event loop: - * > 0: valid watch number - * = 0: not registered - * < 0: an error occurred during the registration of @fd */ - int watch; + GMainContext *context; + GSocket *socket; + GSource *watch; virDomainObjPtr vm; @@ -227,6 +225,7 @@ qemuMonitorDispose(void *obj) (mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque); virObjectUnref(mon->vm); + g_main_context_unref(mon->context); virResetError(&mon->lastError); virCondDestroy(&mon->notify); VIR_FREE(mon->buffer); @@ -510,27 +509,16 @@ qemuMonitorIORead(qemuMonitorPtr mon) static void qemuMonitorUpdateWatch(qemuMonitorPtr mon) { - int events = - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR; - - if (!mon->watch) - return; - - if (mon->lastError.code == VIR_ERR_OK) { - events |= VIR_EVENT_HANDLE_READABLE; - - if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) && - !mon->waitGreeting) - events |= VIR_EVENT_HANDLE_WRITABLE; - } - - virEventUpdateHandle(mon->watch, events); + qemuMonitorUnregister(mon); + if (mon->socket) + qemuMonitorRegister(mon); } -static void -qemuMonitorIO(int watch, int fd, int events, void *opaque) +static gboolean +qemuMonitorIO(GSocket *socket G_GNUC_UNUSED, + GIOCondition cond, + gpointer opaque) { qemuMonitorPtr mon = opaque; bool error = false; @@ -542,39 +530,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) /* lock access to the monitor and protect fd */ virObjectLock(mon); #if DEBUG_IO - VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events); + VIR_DEBUG("Monitor %p I/O on socket %p cond %d", mon, socket, cond); #endif - if (mon->fd == -1 || mon->watch == 0) { + if (mon->fd == -1 || !mon->watch) { virObjectUnlock(mon); virObjectUnref(mon); - return; + return G_SOURCE_REMOVE; } - if (mon->fd != fd || mon->watch != watch) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) - eof = true; - virReportError(VIR_ERR_INTERNAL_ERROR, - _("event from unexpected fd %d!=%d / watch %d!=%d"), - mon->fd, fd, mon->watch, watch); - error = true; - } else if (mon->lastError.code != VIR_ERR_OK) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) + if (mon->lastError.code != VIR_ERR_OK) { + if (cond & (G_IO_HUP | G_IO_ERR)) eof = true; error = true; } else { - if (events & VIR_EVENT_HANDLE_WRITABLE) { + if (cond & G_IO_OUT) { if (qemuMonitorIOWrite(mon) < 0) { error = true; if (errno == ECONNRESET) hangup = true; } - events &= ~VIR_EVENT_HANDLE_WRITABLE; } - if (!error && - events & VIR_EVENT_HANDLE_READABLE) { + if (!error && cond & G_IO_IN) { int got = qemuMonitorIORead(mon); - events &= ~VIR_EVENT_HANDLE_READABLE; if (got < 0) { error = true; if (errno == ECONNRESET) @@ -582,37 +560,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) } else if (got == 0) { eof = true; } else { - /* Ignore hangup/error events if we read some data, to + /* Ignore hangup/error cond if we read some data, to * give time for that data to be consumed */ - events = 0; + cond = 0; if (qemuMonitorIOProcess(mon) < 0) error = true; } } - if (events & VIR_EVENT_HANDLE_HANGUP) { + if (cond & G_IO_HUP) { hangup = true; if (!error) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("End of file from qemu monitor")); eof = true; - events &= ~VIR_EVENT_HANDLE_HANGUP; } } if (!error && !eof && - events & VIR_EVENT_HANDLE_ERROR) { + cond & G_IO_ERR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Invalid file descriptor while waiting for monitor")); eof = true; - events &= ~VIR_EVENT_HANDLE_ERROR; - } - if (!error && events) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unhandled event %d for monitor fd %d"), - events, mon->fd); - error = true; } } @@ -680,16 +650,20 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque) virObjectUnlock(mon); virObjectUnref(mon); } + + return G_SOURCE_REMOVE; } static qemuMonitorPtr qemuMonitorOpenInternal(virDomainObjPtr vm, int fd, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) { qemuMonitorPtr mon; + g_autoptr(GError) gerr = NULL; if (!cb->eofNotify) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -714,6 +688,7 @@ qemuMonitorOpenInternal(virDomainObjPtr vm, goto cleanup; } mon->fd = fd; + mon->context = g_main_context_ref(context); mon->vm = virObjectRef(vm); mon->waitGreeting = true; mon->cb = cb; @@ -724,20 +699,17 @@ qemuMonitorOpenInternal(virDomainObjPtr vm, "%s", _("Unable to set monitor close-on-exec flag")); goto cleanup; } - if (virSetNonBlock(mon->fd) < 0) { + + mon->socket = g_socket_new_from_fd(fd, &gerr); + if (!mon->socket) { virReportError(VIR_ERR_INTERNAL_ERROR, - "%s", _("Unable to put monitor into non-blocking mode")); + _("Unable to create socket object: %s"), + gerr->message); goto cleanup; } - virObjectLock(mon); - if (!qemuMonitorRegister(mon)) { - virObjectUnlock(mon); - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register monitor events")); - goto cleanup; - } + qemuMonitorRegister(mon); PROBE(QEMU_MONITOR_NEW, "mon=%p refs=%d fd=%d", @@ -783,6 +755,7 @@ qemuMonitorOpen(virDomainObjPtr vm, virDomainChrSourceDefPtr config, bool retry, unsigned long long timeout, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) { @@ -816,7 +789,7 @@ qemuMonitorOpen(virDomainObjPtr vm, goto cleanup; } - ret = qemuMonitorOpenInternal(vm, fd, cb, opaque); + ret = qemuMonitorOpenInternal(vm, fd, context, cb, opaque); cleanup: if (!ret) VIR_FORCE_CLOSE(fd); @@ -831,25 +804,32 @@ qemuMonitorOpen(virDomainObjPtr vm, * * Registers the monitor in the event loop. The caller has to hold the * lock for @mon. - * - * Returns true in case of success, false otherwise */ -bool +void qemuMonitorRegister(qemuMonitorPtr mon) { - virObjectRef(mon); - if ((mon->watch = virEventAddHandle(mon->fd, - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_READABLE, - qemuMonitorIO, - mon, - virObjectFreeCallback)) < 0) { - virObjectUnref(mon); - return false; + GIOCondition cond = 0; + + if (mon->lastError.code == VIR_ERR_OK) { + cond |= G_IO_IN; + + if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) && + !mon->waitGreeting) + cond |= G_IO_OUT; } - return true; + mon->watch = g_socket_create_source(mon->socket, + cond, + NULL); + + virObjectRef(mon); + g_source_set_callback(mon->watch, + (GSourceFunc)qemuMonitorIO, + mon, + NULL); + + g_source_attach(mon->watch, + mon->context); } @@ -857,8 +837,9 @@ void qemuMonitorUnregister(qemuMonitorPtr mon) { if (mon->watch) { - virEventRemoveHandle(mon->watch); - mon->watch = 0; + g_source_destroy(mon->watch); + g_source_unref(mon->watch); + mon->watch = NULL; } } @@ -874,9 +855,11 @@ qemuMonitorClose(qemuMonitorPtr mon) qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL); - if (mon->fd >= 0) { + if (mon->socket) { qemuMonitorUnregister(mon); - VIR_FORCE_CLOSE(mon->fd); + g_object_unref(mon->socket); + mon->socket = NULL; + mon->fd = -1; } /* In case another thread is waiting for its monitor command to be diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h index 2319647a35..6ccea909e9 100644 --- a/src/qemu/qemu_monitor.h +++ b/src/qemu/qemu_monitor.h @@ -382,11 +382,12 @@ qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm, virDomainChrSourceDefPtr config, bool retry, unsigned long long timeout, + GMainContext *context, qemuMonitorCallbacksPtr cb, void *opaque) ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5); -bool qemuMonitorRegister(qemuMonitorPtr mon) +void qemuMonitorRegister(qemuMonitorPtr mon) ATTRIBUTE_NONNULL(1); void qemuMonitorUnregister(qemuMonitorPtr mon) ATTRIBUTE_NONNULL(1); diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index 36111c8882..ea046bcb14 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -1977,6 +1977,7 @@ qemuConnectMonitor(virQEMUDriverPtr driver, virDomainObjPtr vm, int asyncJob, priv->monConfig, retry, timeout, + virEventThreadGetContext(priv->eventThread), &monitorCallbacks, driver); @@ -8597,8 +8598,9 @@ qemuProcessQMPConnectMonitor(qemuProcessQMPPtr proc) proc->vm->pid = proc->pid; - if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, - 0, &callbacks, NULL))) + if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, 0, + virEventThreadGetContext(proc->eventThread), + &callbacks, NULL))) goto cleanup; virObjectLock(proc->mon); diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c index a30851aa63..df93aae758 100644 --- a/tests/qemumonitortestutils.c +++ b/tests/qemumonitortestutils.c @@ -1167,6 +1167,7 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt, &src, true, 0, + virEventThreadGetContext(test->eventThread), &qemuMonitorTestCallbacks, driver))) goto error; -- 2.24.1

On 5. 3. 2020 13:51, Daniel P. Berrangé wrote:
This converts the QEMU monitor APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs.
A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O.
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_monitor.c | 145 ++++++++++++++++------------------- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 6 +- tests/qemumonitortestutils.c | 1 + 4 files changed, 71 insertions(+), 84 deletions(-)
@@ -831,25 +804,32 @@ qemuMonitorOpen(virDomainObjPtr vm, * * Registers the monitor in the event loop. The caller has to hold the * lock for @mon. - * - * Returns true in case of success, false otherwise */ -bool +void qemuMonitorRegister(qemuMonitorPtr mon) { - virObjectRef(mon); - if ((mon->watch = virEventAddHandle(mon->fd, - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_READABLE, - qemuMonitorIO, - mon, - virObjectFreeCallback)) < 0) { - virObjectUnref(mon); - return false; + GIOCondition cond = 0; + + if (mon->lastError.code == VIR_ERR_OK) { + cond |= G_IO_IN; + + if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) && + !mon->waitGreeting) + cond |= G_IO_OUT; }
- return true; + mon->watch = g_socket_create_source(mon->socket, + cond, + NULL); + + virObjectRef(mon); + g_source_set_callback(mon->watch, + (GSourceFunc)qemuMonitorIO, + mon, + NULL);
So previously, we passed virObjectFreeCallback, so that @mon is unrefed when removing the watch. But now you're passing NULL which will lead to a memleak. Replace it with "(GDestroyNotify) virObjectUnref". Michal

We are dealing with the QEMU agent, not the monitor. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_agent.c | 498 +++++++++++++++++++++--------------------- 1 file changed, 249 insertions(+), 249 deletions(-) diff --git a/src/qemu/qemu_agent.c b/src/qemu/qemu_agent.c index 9f3fb9732f..72ea159a9c 100644 --- a/src/qemu/qemu_agent.c +++ b/src/qemu/qemu_agent.c @@ -65,7 +65,7 @@ VIR_LOG_INIT("qemu.qemu_agent"); * static struct { const char *type; - void (*handler)(qemuAgentPtr mon, virJSONValuePtr data); + void (*handler)(qemuAgentPtr agent, virJSONValuePtr data); } eventHandlers[] = { }; */ @@ -78,13 +78,13 @@ struct _qemuAgentMessage { int txOffset; int txLength; - /* Used by the JSON monitor to hold reply / error */ + /* Used by the JSON agent to hold reply / error */ char *rxBuffer; int rxLength; void *rxObject; /* True if rxBuffer / rxObject are ready, or a - * fatal error occurred on the monitor channel + * fatal error occurred on the agent channel */ bool finished; /* true for sync command */ @@ -113,18 +113,18 @@ struct _qemuAgent { * non-NULL */ qemuAgentMessagePtr msg; - /* Buffer incoming data ready for Agent monitor + /* Buffer incoming data ready for agent * code to process & find message boundaries */ size_t bufferOffset; size_t bufferLength; char *buffer; /* If anything went wrong, this will be fed back - * the next monitor msg */ + * the next agent msg */ virError lastError; /* Some guest agent commands don't return anything - * but fire up an event on qemu monitor instead. + * but fire up an event on qemu agent instead. * Take that as indication of successful completion */ qemuAgentEvent await_event; int timeout; @@ -166,71 +166,71 @@ qemuAgentEscapeNonPrintable(const char *text) static void qemuAgentDispose(void *obj) { - qemuAgentPtr mon = obj; - VIR_DEBUG("mon=%p", mon); - if (mon->cb && mon->cb->destroy) - (mon->cb->destroy)(mon, mon->vm); - virCondDestroy(&mon->notify); - VIR_FREE(mon->buffer); - virResetError(&mon->lastError); + qemuAgentPtr agent = obj; + VIR_DEBUG("agent=%p", agent); + if (agent->cb && agent->cb->destroy) + (agent->cb->destroy)(agent, agent->vm); + virCondDestroy(&agent->notify); + VIR_FREE(agent->buffer); + virResetError(&agent->lastError); } static int -qemuAgentOpenUnix(const char *monitor) +qemuAgentOpenUnix(const char *socketpath) { struct sockaddr_un addr; - int monfd; + int agentfd; int ret = -1; - if ((monfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + if ((agentfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { virReportSystemError(errno, "%s", _("failed to create socket")); return -1; } - if (virSetNonBlock(monfd) < 0) { + if (virSetNonBlock(agentfd) < 0) { virReportSystemError(errno, "%s", _("Unable to put monitor " "into non-blocking mode")); goto error; } - if (virSetCloseExec(monfd) < 0) { + if (virSetCloseExec(agentfd) < 0) { virReportSystemError(errno, "%s", - _("Unable to set monitor " + _("Unable to set agent " "close-on-exec flag")); goto error; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; - if (virStrcpyStatic(addr.sun_path, monitor) < 0) { + if (virStrcpyStatic(addr.sun_path, socketpath) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, - _("Agent path %s too big for destination"), monitor); + _("Socket path %s too big for destination"), socketpath); goto error; } - ret = connect(monfd, (struct sockaddr *)&addr, sizeof(addr)); + ret = connect(agentfd, (struct sockaddr *)&addr, sizeof(addr)); if (ret < 0) { virReportSystemError(errno, "%s", - _("failed to connect to monitor socket")); + _("failed to connect to agent socket")); goto error; } - return monfd; + return agentfd; error: - VIR_FORCE_CLOSE(monfd); + VIR_FORCE_CLOSE(agentfd); return -1; } static int -qemuAgentIOProcessEvent(qemuAgentPtr mon, +qemuAgentIOProcessEvent(qemuAgentPtr agent, virJSONValuePtr obj) { const char *type; - VIR_DEBUG("mon=%p obj=%p", mon, obj); + VIR_DEBUG("agent=%p obj=%p", agent, obj); type = virJSONValueObjectGetString(obj, "event"); if (!type) { @@ -245,7 +245,7 @@ qemuAgentIOProcessEvent(qemuAgentPtr mon, virJSONValuePtr data = virJSONValueObjectGet(obj, "data"); VIR_DEBUG("handle %s handler=%p data=%p", type, eventHandlers[i].handler, data); - (eventHandlers[i].handler)(mon, data); + (eventHandlers[i].handler)(agent, data); break; } } @@ -254,7 +254,7 @@ qemuAgentIOProcessEvent(qemuAgentPtr mon, } static int -qemuAgentIOProcessLine(qemuAgentPtr mon, +qemuAgentIOProcessLine(qemuAgentPtr agent, const char *line, qemuAgentMessagePtr msg) { @@ -283,7 +283,7 @@ qemuAgentIOProcessLine(qemuAgentPtr mon, if (virJSONValueObjectHasKey(obj, "QMP") == 1) { ret = 0; } else if (virJSONValueObjectHasKey(obj, "event") == 1) { - ret = qemuAgentIOProcessEvent(mon, obj); + ret = qemuAgentIOProcessEvent(agent, obj); } else if (virJSONValueObjectHasKey(obj, "error") == 1 || virJSONValueObjectHasKey(obj, "return") == 1) { if (msg) { @@ -323,7 +323,7 @@ qemuAgentIOProcessLine(qemuAgentPtr mon, return ret; } -static int qemuAgentIOProcessData(qemuAgentPtr mon, +static int qemuAgentIOProcessData(qemuAgentPtr agent, char *data, size_t len, qemuAgentMessagePtr msg) @@ -347,7 +347,7 @@ static int qemuAgentIOProcessData(qemuAgentPtr mon, int got = nl - (data + used); for (i = 0; i < strlen(LINE_ENDING); i++) data[used + got + i] = '\0'; - if (qemuAgentIOProcessLine(mon, data + used, msg) < 0) + if (qemuAgentIOProcessLine(agent, data + used, msg) < 0) return -1; used += got + strlen(LINE_ENDING); } else { @@ -360,11 +360,11 @@ static int qemuAgentIOProcessData(qemuAgentPtr mon, } /* This method processes data that has been received - * from the monitor. Looking for async events and + * from the agent. Looking for async events and * replies/errors. */ static int -qemuAgentIOProcess(qemuAgentPtr mon) +qemuAgentIOProcess(qemuAgentPtr agent) { int len; qemuAgentMessagePtr msg = NULL; @@ -372,97 +372,97 @@ qemuAgentIOProcess(qemuAgentPtr mon) /* See if there's a message ready for reply; that is, * one that has completed writing all its data. */ - if (mon->msg && mon->msg->txOffset == mon->msg->txLength) - msg = mon->msg; + if (agent->msg && agent->msg->txOffset == agent->msg->txLength) + msg = agent->msg; #if DEBUG_IO # if DEBUG_RAW_IO char *str1 = qemuAgentEscapeNonPrintable(msg ? msg->txBuffer : ""); - char *str2 = qemuAgentEscapeNonPrintable(mon->buffer); + char *str2 = qemuAgentEscapeNonPrintable(agent->buffer); VIR_ERROR(_("Process %zu %p %p [[[%s]]][[[%s]]]"), - mon->bufferOffset, mon->msg, msg, str1, str2); + agent->bufferOffset, agent->msg, msg, str1, str2); VIR_FREE(str1); VIR_FREE(str2); # else - VIR_DEBUG("Process %zu", mon->bufferOffset); + VIR_DEBUG("Process %zu", agent->bufferOffset); # endif #endif - len = qemuAgentIOProcessData(mon, - mon->buffer, mon->bufferOffset, + len = qemuAgentIOProcessData(agent, + agent->buffer, agent->bufferOffset, msg); if (len < 0) return -1; - if (len < mon->bufferOffset) { - memmove(mon->buffer, mon->buffer + len, mon->bufferOffset - len); - mon->bufferOffset -= len; + if (len < agent->bufferOffset) { + memmove(agent->buffer, agent->buffer + len, agent->bufferOffset - len); + agent->bufferOffset -= len; } else { - VIR_FREE(mon->buffer); - mon->bufferOffset = mon->bufferLength = 0; + VIR_FREE(agent->buffer); + agent->bufferOffset = agent->bufferLength = 0; } #if DEBUG_IO - VIR_DEBUG("Process done %zu used %d", mon->bufferOffset, len); + VIR_DEBUG("Process done %zu used %d", agent->bufferOffset, len); #endif if (msg && msg->finished) - virCondBroadcast(&mon->notify); + virCondBroadcast(&agent->notify); return len; } /* - * Called when the monitor is able to write data - * Call this function while holding the monitor lock. + * Called when the agent is able to write data + * Call this function while holding the agent lock. */ static int -qemuAgentIOWrite(qemuAgentPtr mon) +qemuAgentIOWrite(qemuAgentPtr agent) { int done; /* If no active message, or fully transmitted, then no-op */ - if (!mon->msg || mon->msg->txOffset == mon->msg->txLength) + if (!agent->msg || agent->msg->txOffset == agent->msg->txLength) return 0; - done = safewrite(mon->fd, - mon->msg->txBuffer + mon->msg->txOffset, - mon->msg->txLength - mon->msg->txOffset); + done = safewrite(agent->fd, + agent->msg->txBuffer + agent->msg->txOffset, + agent->msg->txLength - agent->msg->txOffset); if (done < 0) { if (errno == EAGAIN) return 0; virReportSystemError(errno, "%s", - _("Unable to write to monitor")); + _("Unable to write to agent")); return -1; } - mon->msg->txOffset += done; + agent->msg->txOffset += done; return done; } /* - * Called when the monitor has incoming data to read - * Call this function while holding the monitor lock. + * Called when the agent has incoming data to read + * Call this function while holding the agent lock. * * Returns -1 on error, or number of bytes read */ static int -qemuAgentIORead(qemuAgentPtr mon) +qemuAgentIORead(qemuAgentPtr agent) { - size_t avail = mon->bufferLength - mon->bufferOffset; + size_t avail = agent->bufferLength - agent->bufferOffset; int ret = 0; if (avail < 1024) { - if (mon->bufferLength >= QEMU_AGENT_MAX_RESPONSE) { + if (agent->bufferLength >= QEMU_AGENT_MAX_RESPONSE) { virReportSystemError(ERANGE, _("No complete agent response found in %d bytes"), QEMU_AGENT_MAX_RESPONSE); return -1; } - if (VIR_REALLOC_N(mon->buffer, - mon->bufferLength + 1024) < 0) + if (VIR_REALLOC_N(agent->buffer, + agent->bufferLength + 1024) < 0) return -1; - mon->bufferLength += 1024; + agent->bufferLength += 1024; avail += 1024; } @@ -470,14 +470,14 @@ qemuAgentIORead(qemuAgentPtr mon) until we block on EAGAIN, or hit EOF */ while (avail > 1) { int got; - got = read(mon->fd, - mon->buffer + mon->bufferOffset, + got = read(agent->fd, + agent->buffer + agent->bufferOffset, avail - 1); if (got < 0) { if (errno == EAGAIN) break; virReportSystemError(errno, "%s", - _("Unable to read from monitor")); + _("Unable to read from agent")); ret = -1; break; } @@ -486,79 +486,79 @@ qemuAgentIORead(qemuAgentPtr mon) ret += got; avail -= got; - mon->bufferOffset += got; - mon->buffer[mon->bufferOffset] = '\0'; + agent->bufferOffset += got; + agent->buffer[agent->bufferOffset] = '\0'; } #if DEBUG_IO - VIR_DEBUG("Now read %zu bytes of data", mon->bufferOffset); + VIR_DEBUG("Now read %zu bytes of data", agent->bufferOffset); #endif return ret; } -static void qemuAgentUpdateWatch(qemuAgentPtr mon) +static void qemuAgentUpdateWatch(qemuAgentPtr agent) { int events = VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR; - if (!mon->watch) + if (!agent->watch) return; - if (mon->lastError.code == VIR_ERR_OK) { + if (agent->lastError.code == VIR_ERR_OK) { events |= VIR_EVENT_HANDLE_READABLE; - if (mon->msg && mon->msg->txOffset < mon->msg->txLength) + if (agent->msg && agent->msg->txOffset < agent->msg->txLength) events |= VIR_EVENT_HANDLE_WRITABLE; } - virEventUpdateHandle(mon->watch, events); + virEventUpdateHandle(agent->watch, events); } static void qemuAgentIO(int watch, int fd, int events, void *opaque) { - qemuAgentPtr mon = opaque; + qemuAgentPtr agent = opaque; bool error = false; bool eof = false; - virObjectRef(mon); - /* lock access to the monitor and protect fd */ - virObjectLock(mon); + virObjectRef(agent); + /* lock access to the agent and protect fd */ + virObjectLock(agent); #if DEBUG_IO - VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", mon, watch, fd, events); + VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events); #endif - if (mon->fd == -1 || mon->watch == 0) { - virObjectUnlock(mon); - virObjectUnref(mon); + if (agent->fd == -1 || agent->watch == 0) { + virObjectUnlock(agent); + virObjectUnref(agent); return; } - if (mon->fd != fd || mon->watch != watch) { + if (agent->fd != fd || agent->watch != watch) { if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) eof = true; virReportError(VIR_ERR_INTERNAL_ERROR, _("event from unexpected fd %d!=%d / watch %d!=%d"), - mon->fd, fd, mon->watch, watch); + agent->fd, fd, agent->watch, watch); error = true; - } else if (mon->lastError.code != VIR_ERR_OK) { + } else if (agent->lastError.code != VIR_ERR_OK) { if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) eof = true; error = true; } else { if (events & VIR_EVENT_HANDLE_WRITABLE) { - if (qemuAgentIOWrite(mon) < 0) + if (qemuAgentIOWrite(agent) < 0) error = true; events &= ~VIR_EVENT_HANDLE_WRITABLE; } if (!error && events & VIR_EVENT_HANDLE_READABLE) { - int got = qemuAgentIORead(mon); + int got = qemuAgentIORead(agent); events &= ~VIR_EVENT_HANDLE_READABLE; if (got < 0) { error = true; @@ -569,7 +569,7 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) * give time for that data to be consumed */ events = 0; - if (qemuAgentIOProcess(mon) < 0) + if (qemuAgentIOProcess(agent) < 0) error = true; } } @@ -577,7 +577,7 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) if (!error && events & VIR_EVENT_HANDLE_HANGUP) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("End of file from agent monitor")); + _("End of file from agent socket")); eof = true; events &= ~VIR_EVENT_HANDLE_HANGUP; } @@ -585,69 +585,69 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) if (!error && !eof && events & VIR_EVENT_HANDLE_ERROR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("Invalid file descriptor while waiting for monitor")); + _("Invalid file descriptor while waiting for agent")); eof = true; events &= ~VIR_EVENT_HANDLE_ERROR; } if (!error && events) { virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unhandled event %d for monitor fd %d"), - events, mon->fd); + _("Unhandled event %d for agent fd %d"), + events, agent->fd); error = true; } } if (error || eof) { - if (mon->lastError.code != VIR_ERR_OK) { + if (agent->lastError.code != VIR_ERR_OK) { /* Already have an error, so clear any new error */ virResetLastError(); } else { if (virGetLastErrorCode() == VIR_ERR_OK) virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("Error while processing monitor IO")); - virCopyLastError(&mon->lastError); + _("Error while processing agent IO")); + virCopyLastError(&agent->lastError); virResetLastError(); } - VIR_DEBUG("Error on monitor %s", NULLSTR(mon->lastError.message)); + VIR_DEBUG("Error on agent %s", NULLSTR(agent->lastError.message)); /* If IO process resulted in an error & we have a message, * then wakeup that waiter */ - if (mon->msg && !mon->msg->finished) { - mon->msg->finished = 1; - virCondSignal(&mon->notify); + if (agent->msg && !agent->msg->finished) { + agent->msg->finished = 1; + virCondSignal(&agent->notify); } } - qemuAgentUpdateWatch(mon); + qemuAgentUpdateWatch(agent); /* We have to unlock to avoid deadlock against command thread, * but is this safe ? I think it is, because the callback * will try to acquire the virDomainObjPtr mutex next */ if (eof) { void (*eofNotify)(qemuAgentPtr, virDomainObjPtr) - = mon->cb->eofNotify; - virDomainObjPtr vm = mon->vm; + = agent->cb->eofNotify; + virDomainObjPtr vm = agent->vm; /* Make sure anyone waiting wakes up now */ - virCondSignal(&mon->notify); - virObjectUnlock(mon); - virObjectUnref(mon); + virCondSignal(&agent->notify); + virObjectUnlock(agent); + virObjectUnref(agent); VIR_DEBUG("Triggering EOF callback"); - (eofNotify)(mon, vm); + (eofNotify)(agent, vm); } else if (error) { void (*errorNotify)(qemuAgentPtr, virDomainObjPtr) - = mon->cb->errorNotify; - virDomainObjPtr vm = mon->vm; + = agent->cb->errorNotify; + virDomainObjPtr vm = agent->vm; /* Make sure anyone waiting wakes up now */ - virCondSignal(&mon->notify); - virObjectUnlock(mon); - virObjectUnref(mon); + virCondSignal(&agent->notify); + virObjectUnlock(agent); + virObjectUnref(agent); VIR_DEBUG("Triggering error callback"); - (errorNotify)(mon, vm); + (errorNotify)(agent, vm); } else { - virObjectUnlock(mon); - virObjectUnref(mon); + virObjectUnlock(agent); + virObjectUnref(agent); } } @@ -657,7 +657,7 @@ qemuAgentOpen(virDomainObjPtr vm, const virDomainChrSourceDef *config, qemuAgentCallbacksPtr cb) { - qemuAgentPtr mon; + qemuAgentPtr agent; if (!cb || !cb->eofNotify) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -668,49 +668,49 @@ qemuAgentOpen(virDomainObjPtr vm, if (qemuAgentInitialize() < 0) return NULL; - if (!(mon = virObjectLockableNew(qemuAgentClass))) + if (!(agent = virObjectLockableNew(qemuAgentClass))) return NULL; - mon->timeout = QEMU_DOMAIN_PRIVATE(vm)->agentTimeout; - mon->fd = -1; - if (virCondInit(&mon->notify) < 0) { + agent->timeout = QEMU_DOMAIN_PRIVATE(vm)->agentTimeout; + agent->fd = -1; + if (virCondInit(&agent->notify) < 0) { virReportSystemError(errno, "%s", - _("cannot initialize monitor condition")); - virObjectUnref(mon); + _("cannot initialize agent condition")); + virObjectUnref(agent); return NULL; } - mon->vm = vm; - mon->cb = cb; + agent->vm = vm; + agent->cb = cb; if (config->type != VIR_DOMAIN_CHR_TYPE_UNIX) { virReportError(VIR_ERR_INTERNAL_ERROR, - _("unable to handle monitor type: %s"), + _("unable to handle agent type: %s"), virDomainChrTypeToString(config->type)); goto cleanup; } - mon->fd = qemuAgentOpenUnix(config->data.nix.path); - if (mon->fd == -1) + agent->fd = qemuAgentOpenUnix(config->data.nix.path); + if (agent->fd == -1) goto cleanup; - virObjectRef(mon); - if ((mon->watch = virEventAddHandle(mon->fd, + virObjectRef(agent); + if ((agent->watch = virEventAddHandle(agent->fd, VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR | VIR_EVENT_HANDLE_READABLE, qemuAgentIO, - mon, + agent, virObjectFreeCallback)) < 0) { - virObjectUnref(mon); + virObjectUnref(agent); virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register monitor events")); + _("unable to register agent events")); goto cleanup; } - mon->running = true; - VIR_DEBUG("New mon %p fd =%d watch=%d", mon, mon->fd, mon->watch); + agent->running = true; + VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch); - return mon; + return agent; cleanup: /* We don't want the 'destroy' callback invoked during @@ -718,75 +718,75 @@ qemuAgentOpen(virDomainObjPtr vm, * give a double-unref on virDomainObjPtr in the caller, * so kill the callbacks now. */ - mon->cb = NULL; - qemuAgentClose(mon); + agent->cb = NULL; + qemuAgentClose(agent); return NULL; } static void -qemuAgentNotifyCloseLocked(qemuAgentPtr mon) +qemuAgentNotifyCloseLocked(qemuAgentPtr agent) { - if (mon) { - mon->running = false; + if (agent) { + agent->running = false; /* If there is somebody waiting for a message * wake him up. No message will arrive anyway. */ - if (mon->msg && !mon->msg->finished) { - mon->msg->finished = 1; - virCondSignal(&mon->notify); + if (agent->msg && !agent->msg->finished) { + agent->msg->finished = 1; + virCondSignal(&agent->notify); } } } void -qemuAgentNotifyClose(qemuAgentPtr mon) +qemuAgentNotifyClose(qemuAgentPtr agent) { - if (!mon) + if (!agent) return; - VIR_DEBUG("mon=%p", mon); + VIR_DEBUG("agent=%p", agent); - virObjectLock(mon); - qemuAgentNotifyCloseLocked(mon); - virObjectUnlock(mon); + virObjectLock(agent); + qemuAgentNotifyCloseLocked(agent); + virObjectUnlock(agent); } -void qemuAgentClose(qemuAgentPtr mon) +void qemuAgentClose(qemuAgentPtr agent) { - if (!mon) + if (!agent) return; - VIR_DEBUG("mon=%p", mon); + VIR_DEBUG("agent=%p", agent); - virObjectLock(mon); + virObjectLock(agent); - if (mon->fd >= 0) { - if (mon->watch) { - virEventRemoveHandle(mon->watch); - mon->watch = 0; + if (agent->fd >= 0) { + if (agent->watch) { + virEventRemoveHandle(agent->watch); + agent->watch = 0; } - VIR_FORCE_CLOSE(mon->fd); + VIR_FORCE_CLOSE(agent->fd); } - qemuAgentNotifyCloseLocked(mon); - virObjectUnlock(mon); + qemuAgentNotifyCloseLocked(agent); + virObjectUnlock(agent); - virObjectUnref(mon); + virObjectUnref(agent); } #define QEMU_AGENT_WAIT_TIME 5 /** * qemuAgentSend: - * @mon: Monitor + * @agent: agent object * @msg: Message * @seconds: number of seconds to wait for the result, it can be either * -2, -1, 0 or positive. * - * Send @msg to agent @mon. If @seconds is equal to + * Send @msg to agent @agent. If @seconds is equal to * VIR_DOMAIN_QEMU_AGENT_COMMAND_BLOCK(-2), this function will block forever * waiting for the result. The value of * VIR_DOMAIN_QEMU_AGENT_COMMAND_DEFAULT(-1) means use default timeout value @@ -798,7 +798,7 @@ void qemuAgentClose(qemuAgentPtr mon) * -2 on timeout, * -1 otherwise */ -static int qemuAgentSend(qemuAgentPtr mon, +static int qemuAgentSend(qemuAgentPtr agent, qemuAgentMessagePtr msg, int seconds) { @@ -806,10 +806,10 @@ static int qemuAgentSend(qemuAgentPtr mon, unsigned long long then = 0; /* Check whether qemu quit unexpectedly */ - if (mon->lastError.code != VIR_ERR_OK) { + if (agent->lastError.code != VIR_ERR_OK) { VIR_DEBUG("Attempt to send command while error is set %s", - NULLSTR(mon->lastError.message)); - virSetError(&mon->lastError); + NULLSTR(agent->lastError.message)); + virSetError(&agent->lastError); return -1; } @@ -822,37 +822,37 @@ static int qemuAgentSend(qemuAgentPtr mon, then = now + seconds * 1000ull; } - mon->msg = msg; - qemuAgentUpdateWatch(mon); + agent->msg = msg; + qemuAgentUpdateWatch(agent); - while (!mon->msg->finished) { - if ((then && virCondWaitUntil(&mon->notify, &mon->parent.lock, then) < 0) || - (!then && virCondWait(&mon->notify, &mon->parent.lock) < 0)) { + while (!agent->msg->finished) { + if ((then && virCondWaitUntil(&agent->notify, &agent->parent.lock, then) < 0) || + (!then && virCondWait(&agent->notify, &agent->parent.lock) < 0)) { if (errno == ETIMEDOUT) { virReportError(VIR_ERR_AGENT_UNRESPONSIVE, "%s", _("Guest agent not available for now")); ret = -2; } else { virReportSystemError(errno, "%s", - _("Unable to wait on agent monitor " + _("Unable to wait on agent socket " "condition")); } goto cleanup; } } - if (mon->lastError.code != VIR_ERR_OK) { + if (agent->lastError.code != VIR_ERR_OK) { VIR_DEBUG("Send command resulted in error %s", - NULLSTR(mon->lastError.message)); - virSetError(&mon->lastError); + NULLSTR(agent->lastError.message)); + virSetError(&agent->lastError); goto cleanup; } ret = 0; cleanup: - mon->msg = NULL; - qemuAgentUpdateWatch(mon); + agent->msg = NULL; + qemuAgentUpdateWatch(agent); return ret; } @@ -860,7 +860,7 @@ static int qemuAgentSend(qemuAgentPtr mon, /** * qemuAgentGuestSync: - * @mon: Monitor + * @agent: agent object * * Send guest-sync with unique ID * and wait for reply. If we get one, check if @@ -870,7 +870,7 @@ static int qemuAgentSend(qemuAgentPtr mon, * -1 otherwise */ static int -qemuAgentGuestSync(qemuAgentPtr mon) +qemuAgentGuestSync(qemuAgentPtr agent) { int ret = -1; int send_ret; @@ -880,8 +880,8 @@ qemuAgentGuestSync(qemuAgentPtr mon) /* if user specified a custom agent timeout that is lower than the * default timeout, use the shorter timeout instead */ - if ((mon->timeout >= 0) && (mon->timeout < timeout)) - timeout = mon->timeout; + if ((agent->timeout >= 0) && (agent->timeout < timeout)) + timeout = agent->timeout; memset(&sync_msg, 0, sizeof(sync_msg)); /* set only on first sync */ @@ -900,7 +900,7 @@ qemuAgentGuestSync(qemuAgentPtr mon) VIR_DEBUG("Sending guest-sync command with ID: %llu", id); - send_ret = qemuAgentSend(mon, &sync_msg, timeout); + send_ret = qemuAgentSend(agent, &sync_msg, timeout); VIR_DEBUG("qemuAgentSend returned: %d", send_ret); @@ -913,9 +913,9 @@ qemuAgentGuestSync(qemuAgentPtr mon) memset(&sync_msg, 0, sizeof(sync_msg)); goto retry; } else { - if (mon->running) + if (agent->running) virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("Missing monitor reply object")); + _("Missing agent reply object")); else virReportError(VIR_ERR_AGENT_UNRESPONSIVE, "%s", _("Guest agent disappeared while executing command")); @@ -1066,7 +1066,7 @@ qemuAgentCheckError(virJSONValuePtr cmd, } static int -qemuAgentCommand(qemuAgentPtr mon, +qemuAgentCommand(qemuAgentPtr agent, virJSONValuePtr cmd, virJSONValuePtr *reply, bool needReply, @@ -1075,17 +1075,17 @@ qemuAgentCommand(qemuAgentPtr mon, int ret = -1; qemuAgentMessage msg; char *cmdstr = NULL; - int await_event = mon->await_event; + int await_event = agent->await_event; *reply = NULL; - if (!mon->running) { + if (!agent->running) { virReportError(VIR_ERR_AGENT_UNRESPONSIVE, "%s", _("Guest agent disappeared while executing command")); return -1; } - if (qemuAgentGuestSync(mon) < 0) + if (qemuAgentGuestSync(agent) < 0) return -1; memset(&msg, 0, sizeof(msg)); @@ -1097,7 +1097,7 @@ qemuAgentCommand(qemuAgentPtr mon, VIR_DEBUG("Send command '%s' for write, seconds = %d", cmdstr, seconds); - ret = qemuAgentSend(mon, &msg, seconds); + ret = qemuAgentSend(agent, &msg, seconds); VIR_DEBUG("Receive command reply ret=%d rxObject=%p", ret, msg.rxObject); @@ -1109,9 +1109,9 @@ qemuAgentCommand(qemuAgentPtr mon, if (await_event && !needReply) { VIR_DEBUG("Woken up by event %d", await_event); } else { - if (mon->running) + if (agent->running) virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("Missing monitor reply object")); + _("Missing agent reply object")); else virReportError(VIR_ERR_AGENT_UNRESPONSIVE, "%s", _("Guest agent disappeared while executing command")); @@ -1184,22 +1184,22 @@ qemuAgentMakeStringsArray(const char **strings, unsigned int len) return NULL; } -void qemuAgentNotifyEvent(qemuAgentPtr mon, +void qemuAgentNotifyEvent(qemuAgentPtr agent, qemuAgentEvent event) { - virObjectLock(mon); + virObjectLock(agent); - VIR_DEBUG("mon=%p event=%d await_event=%d", mon, event, mon->await_event); - if (mon->await_event == event) { - mon->await_event = QEMU_AGENT_EVENT_NONE; + VIR_DEBUG("agent=%p event=%d await_event=%d", agent, event, agent->await_event); + if (agent->await_event == event) { + agent->await_event = QEMU_AGENT_EVENT_NONE; /* somebody waiting for this event, wake him up. */ - if (mon->msg && !mon->msg->finished) { - mon->msg->finished = 1; - virCondSignal(&mon->notify); + if (agent->msg && !agent->msg->finished) { + agent->msg->finished = 1; + virCondSignal(&agent->notify); } } - virObjectUnlock(mon); + virObjectUnlock(agent); } VIR_ENUM_DECL(qemuAgentShutdownMode); @@ -1209,7 +1209,7 @@ VIR_ENUM_IMPL(qemuAgentShutdownMode, "powerdown", "reboot", "halt", ); -int qemuAgentShutdown(qemuAgentPtr mon, +int qemuAgentShutdown(qemuAgentPtr agent, qemuAgentShutdownMode mode) { int ret = -1; @@ -1223,10 +1223,10 @@ int qemuAgentShutdown(qemuAgentPtr mon, return -1; if (mode == QEMU_AGENT_SHUTDOWN_REBOOT) - mon->await_event = QEMU_AGENT_EVENT_RESET; + agent->await_event = QEMU_AGENT_EVENT_RESET; else - mon->await_event = QEMU_AGENT_EVENT_SHUTDOWN; - ret = qemuAgentCommand(mon, cmd, &reply, false, + agent->await_event = QEMU_AGENT_EVENT_SHUTDOWN; + ret = qemuAgentCommand(agent, cmd, &reply, false, VIR_DOMAIN_QEMU_AGENT_COMMAND_SHUTDOWN); virJSONValueFree(cmd); @@ -1236,7 +1236,7 @@ int qemuAgentShutdown(qemuAgentPtr mon, /* * qemuAgentFSFreeze: - * @mon: Agent + * @agent: agent object * @mountpoints: Array of mountpoint paths to be frozen, or NULL for all * @nmountpoints: Number of mountpoints to be frozen, or 0 for all * @@ -1248,7 +1248,7 @@ int qemuAgentShutdown(qemuAgentPtr mon, * Returns: number of file system frozen on success, * -1 on error. */ -int qemuAgentFSFreeze(qemuAgentPtr mon, const char **mountpoints, +int qemuAgentFSFreeze(qemuAgentPtr agent, const char **mountpoints, unsigned int nmountpoints) { int ret = -1; @@ -1269,7 +1269,7 @@ int qemuAgentFSFreeze(qemuAgentPtr mon, const char **mountpoints, if (!cmd) goto cleanup; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; if (virJSONValueObjectGetNumberInt(reply, "return", &ret) < 0) { @@ -1286,7 +1286,7 @@ int qemuAgentFSFreeze(qemuAgentPtr mon, const char **mountpoints, /* * qemuAgentFSThaw: - * @mon: Agent + * @agent: agent object * * Issue guest-fsfreeze-thaw command to guest agent, * which unfreezes all mounted file systems and returns @@ -1295,7 +1295,7 @@ int qemuAgentFSFreeze(qemuAgentPtr mon, const char **mountpoints, * Returns: number of file system thawed on success, * -1 on error. */ -int qemuAgentFSThaw(qemuAgentPtr mon) +int qemuAgentFSThaw(qemuAgentPtr agent) { int ret = -1; virJSONValuePtr cmd; @@ -1306,7 +1306,7 @@ int qemuAgentFSThaw(qemuAgentPtr mon) if (!cmd) return -1; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; if (virJSONValueObjectGetNumberInt(reply, "return", &ret) < 0) { @@ -1330,7 +1330,7 @@ VIR_ENUM_IMPL(qemuAgentSuspendMode, ); int -qemuAgentSuspend(qemuAgentPtr mon, +qemuAgentSuspend(qemuAgentPtr agent, unsigned int target) { int ret = -1; @@ -1342,8 +1342,8 @@ qemuAgentSuspend(qemuAgentPtr mon, if (!cmd) return -1; - mon->await_event = QEMU_AGENT_EVENT_SUSPEND; - ret = qemuAgentCommand(mon, cmd, &reply, false, mon->timeout); + agent->await_event = QEMU_AGENT_EVENT_SUSPEND; + ret = qemuAgentCommand(agent, cmd, &reply, false, agent->timeout); virJSONValueFree(cmd); virJSONValueFree(reply); @@ -1351,7 +1351,7 @@ qemuAgentSuspend(qemuAgentPtr mon, } int -qemuAgentArbitraryCommand(qemuAgentPtr mon, +qemuAgentArbitraryCommand(qemuAgentPtr agent, const char *cmd_str, char **result, int timeout) @@ -1372,7 +1372,7 @@ qemuAgentArbitraryCommand(qemuAgentPtr mon, if (!(cmd = virJSONValueFromString(cmd_str))) goto cleanup; - if ((ret = qemuAgentCommand(mon, cmd, &reply, true, timeout)) < 0) + if ((ret = qemuAgentCommand(agent, cmd, &reply, true, timeout)) < 0) goto cleanup; if (!(*result = virJSONValueToString(reply, false))) @@ -1386,7 +1386,7 @@ qemuAgentArbitraryCommand(qemuAgentPtr mon, } int -qemuAgentFSTrim(qemuAgentPtr mon, +qemuAgentFSTrim(qemuAgentPtr agent, unsigned long long minimum) { int ret = -1; @@ -1399,7 +1399,7 @@ qemuAgentFSTrim(qemuAgentPtr mon, if (!cmd) return ret; - ret = qemuAgentCommand(mon, cmd, &reply, false, mon->timeout); + ret = qemuAgentCommand(agent, cmd, &reply, false, agent->timeout); virJSONValueFree(cmd); virJSONValueFree(reply); @@ -1407,7 +1407,7 @@ qemuAgentFSTrim(qemuAgentPtr mon, } int -qemuAgentGetVCPUs(qemuAgentPtr mon, +qemuAgentGetVCPUs(qemuAgentPtr agent, qemuAgentCPUInfoPtr *info) { int ret = -1; @@ -1420,7 +1420,7 @@ qemuAgentGetVCPUs(qemuAgentPtr mon, if (!(cmd = qemuAgentMakeCommand("guest-get-vcpus", NULL))) return -1; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; if (!(data = virJSONValueObjectGetArray(reply, "return"))) { @@ -1482,7 +1482,7 @@ qemuAgentGetVCPUs(qemuAgentPtr mon, /* returns the value provided by the guest agent or -1 on internal error */ static int -qemuAgentSetVCPUsCommand(qemuAgentPtr mon, +qemuAgentSetVCPUsCommand(qemuAgentPtr agent, qemuAgentCPUInfoPtr info, size_t ninfo, int *nmodified) @@ -1533,7 +1533,7 @@ qemuAgentSetVCPUsCommand(qemuAgentPtr mon, NULL))) goto cleanup; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; /* All negative values are invalid. Return of 0 is bogus since we wouldn't @@ -1564,7 +1564,7 @@ qemuAgentSetVCPUsCommand(qemuAgentPtr mon, * Returns -1 on error, 0 on success. */ int -qemuAgentSetVCPUs(qemuAgentPtr mon, +qemuAgentSetVCPUs(qemuAgentPtr agent, qemuAgentCPUInfoPtr info, size_t ninfo) { @@ -1573,7 +1573,7 @@ qemuAgentSetVCPUs(qemuAgentPtr mon, size_t i; do { - if ((rv = qemuAgentSetVCPUsCommand(mon, info, ninfo, &nmodified)) < 0) + if ((rv = qemuAgentSetVCPUsCommand(agent, info, ninfo, &nmodified)) < 0) return -1; /* all vcpus were set successfully */ @@ -1673,7 +1673,7 @@ qemuAgentUpdateCPUInfo(unsigned int nvcpus, int -qemuAgentGetHostname(qemuAgentPtr mon, +qemuAgentGetHostname(qemuAgentPtr agent, char **hostname) { int ret = -1; @@ -1688,7 +1688,7 @@ qemuAgentGetHostname(qemuAgentPtr mon, if (!cmd) return ret; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) { + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) { if (qemuAgentErrorCommandUnsupported(reply)) ret = -2; goto cleanup; @@ -1718,7 +1718,7 @@ qemuAgentGetHostname(qemuAgentPtr mon, int -qemuAgentGetTime(qemuAgentPtr mon, +qemuAgentGetTime(qemuAgentPtr agent, long long *seconds, unsigned int *nseconds) { @@ -1732,7 +1732,7 @@ qemuAgentGetTime(qemuAgentPtr mon, if (!cmd) return ret; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; if (virJSONValueObjectGetNumberUlong(reply, "return", &json_time) < 0) { @@ -1760,7 +1760,7 @@ qemuAgentGetTime(qemuAgentPtr mon, * @sync: let guest agent to read domain's RTC (@setTime is ignored) */ int -qemuAgentSetTime(qemuAgentPtr mon, +qemuAgentSetTime(qemuAgentPtr agent, long long seconds, unsigned int nseconds, bool rtcSync) @@ -1777,7 +1777,7 @@ qemuAgentSetTime(qemuAgentPtr mon, long long json_time; /* Check if we overflow. For some reason qemu doesn't handle unsigned - * long long on the monitor well as it silently truncates numbers to + * long long on the agent well as it silently truncates numbers to * signed long long. Therefore we must check overflow against LLONG_MAX * not ULLONG_MAX. */ if (seconds > LLONG_MAX / 1000000000LL) { @@ -1797,7 +1797,7 @@ qemuAgentSetTime(qemuAgentPtr mon, if (!cmd) return ret; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; ret = 0; @@ -1919,7 +1919,7 @@ qemuAgentGetFSInfoFillDisks(virJSONValuePtr jsondisks, * -1 otherwise */ int -qemuAgentGetFSInfo(qemuAgentPtr mon, +qemuAgentGetFSInfo(qemuAgentPtr agent, qemuAgentFSInfoPtr **info) { size_t i; @@ -1934,7 +1934,7 @@ qemuAgentGetFSInfo(qemuAgentPtr mon, if (!cmd) return ret; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) { + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) { if (qemuAgentErrorCommandUnsupported(reply)) ret = -2; goto cleanup; @@ -2052,7 +2052,7 @@ qemuAgentGetFSInfo(qemuAgentPtr mon, /* * qemuAgentGetInterfaces: - * @mon: Agent monitor + * @agent: agent object * @ifaces: pointer to an array of pointers pointing to interface objects * * Issue guest-network-get-interfaces to guest agent, which returns a @@ -2062,7 +2062,7 @@ qemuAgentGetFSInfo(qemuAgentPtr mon, * Returns: number of interfaces on success, -1 on error. */ int -qemuAgentGetInterfaces(qemuAgentPtr mon, +qemuAgentGetInterfaces(qemuAgentPtr agent, virDomainInterfacePtr **ifaces) { int ret = -1; @@ -2085,7 +2085,7 @@ qemuAgentGetInterfaces(qemuAgentPtr mon, if (!(cmd = qemuAgentMakeCommand("guest-network-get-interfaces", NULL))) goto cleanup; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; if (!(ret_array = virJSONValueObjectGet(reply, "return"))) { @@ -2242,7 +2242,7 @@ qemuAgentGetInterfaces(qemuAgentPtr mon, int -qemuAgentSetUserPassword(qemuAgentPtr mon, +qemuAgentSetUserPassword(qemuAgentPtr agent, const char *user, const char *password, bool crypted) @@ -2262,7 +2262,7 @@ qemuAgentSetUserPassword(qemuAgentPtr mon, NULL))) goto cleanup; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) goto cleanup; ret = 0; @@ -2279,7 +2279,7 @@ qemuAgentSetUserPassword(qemuAgentPtr mon, * -1 otherwise */ int -qemuAgentGetUsers(qemuAgentPtr mon, +qemuAgentGetUsers(qemuAgentPtr agent, virTypedParameterPtr *params, int *nparams, int *maxparams) @@ -2293,7 +2293,7 @@ qemuAgentGetUsers(qemuAgentPtr mon, if (!(cmd = qemuAgentMakeCommand("guest-get-users", NULL))) return -1; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) { + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) { if (qemuAgentErrorCommandUnsupported(reply)) return -2; return -1; @@ -2370,7 +2370,7 @@ qemuAgentGetUsers(qemuAgentPtr mon, * -1 otherwise */ int -qemuAgentGetOSInfo(qemuAgentPtr mon, +qemuAgentGetOSInfo(qemuAgentPtr agent, virTypedParameterPtr *params, int *nparams, int *maxparams) @@ -2382,7 +2382,7 @@ qemuAgentGetOSInfo(qemuAgentPtr mon, if (!(cmd = qemuAgentMakeCommand("guest-get-osinfo", NULL))) return -1; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) { + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) { if (qemuAgentErrorCommandUnsupported(reply)) return -2; return -1; @@ -2423,7 +2423,7 @@ qemuAgentGetOSInfo(qemuAgentPtr mon, * -1 otherwise */ int -qemuAgentGetTimezone(qemuAgentPtr mon, +qemuAgentGetTimezone(qemuAgentPtr agent, virTypedParameterPtr *params, int *nparams, int *maxparams) @@ -2437,7 +2437,7 @@ qemuAgentGetTimezone(qemuAgentPtr mon, if (!(cmd = qemuAgentMakeCommand("guest-get-timezone", NULL))) return -1; - if (qemuAgentCommand(mon, cmd, &reply, true, mon->timeout) < 0) { + if (qemuAgentCommand(agent, cmd, &reply, true, agent->timeout) < 0) { if (qemuAgentErrorCommandUnsupported(reply)) return -2; return -1; @@ -2468,14 +2468,14 @@ qemuAgentGetTimezone(qemuAgentPtr mon, } /* qemuAgentSetResponseTimeout: - * mon: agent monitor - * timeout: number of seconds to wait for agent response + * @agent: agent object + * @timeout: number of seconds to wait for agent response * * The agent object must be locked prior to calling this function. */ void -qemuAgentSetResponseTimeout(qemuAgentPtr mon, +qemuAgentSetResponseTimeout(qemuAgentPtr agent, int timeout) { - mon->timeout = timeout; + agent->timeout = timeout; } -- 2.24.1

This converts the QEMU agent APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs. A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_agent.c | 146 +++++++++++++++++++---------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_process.c | 1 + tests/qemumonitortestutils.c | 1 + 4 files changed, 84 insertions(+), 65 deletions(-) diff --git a/src/qemu/qemu_agent.c b/src/qemu/qemu_agent.c index 72ea159a9c..1655e26212 100644 --- a/src/qemu/qemu_agent.c +++ b/src/qemu/qemu_agent.c @@ -25,6 +25,7 @@ #include <unistd.h> #include <fcntl.h> #include <sys/time.h> +#include <gio/gio.h> #include "qemu_agent.h" #include "qemu_domain.h" @@ -101,7 +102,10 @@ struct _qemuAgent { virCond notify; int fd; - int watch; + + GMainContext *context; + GSocket *socket; + GSource *watch; bool running; @@ -172,6 +176,7 @@ static void qemuAgentDispose(void *obj) (agent->cb->destroy)(agent, agent->vm); virCondDestroy(&agent->notify); VIR_FREE(agent->buffer); + g_main_context_unref(agent->context); virResetError(&agent->lastError); } @@ -188,13 +193,6 @@ qemuAgentOpenUnix(const char *socketpath) return -1; } - if (virSetNonBlock(agentfd) < 0) { - virReportSystemError(errno, "%s", - _("Unable to put monitor " - "into non-blocking mode")); - goto error; - } - if (virSetCloseExec(agentfd) < 0) { virReportSystemError(errno, "%s", _("Unable to set agent " @@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent) } -static void qemuAgentUpdateWatch(qemuAgentPtr agent) -{ - int events = - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR; +static gboolean +qemuAgentIO(GSocket *socket, + GIOCondition cond, + gpointer opaque); - if (!agent->watch) - return; + +static void +qemuAgentRegister(qemuAgentPtr agent) +{ + GIOCondition cond = 0; if (agent->lastError.code == VIR_ERR_OK) { - events |= VIR_EVENT_HANDLE_READABLE; + cond |= G_IO_IN; if (agent->msg && agent->msg->txOffset < agent->msg->txLength) - events |= VIR_EVENT_HANDLE_WRITABLE; + cond |= G_IO_OUT; } - virEventUpdateHandle(agent->watch, events); + agent->watch = g_socket_create_source(agent->socket, + cond, + NULL); + + virObjectRef(agent); + g_source_set_callback(agent->watch, + (GSourceFunc)qemuAgentIO, + agent, + NULL); + + g_source_attach(agent->watch, + agent->context); } static void -qemuAgentIO(int watch, int fd, int events, void *opaque) +qemuAgentUnregister(qemuAgentPtr agent) +{ + if (agent->watch) { + g_source_destroy(agent->watch); + g_source_unref(agent->watch); + agent->watch = NULL; + } +} + + +static void qemuAgentUpdateWatch(qemuAgentPtr agent) +{ + qemuAgentUnregister(agent); + if (agent->socket) + qemuAgentRegister(agent); +} + + +static gboolean +qemuAgentIO(GSocket *socket G_GNUC_UNUSED, + GIOCondition cond, + gpointer opaque) { qemuAgentPtr agent = opaque; bool error = false; @@ -529,45 +561,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) /* lock access to the agent and protect fd */ virObjectLock(agent); #if DEBUG_IO - VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events); + VIR_DEBUG("Agent %p I/O on watch %d socket %p cond %d", agent, agent->socket, cond); #endif - if (agent->fd == -1 || agent->watch == 0) { + if (agent->fd == -1 || !agent->watch) { virObjectUnlock(agent); virObjectUnref(agent); - return; + return G_SOURCE_REMOVE; } - if (agent->fd != fd || agent->watch != watch) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) - eof = true; - virReportError(VIR_ERR_INTERNAL_ERROR, - _("event from unexpected fd %d!=%d / watch %d!=%d"), - agent->fd, fd, agent->watch, watch); - error = true; - } else if (agent->lastError.code != VIR_ERR_OK) { - if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) + if (agent->lastError.code != VIR_ERR_OK) { + if (cond & (G_IO_HUP | G_IO_ERR)) eof = true; error = true; } else { - if (events & VIR_EVENT_HANDLE_WRITABLE) { + if (cond & G_IO_OUT) { if (qemuAgentIOWrite(agent) < 0) error = true; - events &= ~VIR_EVENT_HANDLE_WRITABLE; } if (!error && - events & VIR_EVENT_HANDLE_READABLE) { + cond & G_IO_IN) { int got = qemuAgentIORead(agent); - events &= ~VIR_EVENT_HANDLE_READABLE; if (got < 0) { error = true; } else if (got == 0) { eof = true; } else { - /* Ignore hangup/error events if we read some data, to + /* Ignore hangup/error cond if we read some data, to * give time for that data to be consumed */ - events = 0; + cond = 0; if (qemuAgentIOProcess(agent) < 0) error = true; @@ -575,25 +598,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) } if (!error && - events & VIR_EVENT_HANDLE_HANGUP) { + cond & G_IO_HUP) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("End of file from agent socket")); eof = true; - events &= ~VIR_EVENT_HANDLE_HANGUP; } if (!error && !eof && - events & VIR_EVENT_HANDLE_ERROR) { + cond & G_IO_ERR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Invalid file descriptor while waiting for agent")); eof = true; - events &= ~VIR_EVENT_HANDLE_ERROR; - } - if (!error && events) { - virReportError(VIR_ERR_INTERNAL_ERROR, - _("Unhandled event %d for agent fd %d"), - events, agent->fd); - error = true; } } @@ -649,15 +664,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque) virObjectUnlock(agent); virObjectUnref(agent); } + + return G_SOURCE_REMOVE; } qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm, const virDomainChrSourceDef *config, + GMainContext *context, qemuAgentCallbacksPtr cb) { qemuAgentPtr agent; + g_autoptr(GError) gerr = NULL; if (!cb || !cb->eofNotify) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -693,22 +712,20 @@ qemuAgentOpen(virDomainObjPtr vm, if (agent->fd == -1) goto cleanup; - virObjectRef(agent); - if ((agent->watch = virEventAddHandle(agent->fd, - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR | - VIR_EVENT_HANDLE_READABLE, - qemuAgentIO, - agent, - virObjectFreeCallback)) < 0) { - virObjectUnref(agent); - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register agent events")); + agent->context = g_main_context_ref(context); + + agent->socket = g_socket_new_from_fd(agent->fd, &gerr); + if (!agent->socket) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to create socket object: %s"), + gerr->message); goto cleanup; } + qemuAgentRegister(agent); + agent->running = true; - VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch); + VIR_DEBUG("New agent %p fd=%d", agent, agent->fd); return agent; @@ -763,12 +780,11 @@ void qemuAgentClose(qemuAgentPtr agent) virObjectLock(agent); - if (agent->fd >= 0) { - if (agent->watch) { - virEventRemoveHandle(agent->watch); - agent->watch = 0; - } - VIR_FORCE_CLOSE(agent->fd); + if (agent->socket) { + qemuAgentUnregister(agent); + g_object_unref(agent->socket); + agent->socket = NULL; + agent->fd = -1; } qemuAgentNotifyCloseLocked(agent); diff --git a/src/qemu/qemu_agent.h b/src/qemu/qemu_agent.h index 5656fe60ff..d4d8615323 100644 --- a/src/qemu/qemu_agent.h +++ b/src/qemu/qemu_agent.h @@ -41,6 +41,7 @@ struct _qemuAgentCallbacks { qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm, const virDomainChrSourceDef *config, + GMainContext *context, qemuAgentCallbacksPtr cb); void qemuAgentClose(qemuAgentPtr mon); diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c index ea046bcb14..a235064320 100644 --- a/src/qemu/qemu_process.c +++ b/src/qemu/qemu_process.c @@ -237,6 +237,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm) agent = qemuAgentOpen(vm, config->source, + virEventThreadGetContext(priv->eventThread), &agentCallbacks); virObjectLock(vm); diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c index df93aae758..328bfb8525 100644 --- a/tests/qemumonitortestutils.c +++ b/tests/qemumonitortestutils.c @@ -1406,6 +1406,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt) if (!(test->agent = qemuAgentOpen(test->vm, &src, + virEventThreadGetContext(test->eventThread), &qemuMonitorTestAgentCallbacks))) goto error; -- 2.24.1

On 5. 3. 2020 13:51, Daniel P. Berrangé wrote:
This converts the QEMU agent APIs to use the per-VM event loop, which involves switching from virEvent APIs to GMainContext / GSource APIs.
A GSocket is used as a convenient way to create a GSource for a socket, but is not yet used for actual I/O.
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/qemu/qemu_agent.c | 146 +++++++++++++++++++---------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_process.c | 1 + tests/qemumonitortestutils.c | 1 + 4 files changed, 84 insertions(+), 65 deletions(-)
@@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent) }
-static void qemuAgentUpdateWatch(qemuAgentPtr agent) -{ - int events = - VIR_EVENT_HANDLE_HANGUP | - VIR_EVENT_HANDLE_ERROR; +static gboolean +qemuAgentIO(GSocket *socket, + GIOCondition cond, + gpointer opaque);
- if (!agent->watch) - return; + +static void +qemuAgentRegister(qemuAgentPtr agent) +{ + GIOCondition cond = 0;
if (agent->lastError.code == VIR_ERR_OK) { - events |= VIR_EVENT_HANDLE_READABLE; + cond |= G_IO_IN;
if (agent->msg && agent->msg->txOffset < agent->msg->txLength) - events |= VIR_EVENT_HANDLE_WRITABLE; + cond |= G_IO_OUT; }
- virEventUpdateHandle(agent->watch, events); + agent->watch = g_socket_create_source(agent->socket, + cond, + NULL); + + virObjectRef(agent); + g_source_set_callback(agent->watch, + (GSourceFunc)qemuAgentIO, + agent, + NULL);
And again, this needs to be "(GDestroyNotify) virObjectUnref" instead of NULL. Michal

On 5. 3. 2020 13:51, Daniel P. Berrangé wrote:
This series changes the way we manage the QEMU monitor and QEMU agent, such that all I/O is processed by a dedicated event loop thread.
Many times in the past years people are reported issues where long running monitor event callbacks block the main libvirtd event loop for an unacceptably long period of time. In the best case, this delays other work being completed, but in bad cases it leads to mgmt app failures when keepalive times trigger a client disconnect.
With this series, when we spawn QEMU, we also spawn a dedicated thread running a GMainLoop instance. Then QEMU monitor and QEMU agent UNIX sockets are switched to use GMainContext for events instead of the traditional libvirt event loop APIs. We kill off the event thread when we see EOF on the QEMU monitor during shutdown.
The cost of this approach is one extra thread per VM, which incurs a new OS process and a new stack allocation.
The QEMU driver already delegates some QMP event handling to a thread pool for certain types of event. This was a previous hack to mitigate the impact on the main event loop. It is likely that we can remove this thread pool from the QEMU driver & rely on the per-VM event threads to do all the work. This will, however, require careful analysis of each handler we pushed into the thread pool to make sure its work doesn't have a dependency on the event loop running in parallel.
This is one step towards eliminating the need to have the libvirt event loop registered when using the embedded QEMU driver. A further step is using a thread to dispatch the lifecycle events, since that currently relies on a zero second timer being registered with the event loop.
Changed in v2:
- Fixed race accessing free'd memory causing crash - Fixed unused variables - Merged first acked patches
Daniel P. Berrangé (7): src: introduce an abstraction for running event loops qemu: start/stop an event loop thread for domains qemu: start/stop an event thread for QMP probing tests: start/stop an event thread for QEMU monitor/agent tests qemu: convert monitor to use the per-VM event loop qemu: fix variable naming in agent code qemu: convert agent to use the per-VM event loop
po/POTFILES.in | 1 + src/libvirt_private.syms | 5 + src/qemu/qemu_agent.c | 600 ++++++++++++++++++----------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_domain.c | 33 ++ src/qemu/qemu_domain.h | 6 + src/qemu/qemu_monitor.c | 145 ++++----- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 43 ++- src/qemu/qemu_process.h | 2 + src/util/Makefile.inc.am | 2 + src/util/vireventthread.c | 190 +++++++++++ src/util/vireventthread.h | 31 ++ tests/qemumonitortestutils.c | 14 + 14 files changed, 700 insertions(+), 376 deletions(-) create mode 100644 src/util/vireventthread.c create mode 100644 src/util/vireventthread.h
Reviewed-by: Michal Privoznik <mprivozn@redhat.com> Should we also delete the event loop from the qemu_shim.c then? Michal

On Fri, Mar 06, 2020 at 02:42:45PM +0100, Michal Prívozník wrote:
On 5. 3. 2020 13:51, Daniel P. Berrangé wrote:
This series changes the way we manage the QEMU monitor and QEMU agent, such that all I/O is processed by a dedicated event loop thread.
Many times in the past years people are reported issues where long running monitor event callbacks block the main libvirtd event loop for an unacceptably long period of time. In the best case, this delays other work being completed, but in bad cases it leads to mgmt app failures when keepalive times trigger a client disconnect.
With this series, when we spawn QEMU, we also spawn a dedicated thread running a GMainLoop instance. Then QEMU monitor and QEMU agent UNIX sockets are switched to use GMainContext for events instead of the traditional libvirt event loop APIs. We kill off the event thread when we see EOF on the QEMU monitor during shutdown.
The cost of this approach is one extra thread per VM, which incurs a new OS process and a new stack allocation.
The QEMU driver already delegates some QMP event handling to a thread pool for certain types of event. This was a previous hack to mitigate the impact on the main event loop. It is likely that we can remove this thread pool from the QEMU driver & rely on the per-VM event threads to do all the work. This will, however, require careful analysis of each handler we pushed into the thread pool to make sure its work doesn't have a dependency on the event loop running in parallel.
This is one step towards eliminating the need to have the libvirt event loop registered when using the embedded QEMU driver. A further step is using a thread to dispatch the lifecycle events, since that currently relies on a zero second timer being registered with the event loop.
Changed in v2:
- Fixed race accessing free'd memory causing crash - Fixed unused variables - Merged first acked patches
Daniel P. Berrangé (7): src: introduce an abstraction for running event loops qemu: start/stop an event loop thread for domains qemu: start/stop an event thread for QMP probing tests: start/stop an event thread for QEMU monitor/agent tests qemu: convert monitor to use the per-VM event loop qemu: fix variable naming in agent code qemu: convert agent to use the per-VM event loop
po/POTFILES.in | 1 + src/libvirt_private.syms | 5 + src/qemu/qemu_agent.c | 600 ++++++++++++++++++----------------- src/qemu/qemu_agent.h | 1 + src/qemu/qemu_domain.c | 33 ++ src/qemu/qemu_domain.h | 6 + src/qemu/qemu_monitor.c | 145 ++++----- src/qemu/qemu_monitor.h | 3 +- src/qemu/qemu_process.c | 43 ++- src/qemu/qemu_process.h | 2 + src/util/Makefile.inc.am | 2 + src/util/vireventthread.c | 190 +++++++++++ src/util/vireventthread.h | 31 ++ tests/qemumonitortestutils.c | 14 + 14 files changed, 700 insertions(+), 376 deletions(-) create mode 100644 src/util/vireventthread.c create mode 100644 src/util/vireventthread.h
Reviewed-by: Michal Privoznik <mprivozn@redhat.com>
Should we also delete the event loop from the qemu_shim.c then?
I've got patches for that, but its not possible until we can make the virObjectEventState struct use a thread for dispatch, instead of using a event loop zero-second timer. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
participants (2)
-
Daniel P. Berrangé
-
Michal Prívozník