On Fri, Oct 17, 2008 at 12:02:13PM -0400, Ben Guthro wrote:
Deliver local callbacks in response to remote events
remote_internal.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 248 insertions(+), 7 deletions(-)
@@ -680,6 +689,26 @@ doRemoteOpen (virConnectPtr conn,
(xdrproc_t) xdr_void, (char *) NULL) == -1)
goto failed;
+ if(VIR_ALLOC(priv->domainEvents)<0) {
+ error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents"));
+ goto failed;
+ }
+
+ DEBUG0("Adding Handler for remote events");
+ /* Set up a callback to listen on the socket data */
+ if (virEventAddHandle(priv->sock,
+ POLLIN | POLLERR | POLLHUP,
+ remoteDomainEventFired,
+ conn) < 0) {
+ DEBUG0("virEventAddHandle failed: No addHandleImpl defined. continuing
without events.");
+ }
+
+ DEBUG0("Adding Timeout for remote event queue flushing");
+ if ( (priv->eventFlushTimer = virEventAddTimeout(0,
+ remoteDomainEventQueueFlush,
+ conn)) < 0) {
Small bug here - this creates an active timer event, which will fire
immediately & forever. Simply change the '0' to a '-1' to register
a timeout that's initially disabled, and then use virEventUpdateTimeout
to toggle it on/off only when required.
+
+static int remoteDomainEventRegister (virConnectPtr conn,
+ void *callback,
+ void *opaque)
+{
+ struct private_data *priv = conn->privateData;
+
+ /* dispatch an rpc - so the server sde can track
+ how many callbacks are regstered */
+ remote_domain_events_register_args args;
+ args.callback = (unsigned long)callback;
+ args.user_data = (unsigned long)opaque;
This relates back to my comment on the remote_protocol.x file - I feel
we should probably maintain a generic token, rather than passing the
actual pointers over the wire as ints.
/*----------------------------------------------------------------------*/
@@ -4367,6 +4444,7 @@ call (virConnectPtr conn, struct private_data *priv,
really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) ==
-1)
return -1;
+retry_read:
/* Read and deserialise length word. */
if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof
buffer2) == -1)
return -1;
@@ -4418,10 +4496,19 @@ call (virConnectPtr conn, struct private_data *priv,
return -1;
}
- /* If we extend the server to actually send asynchronous messages, then
- * we'll need to change this so that it can recognise an asynch
- * message being received at this point.
- */
+ if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+ hdr.direction == REMOTE_MESSAGE) {
+ /* An async message has come in while we were waiting for the
+ * response. Process it to pull it off the wire, and try again
+ */
+ DEBUG0("Encountered an event while waiting for a response");
+
+ remoteDomainQueueEvent(conn, &xdr);
Need to call virEventUpdateTimeout() to enable the timer here.
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static int remoteDomainReadEvent(virConnectPtr conn, XDR *xdr,
+ virDomainPtr *dom, int *event,
+ virConnectDomainEventCallback *cb,
+ void **opaque)
+{
+ remote_domain_event_ret ret;
+ memset (&ret, 0, sizeof ret);
+
+ /* unmarshall parameters, and process it*/
+ if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+ error (conn, VIR_ERR_RPC, _("remoteDomainProcessEvent: unmarshalling
ret"));
+ return -1;
+ }
+
+ *dom = get_nonnull_domain(conn,ret.dom);
+ *event = ret.event;
+ *cb = (virConnectDomainEventCallback)ret.callback;
+ *opaque = (void *)ret.user_data;
+
+ return 0;
+}
+
+static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
+{
+ virDomainPtr dom;
+ int event;
+ virConnectDomainEventCallback cb;
+ void *opaque;
+ if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) {
+ DEBUG0("Calling domain event callback (no queue)");
+ if(cb)
+ cb(conn,dom,event,opaque);
Needs to call virDomainFree(dom) to release the object.
+ }
+}
+
+static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+ virDomainPtr dom;
+ int event;
+ virConnectDomainEventCallback cb;
+ void *opaque;
+ struct private_data *priv = conn->privateData;
+
+ if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque))
+ {
+ if( virDomainEventCallbackQueuePush(priv->domainEvents, dom, event, cb,
opaque) < 0 ) {
+ DEBUG("%s", "Error adding event to queue");
+ }
+ }
+}
+
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
+ int event ATTRIBUTE_UNUSED,
+ void *opaque)
+{
+ char buffer[REMOTE_MESSAGE_MAX];
+ char buffer2[4];
+ struct remote_message_header hdr;
+ XDR xdr;
+ int len;
+
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
+
+ DEBUG("%s : Event fired", __FUNCTION__);
+
+ /* Read and deserialise length word. */
+ if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
+ return;
+
+ xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
+ if (!xdr_int (&xdr, &len)) {
+ error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+ return;
+ }
+ xdr_destroy (&xdr);
+
+ /* Length includes length word - adjust to real length to read. */
+ len -= 4;
+
+ if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+ error (conn, VIR_ERR_RPC, _("packet received from server too
large"));
+ return;
+ }
+
+ /* Read reply header and what follows (either a ret or an error). */
+ if (really_read (conn, priv, 0, buffer, len) == -1) {
+ error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
+ return;
+ }
+
+ /* Deserialise reply header. */
+ xdrmem_create (&xdr, buffer, len, XDR_DECODE);
+ if (!xdr_remote_message_header (&xdr, &hdr)) {
+ error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
+ return;
+ }
+
+ if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
+ hdr.direction == REMOTE_MESSAGE) {
+ DEBUG0("Encountered an async event");
+ remoteDomainProcessEvent(conn, &xdr);
+ } else {
+ DEBUG0("invalid proc in event firing");
+ error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
+ }
+}
+
+void remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED,
+ void *opaque)
+{
+ virDomainEventPtr domEvent;
+ virConnectPtr conn = opaque;
+ struct private_data *priv = conn->privateData;
+
+ DEBUG0("Flushing domain events");
+ while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) {
+ DEBUG(" Flushing %p", domEvent);
+ if(domEvent->cb)
+ domEvent->cb(domEvent->dom->conn,
+ domEvent->dom,
+ domEvent->event,
+ domEvent->opaque);
Needs to also call
virDomainFree(domEvent->dom) to release the object.
+ VIR_FREE(domEvent);
+ }
And virEventUpdateTimeout to disable the timer again.
+}
Here's a small additive patch which takes care of the timer issue
diff -r 99dad81d37dd src/remote_internal.c
--- a/src/remote_internal.c Sun Oct 19 13:46:36 2008 -0400
+++ b/src/remote_internal.c Sun Oct 19 14:06:33 2008 -0400
@@ -704,7 +704,7 @@
}
DEBUG0("Adding Timeout for remote event queue flushing");
- if ( (priv->eventFlushTimer = virEventAddTimeout(0,
+ if ( (priv->eventFlushTimer = virEventAddTimeout(-1,
remoteDomainEventQueueFlush,
conn)) < 0) {
DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. continuing
without events.");
@@ -4504,6 +4504,7 @@
DEBUG0("Encountered an event while waiting for a response");
remoteDomainQueueEvent(conn, &xdr);
+ virEventUpdateTimeout(priv->eventFlushTimer, 0);
DEBUG0("Retrying read");
xdr_destroy (&xdr);
@@ -5182,4 +5183,6 @@
domEvent->opaque);
VIR_FREE(domEvent);
}
-}
+
+ virEventUpdateTimeout(priv->eventFlushTimer, -1);
+}
Regards,
Daniel
--
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|