This converts the QEMU agent APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.
A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.
Signed-off-by: Daniel P. Berrangé <berrange(a)redhat.com>
---
src/qemu/qemu_agent.c | 146 +++++++++++++++++++----------------
src/qemu/qemu_agent.h | 1 +
src/qemu/qemu_process.c | 1 +
tests/qemumonitortestutils.c | 1 +
4 files changed, 84 insertions(+), 65 deletions(-)
diff --git a/src/qemu/qemu_agent.c b/src/qemu/qemu_agent.c
index 72ea159a9c..1655e26212 100644
--- a/src/qemu/qemu_agent.c
+++ b/src/qemu/qemu_agent.c
@@ -25,6 +25,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
+#include <gio/gio.h>
#include "qemu_agent.h"
#include "qemu_domain.h"
@@ -101,7 +102,10 @@ struct _qemuAgent {
virCond notify;
int fd;
- int watch;
+
+ GMainContext *context;
+ GSocket *socket;
+ GSource *watch;
bool running;
@@ -172,6 +176,7 @@ static void qemuAgentDispose(void *obj)
(agent->cb->destroy)(agent, agent->vm);
virCondDestroy(&agent->notify);
VIR_FREE(agent->buffer);
+ g_main_context_unref(agent->context);
virResetError(&agent->lastError);
}
@@ -188,13 +193,6 @@ qemuAgentOpenUnix(const char *socketpath)
return -1;
}
- if (virSetNonBlock(agentfd) < 0) {
- virReportSystemError(errno, "%s",
- _("Unable to put monitor "
- "into non-blocking mode"));
- goto error;
- }
-
if (virSetCloseExec(agentfd) < 0) {
virReportSystemError(errno, "%s",
_("Unable to set agent "
@@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent)
}
-static void qemuAgentUpdateWatch(qemuAgentPtr agent)
-{
- int events =
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR;
+static gboolean
+qemuAgentIO(GSocket *socket,
+ GIOCondition cond,
+ gpointer opaque);
- if (!agent->watch)
- return;
+
+static void
+qemuAgentRegister(qemuAgentPtr agent)
+{
+ GIOCondition cond = 0;
if (agent->lastError.code == VIR_ERR_OK) {
- events |= VIR_EVENT_HANDLE_READABLE;
+ cond |= G_IO_IN;
if (agent->msg && agent->msg->txOffset <
agent->msg->txLength)
- events |= VIR_EVENT_HANDLE_WRITABLE;
+ cond |= G_IO_OUT;
}
- virEventUpdateHandle(agent->watch, events);
+ agent->watch = g_socket_create_source(agent->socket,
+ cond,
+ NULL);
+
+ virObjectRef(agent);
+ g_source_set_callback(agent->watch,
+ (GSourceFunc)qemuAgentIO,
+ agent,
+ NULL);
+
+ g_source_attach(agent->watch,
+ agent->context);
}
static void
-qemuAgentIO(int watch, int fd, int events, void *opaque)
+qemuAgentUnregister(qemuAgentPtr agent)
+{
+ if (agent->watch) {
+ g_source_destroy(agent->watch);
+ g_source_unref(agent->watch);
+ agent->watch = NULL;
+ }
+}
+
+
+static void qemuAgentUpdateWatch(qemuAgentPtr agent)
+{
+ qemuAgentUnregister(agent);
+ if (agent->socket)
+ qemuAgentRegister(agent);
+}
+
+
+static gboolean
+qemuAgentIO(GSocket *socket G_GNUC_UNUSED,
+ GIOCondition cond,
+ gpointer opaque)
{
qemuAgentPtr agent = opaque;
bool error = false;
@@ -529,45 +561,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
/* lock access to the agent and protect fd */
virObjectLock(agent);
#if DEBUG_IO
- VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd,
events);
+ VIR_DEBUG("Agent %p I/O on watch %d socket %p cond %d", agent,
agent->socket, cond);
#endif
- if (agent->fd == -1 || agent->watch == 0) {
+ if (agent->fd == -1 || !agent->watch) {
virObjectUnlock(agent);
virObjectUnref(agent);
- return;
+ return G_SOURCE_REMOVE;
}
- if (agent->fd != fd || agent->watch != watch) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
- eof = true;
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("event from unexpected fd %d!=%d / watch %d!=%d"),
- agent->fd, fd, agent->watch, watch);
- error = true;
- } else if (agent->lastError.code != VIR_ERR_OK) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+ if (agent->lastError.code != VIR_ERR_OK) {
+ if (cond & (G_IO_HUP | G_IO_ERR))
eof = true;
error = true;
} else {
- if (events & VIR_EVENT_HANDLE_WRITABLE) {
+ if (cond & G_IO_OUT) {
if (qemuAgentIOWrite(agent) < 0)
error = true;
- events &= ~VIR_EVENT_HANDLE_WRITABLE;
}
if (!error &&
- events & VIR_EVENT_HANDLE_READABLE) {
+ cond & G_IO_IN) {
int got = qemuAgentIORead(agent);
- events &= ~VIR_EVENT_HANDLE_READABLE;
if (got < 0) {
error = true;
} else if (got == 0) {
eof = true;
} else {
- /* Ignore hangup/error events if we read some data, to
+ /* Ignore hangup/error cond if we read some data, to
* give time for that data to be consumed */
- events = 0;
+ cond = 0;
if (qemuAgentIOProcess(agent) < 0)
error = true;
@@ -575,25 +598,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
}
if (!error &&
- events & VIR_EVENT_HANDLE_HANGUP) {
+ cond & G_IO_HUP) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("End of file from agent socket"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_HANGUP;
}
if (!error && !eof &&
- events & VIR_EVENT_HANDLE_ERROR) {
+ cond & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid file descriptor while waiting for
agent"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_ERROR;
- }
- if (!error && events) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unhandled event %d for agent fd %d"),
- events, agent->fd);
- error = true;
}
}
@@ -649,15 +664,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
virObjectUnlock(agent);
virObjectUnref(agent);
}
+
+ return G_SOURCE_REMOVE;
}
qemuAgentPtr
qemuAgentOpen(virDomainObjPtr vm,
const virDomainChrSourceDef *config,
+ GMainContext *context,
qemuAgentCallbacksPtr cb)
{
qemuAgentPtr agent;
+ g_autoptr(GError) gerr = NULL;
if (!cb || !cb->eofNotify) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -693,22 +712,20 @@ qemuAgentOpen(virDomainObjPtr vm,
if (agent->fd == -1)
goto cleanup;
- virObjectRef(agent);
- if ((agent->watch = virEventAddHandle(agent->fd,
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR |
- VIR_EVENT_HANDLE_READABLE,
- qemuAgentIO,
- agent,
- virObjectFreeCallback)) < 0) {
- virObjectUnref(agent);
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("unable to register agent events"));
+ agent->context = g_main_context_ref(context);
+
+ agent->socket = g_socket_new_from_fd(agent->fd, &gerr);
+ if (!agent->socket) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Unable to create socket object: %s"),
+ gerr->message);
goto cleanup;
}
+ qemuAgentRegister(agent);
+
agent->running = true;
- VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd,
agent->watch);
+ VIR_DEBUG("New agent %p fd=%d", agent, agent->fd);
return agent;
@@ -763,12 +780,11 @@ void qemuAgentClose(qemuAgentPtr agent)
virObjectLock(agent);
- if (agent->fd >= 0) {
- if (agent->watch) {
- virEventRemoveHandle(agent->watch);
- agent->watch = 0;
- }
- VIR_FORCE_CLOSE(agent->fd);
+ if (agent->socket) {
+ qemuAgentUnregister(agent);
+ g_object_unref(agent->socket);
+ agent->socket = NULL;
+ agent->fd = -1;
}
qemuAgentNotifyCloseLocked(agent);
diff --git a/src/qemu/qemu_agent.h b/src/qemu/qemu_agent.h
index 5656fe60ff..d4d8615323 100644
--- a/src/qemu/qemu_agent.h
+++ b/src/qemu/qemu_agent.h
@@ -41,6 +41,7 @@ struct _qemuAgentCallbacks {
qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm,
const virDomainChrSourceDef *config,
+ GMainContext *context,
qemuAgentCallbacksPtr cb);
void qemuAgentClose(qemuAgentPtr mon);
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index ea046bcb14..a235064320 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -237,6 +237,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm)
agent = qemuAgentOpen(vm,
config->source,
+ virEventThreadGetContext(priv->eventThread),
&agentCallbacks);
virObjectLock(vm);
diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c
index df93aae758..328bfb8525 100644
--- a/tests/qemumonitortestutils.c
+++ b/tests/qemumonitortestutils.c
@@ -1406,6 +1406,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt)
if (!(test->agent = qemuAgentOpen(test->vm,
&src,
+ virEventThreadGetContext(test->eventThread),
&qemuMonitorTestAgentCallbacks)))
goto error;
--
2.24.1