This converts the QEMU monitor APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.
A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
---
src/qemu/qemu_monitor.c | 145 ++++++++++++++++-------------------
src/qemu/qemu_monitor.h | 3 +-
src/qemu/qemu_process.c | 6 +-
tests/qemumonitortestutils.c | 1 +
4 files changed, 71 insertions(+), 84 deletions(-)
diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index bf53962872..d969853963 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -24,6 +24,7 @@
#include <poll.h>
#include <unistd.h>
#include <fcntl.h>
+#include <gio/gio.h>
#include "qemu_monitor.h"
#include "qemu_monitor_text.h"
@@ -71,12 +72,9 @@ struct _qemuMonitor {
int fd;
- /* Represents the watch number to be used for updating and
- * unregistering the monitor @fd for events in the event loop:
- * > 0: valid watch number
- * = 0: not registered
- * < 0: an error occurred during the registration of @fd */
- int watch;
+ GMainContext *context;
+ GSocket *socket;
+ GSource *watch;
virDomainObjPtr vm;
@@ -226,6 +224,7 @@ qemuMonitorDispose(void *obj)
(mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque);
virObjectUnref(mon->vm);
+ g_main_context_unref(mon->context);
virResetError(&mon->lastError);
virCondDestroy(&mon->notify);
VIR_FREE(mon->buffer);
@@ -509,27 +508,16 @@ qemuMonitorIORead(qemuMonitorPtr mon)
static void
qemuMonitorUpdateWatch(qemuMonitorPtr mon)
{
- int events =
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR;
-
- if (!mon->watch)
- return;
-
- if (mon->lastError.code == VIR_ERR_OK) {
- events |= VIR_EVENT_HANDLE_READABLE;
-
- if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
- !mon->waitGreeting)
- events |= VIR_EVENT_HANDLE_WRITABLE;
- }
-
- virEventUpdateHandle(mon->watch, events);
+ qemuMonitorUnregister(mon);
+ if (mon->socket)
+ qemuMonitorRegister(mon);
}
-static void
-qemuMonitorIO(int watch, int fd, int events, void *opaque)
+static gboolean
+qemuMonitorIO(GSocket *socket G_GNUC_UNUSED,
+ GIOCondition cond,
+ gpointer opaque)
{
qemuMonitorPtr mon = opaque;
bool error = false;
@@ -541,39 +529,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
/* lock access to the monitor and protect fd */
virObjectLock(mon);
#if DEBUG_IO
- VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events);
+ VIR_DEBUG("Monitor %p I/O on socket %p cond %d", mon, socket, cond);
#endif
- if (mon->fd == -1 || mon->watch == 0) {
+ if (mon->fd == -1 || !mon->watch) {
virObjectUnlock(mon);
virObjectUnref(mon);
- return;
+ return G_SOURCE_REMOVE;
}
- if (mon->fd != fd || mon->watch != watch) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
- eof = true;
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("event from unexpected fd %d!=%d / watch %d!=%d"),
- mon->fd, fd, mon->watch, watch);
- error = true;
- } else if (mon->lastError.code != VIR_ERR_OK) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+ if (mon->lastError.code != VIR_ERR_OK) {
+ if (cond & (G_IO_HUP | G_IO_ERR))
eof = true;
error = true;
} else {
- if (events & VIR_EVENT_HANDLE_WRITABLE) {
+ if (cond & G_IO_OUT) {
if (qemuMonitorIOWrite(mon) < 0) {
error = true;
if (errno == ECONNRESET)
hangup = true;
}
- events &= ~VIR_EVENT_HANDLE_WRITABLE;
}
- if (!error &&
- events & VIR_EVENT_HANDLE_READABLE) {
+ if (!error && cond & G_IO_IN) {
int got = qemuMonitorIORead(mon);
- events &= ~VIR_EVENT_HANDLE_READABLE;
if (got < 0) {
error = true;
if (errno == ECONNRESET)
@@ -581,37 +559,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
} else if (got == 0) {
eof = true;
} else {
- /* Ignore hangup/error events if we read some data, to
+ /* Ignore hangup/error cond if we read some data, to
* give time for that data to be consumed */
- events = 0;
+ cond = 0;
if (qemuMonitorIOProcess(mon) < 0)
error = true;
}
}
- if (events & VIR_EVENT_HANDLE_HANGUP) {
+ if (cond & G_IO_HUP) {
hangup = true;
if (!error) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("End of file from qemu monitor"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_HANGUP;
}
}
if (!error && !eof &&
- events & VIR_EVENT_HANDLE_ERROR) {
+ cond & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid file descriptor while waiting for monitor"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_ERROR;
- }
- if (!error && events) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unhandled event %d for monitor fd %d"),
- events, mon->fd);
- error = true;
}
}
@@ -679,16 +649,20 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
virObjectUnlock(mon);
virObjectUnref(mon);
}
+
+ return G_SOURCE_REMOVE;
}
static qemuMonitorPtr
qemuMonitorOpenInternal(virDomainObjPtr vm,
int fd,
+ GMainContext *context,
qemuMonitorCallbacksPtr cb,
void *opaque)
{
qemuMonitorPtr mon;
+ g_autoptr(GError) gerr = NULL;
if (!cb->eofNotify) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -713,6 +687,7 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
goto cleanup;
}
mon->fd = fd;
+ mon->context = g_main_context_ref(context);
mon->vm = virObjectRef(vm);
mon->waitGreeting = true;
mon->cb = cb;
@@ -723,20 +698,17 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
"%s", _("Unable to set monitor close-on-exec flag"));
goto cleanup;
}
- if (virSetNonBlock(mon->fd) < 0) {
+
+ mon->socket = g_socket_new_from_fd(fd, &gerr);
+ if (!mon->socket) {
virReportError(VIR_ERR_INTERNAL_ERROR,
- "%s", _("Unable to put monitor into non-blocking mode"));
+ _("Unable to create socket object: %s"),
+ gerr->message);
goto cleanup;
}
-
virObjectLock(mon);
- if (!qemuMonitorRegister(mon)) {
- virObjectUnlock(mon);
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("unable to register monitor events"));
- goto cleanup;
- }
+ qemuMonitorRegister(mon);
PROBE(QEMU_MONITOR_NEW,
"mon=%p refs=%d fd=%d",
@@ -782,6 +754,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
virDomainChrSourceDefPtr config,
bool retry,
unsigned long long timeout,
+ GMainContext *context,
qemuMonitorCallbacksPtr cb,
void *opaque)
{
@@ -815,7 +788,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
goto cleanup;
}
- ret = qemuMonitorOpenInternal(vm, fd, cb, opaque);
+ ret = qemuMonitorOpenInternal(vm, fd, context, cb, opaque);
cleanup:
if (!ret)
VIR_FORCE_CLOSE(fd);
@@ -830,25 +803,32 @@ qemuMonitorOpen(virDomainObjPtr vm,
*
* Registers the monitor in the event loop. The caller has to hold the
* lock for @mon.
- *
- * Returns true in case of success, false otherwise
*/
-bool
+void
qemuMonitorRegister(qemuMonitorPtr mon)
{
- virObjectRef(mon);
- if ((mon->watch = virEventAddHandle(mon->fd,
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR |
- VIR_EVENT_HANDLE_READABLE,
- qemuMonitorIO,
- mon,
- virObjectFreeCallback)) < 0) {
- virObjectUnref(mon);
- return false;
+ GIOCondition cond = 0;
+
+ if (mon->lastError.code == VIR_ERR_OK) {
+ cond |= G_IO_IN;
+
+ if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
+ !mon->waitGreeting)
+ cond |= G_IO_OUT;
}
- return true;
+ mon->watch = g_socket_create_source(mon->socket,
+ cond,
+ NULL);
+
+ virObjectRef(mon);
+ g_source_set_callback(mon->watch,
+ (GSourceFunc)qemuMonitorIO,
+ mon,
+ NULL);
+
+ g_source_attach(mon->watch,
+ mon->context);
}
@@ -856,8 +836,9 @@ void
qemuMonitorUnregister(qemuMonitorPtr mon)
{
if (mon->watch) {
- virEventRemoveHandle(mon->watch);
- mon->watch = 0;
+ g_source_destroy(mon->watch);
+ g_source_unref(mon->watch);
+ mon->watch = NULL;
}
}
@@ -873,9 +854,11 @@ qemuMonitorClose(qemuMonitorPtr mon)
qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL);
- if (mon->fd >= 0) {
+ if (mon->socket) {
qemuMonitorUnregister(mon);
- VIR_FORCE_CLOSE(mon->fd);
+ g_object_unref(mon->socket);
+ mon->socket = NULL;
+ mon->fd = -1;
}
/* In case another thread is waiting for its monitor command to be
diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h
index c84cd425df..dd2aaa4691 100644
--- a/src/qemu/qemu_monitor.h
+++ b/src/qemu/qemu_monitor.h
@@ -391,11 +391,12 @@ qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm,
virDomainChrSourceDefPtr config,
bool retry,
unsigned long long timeout,
+ GMainContext *context,
qemuMonitorCallbacksPtr cb,
void *opaque)
ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5);
-bool qemuMonitorRegister(qemuMonitorPtr mon)
+void qemuMonitorRegister(qemuMonitorPtr mon)
ATTRIBUTE_NONNULL(1);
void qemuMonitorUnregister(qemuMonitorPtr mon)
ATTRIBUTE_NONNULL(1);
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index 7475813e9f..bc57474bdc 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -1976,6 +1976,7 @@ qemuConnectMonitor(virQEMUDriverPtr driver, virDomainObjPtr vm, int asyncJob,
priv->monConfig,
retry,
timeout,
+ virEventThreadGetContext(priv->eventThread),
&monitorCallbacks,
driver);
@@ -8602,8 +8603,9 @@ qemuProcessQMPConnectMonitor(qemuProcessQMPPtr proc)
proc->vm->pid = proc->pid;
- if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true,
- 0, &callbacks, NULL)))
+ if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, 0,
+ virEventThreadGetContext(proc->eventThread),
+ &callbacks, NULL)))
goto cleanup;
virObjectLock(proc->mon);
diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c
index a1641050ea..3efdea9cce 100644
--- a/tests/qemumonitortestutils.c
+++ b/tests/qemumonitortestutils.c
@@ -1171,6 +1171,7 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt,
&src,
true,
0,
+ virEventThreadGetContext(test->eventThread),
&qemuMonitorTestCallbacks,
driver)))
goto error;
--
2.24.1