The current remote driver code for streams only supports
blocking I/O mode. This is fine for the usage with migration
but is a problem for more general use cases, in particular
bi-directional streams.
This adds supported for the stream callbacks and non-blocking
I/O. with the minor caveat is that it doesn't actually do
non-blocking I/O for sending stream data, only receiving it.
A future patch will try to do non-blocking sends, but this is
quite tricky to get right.
* src/remote/remote_driver.c: Allow non-blocking I/O for
streams and support callbacks
---
src/remote/remote_driver.c | 188 ++++++++++++++++++++++++++++++++++++++++----
1 files changed, 172 insertions(+), 16 deletions(-)
diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 1c874b2..61da8ff 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -132,6 +132,13 @@ struct private_stream_data {
unsigned int serial;
unsigned int proc_nr;
+ virStreamEventCallback cb;
+ void *cbOpaque;
+ virFreeCallback cbFree;
+ int cbEvents;
+ int cbTimer;
+ int cbDispatch;
+
/* XXX this is potentially unbounded if the client
* app has domain events registered, since packets
* may be read off wire, while app isn't ready to
@@ -200,9 +207,10 @@ struct private_data {
};
enum {
- REMOTE_CALL_IN_OPEN = (1 << 0),
+ REMOTE_CALL_IN_OPEN = (1 << 0),
REMOTE_CALL_QUIET_MISSING_RPC = (1 << 1),
- REMOTE_QEMU_CALL = (1 << 2),
+ REMOTE_CALL_QEMU = (1 << 2),
+ REMOTE_CALL_NONBLOCK = (1 << 3),
};
@@ -8144,6 +8152,20 @@ remoteStreamOpen(virStreamPtr st,
}
+static void
+remoteStreamEventTimerUpdate(struct private_stream_data *privst)
+{
+ if (!privst->cb)
+ return;
+
+ if (!privst->cbEvents)
+ virEventUpdateTimeout(privst->cbTimer, -1);
+ else if (privst->incoming &&
+ (privst->cbEvents & VIR_STREAM_EVENT_READABLE))
+ virEventUpdateTimeout(privst->cbTimer, 0);
+}
+
+
static int
remoteStreamPacket(virStreamPtr st,
int status,
@@ -8338,6 +8360,12 @@ remoteStreamRecv(virStreamPtr st,
struct remote_thread_call *thiscall;
int ret;
+ if (st->flags & VIR_STREAM_NONBLOCK) {
+ DEBUG0("Non-blocking mode and no data available");
+ rv = -2;
+ goto cleanup;
+ }
+
if (VIR_ALLOC(thiscall) < 0) {
virReportOOMError();
goto cleanup;
@@ -8381,6 +8409,8 @@ remoteStreamRecv(virStreamPtr st,
rv = 0;
}
+ remoteStreamEventTimerUpdate(privst);
+
DEBUG("Done %d", rv);
cleanup:
@@ -8391,28 +8421,153 @@ cleanup:
return rv;
}
+
+static void
+remoteStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+ virStreamPtr st = opaque;
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+
+ remoteDriverLock(priv);
+ if (privst->cb &&
+ (privst->cbEvents & VIR_STREAM_EVENT_READABLE) &&
+ privst->incomingOffset) {
+ virStreamEventCallback cb = privst->cb;
+ void *cbOpaque = privst->cbOpaque;
+ virFreeCallback cbFree = privst->cbFree;
+
+ privst->cbDispatch = 1;
+ remoteDriverUnlock(priv);
+ (cb)(st, VIR_STREAM_EVENT_READABLE, cbOpaque);
+ remoteDriverLock(priv);
+ privst->cbDispatch = 0;
+
+ if (!privst->cb && cbFree)
+ (cbFree)(cbOpaque);
+ }
+ remoteDriverUnlock(priv);
+}
+
+
+static void
+remoteStreamEventTimerFree(void *opaque)
+{
+ virStreamPtr st = opaque;
+ virUnrefStream(st);
+}
+
+
static int
-remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
- int events ATTRIBUTE_UNUSED,
- virStreamEventCallback cb ATTRIBUTE_UNUSED,
- void *opaque ATTRIBUTE_UNUSED,
- virFreeCallback ff ATTRIBUTE_UNUSED)
+remoteStreamEventAddCallback(virStreamPtr st,
+ int events,
+ virStreamEventCallback cb,
+ void *opaque,
+ virFreeCallback ff)
{
- return -1;
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+ int ret = -1;
+
+ remoteDriverLock(priv);
+
+ if (events & ~VIR_STREAM_EVENT_READABLE) {
+ remoteError(VIR_ERR_INTERNAL_ERROR,
+ _("unsupported stream events %d"), events);
+ goto cleanup;
+ }
+
+ if (privst->cb) {
+ remoteError(VIR_ERR_INTERNAL_ERROR,
+ _("multiple stream callbacks not supported"));
+ goto cleanup;
+ }
+
+ virStreamRef(st);
+ if ((privst->cbTimer =
+ virEventAddTimeout(-1,
+ remoteStreamEventTimer,
+ st,
+ remoteStreamEventTimerFree)) < 0) {
+ virUnrefStream(st);
+ goto cleanup;
+ }
+
+ privst->cb = cb;
+ privst->cbOpaque = opaque;
+ privst->cbFree = ff;
+ privst->cbEvents = events;
+
+ ret = 0;
+
+cleanup:
+ remoteDriverUnlock(priv);
+ return ret;
}
static int
-remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
- int events ATTRIBUTE_UNUSED)
+remoteStreamEventUpdateCallback(virStreamPtr st,
+ int events)
{
- return -1;
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+ int ret = -1;
+
+ remoteDriverLock(priv);
+
+ if (events & ~VIR_STREAM_EVENT_READABLE) {
+ remoteError(VIR_ERR_INTERNAL_ERROR,
+ _("unsupported stream events %d"), events);
+ goto cleanup;
+ }
+
+ if (!privst->cb) {
+ remoteError(VIR_ERR_INTERNAL_ERROR,
+ _("no stream callback registered"));
+ goto cleanup;
+ }
+
+ privst->cbEvents = events;
+
+ remoteStreamEventTimerUpdate(privst);
+
+ ret = 0;
+
+cleanup:
+ remoteDriverUnlock(priv);
+ return ret;
}
static int
-remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
+remoteStreamEventRemoveCallback(virStreamPtr st)
{
- return -1;
+ struct private_data *priv = st->conn->privateData;
+ struct private_stream_data *privst = st->privateData;
+ int ret = -1;
+
+ remoteDriverLock(priv);
+
+ if (!privst->cb) {
+ remoteError(VIR_ERR_INTERNAL_ERROR,
+ _("no stream callback registered"));
+ goto cleanup;
+ }
+
+ if (!privst->cbDispatch &&
+ privst->cbFree)
+ (privst->cbFree)(privst->cbOpaque);
+ privst->cb = NULL;
+ privst->cbOpaque = NULL;
+ privst->cbFree = NULL;
+ privst->cbEvents = 0;
+ virEventRemoveTimeout(privst->cbTimer);
+
+ ret = 0;
+
+cleanup:
+ remoteDriverUnlock(priv);
+ return ret;
}
static int
@@ -9065,7 +9220,7 @@ remoteQemuDomainMonitorCommand (virDomainPtr domain, const char
*cmd,
args.flags = flags;
memset (&ret, 0, sizeof ret);
- if (call (domain->conn, priv, REMOTE_QEMU_CALL, QEMU_PROC_MONITOR_COMMAND,
+ if (call (domain->conn, priv, REMOTE_CALL_QEMU, QEMU_PROC_MONITOR_COMMAND,
(xdrproc_t) xdr_qemu_monitor_command_args, (char *) &args,
(xdrproc_t) xdr_qemu_monitor_command_ret, (char *) &ret) == -1)
goto done;
@@ -9119,7 +9274,7 @@ prepareCall(struct private_data *priv,
rv->ret = ret;
rv->want_reply = 1;
- if (flags & REMOTE_QEMU_CALL) {
+ if (flags & REMOTE_CALL_QEMU) {
hdr.prog = QEMU_PROGRAM;
hdr.vers = QEMU_PROTOCOL_VERSION;
}
@@ -9512,7 +9667,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
expectedprog = REMOTE_PROGRAM;
expectedvers = REMOTE_PROTOCOL_VERSION;
- if (flags & REMOTE_QEMU_CALL) {
+ if (flags & REMOTE_CALL_QEMU) {
expectedprog = QEMU_PROGRAM;
expectedvers = QEMU_PROTOCOL_VERSION;
}
@@ -9738,6 +9893,7 @@ processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED,
thecall->mode = REMOTE_MODE_COMPLETE;
} else {
VIR_WARN("Got aysnc data packet offset=%d",
privst->incomingOffset);
+ remoteStreamEventTimerUpdate(privst);
}
return 0;
}
--
1.7.2.3