
On Fri, Oct 17, 2008 at 12:02:13PM -0400, Ben Guthro wrote:
Deliver local callbacks in response to remote events
remote_internal.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 248 insertions(+), 7 deletions(-)
@@ -680,6 +689,26 @@ doRemoteOpen (virConnectPtr conn, (xdrproc_t) xdr_void, (char *) NULL) == -1) goto failed;
+ if(VIR_ALLOC(priv->domainEvents)<0) { + error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents")); + goto failed; + } + + DEBUG0("Adding Handler for remote events"); + /* Set up a callback to listen on the socket data */ + if (virEventAddHandle(priv->sock, + POLLIN | POLLERR | POLLHUP, + remoteDomainEventFired, + conn) < 0) { + DEBUG0("virEventAddHandle failed: No addHandleImpl defined. continuing without events."); + } + + DEBUG0("Adding Timeout for remote event queue flushing"); + if ( (priv->eventFlushTimer = virEventAddTimeout(0, + remoteDomainEventQueueFlush, + conn)) < 0) {
Small bug here - this creates an active timer event, which will fire immediately & forever. Simply change the '0' to a '-1' to register a timeout that's initially disabled, and then use virEventUpdateTimeout to toggle it on/off only when required.
+ +static int remoteDomainEventRegister (virConnectPtr conn, + void *callback, + void *opaque) +{ + struct private_data *priv = conn->privateData; + + /* dispatch an rpc - so the server sde can track + how many callbacks are regstered */ + remote_domain_events_register_args args; + args.callback = (unsigned long)callback; + args.user_data = (unsigned long)opaque;
This relates back to my comment on the remote_protocl.x file - i feel we should probably maintain a generic token, rather than pointing the actual pointers over the wire as ints.
/*----------------------------------------------------------------------*/
@@ -4367,6 +4444,7 @@ call (virConnectPtr conn, struct private_data *priv, really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) return -1;
+retry_read: /* Read and deserialise length word. */ if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) return -1; @@ -4418,10 +4496,19 @@ call (virConnectPtr conn, struct private_data *priv, return -1; }
- /* If we extend the server to actually send asynchronous messages, then - * we'll need to change this so that it can recognise an asynch - * message being received at this point. - */ + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && + hdr.direction == REMOTE_MESSAGE) { + /* An async message has come in while we were waiting for the + * response. Process it to pull it off the wire, and try again + */ + DEBUG0("Encountered an event while waiting for a response"); + + remoteDomainQueueEvent(conn, &xdr);
Need to call virEventUpdateTimeout() to enable the timer here.
+/** + * remoteDomainReadEvent + * + * Read the event data off the wire + */ +static int remoteDomainReadEvent(virConnectPtr conn, XDR *xdr, + virDomainPtr *dom, int *event, + virConnectDomainEventCallback *cb, + void **opaque) +{ + remote_domain_event_ret ret; + memset (&ret, 0, sizeof ret); + + /* unmarshall parameters, and process it*/ + if (! xdr_remote_domain_event_ret(xdr, &ret) ) { + error (conn, VIR_ERR_RPC, _("remoteDomainProcessEvent: unmarshalling ret")); + return -1; + } + + *dom = get_nonnull_domain(conn,ret.dom); + *event = ret.event; + *cb = (virConnectDomainEventCallback)ret.callback; + *opaque = (void *)ret.user_data; + + return 0; +} + +static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) +{ + virDomainPtr dom; + int event; + virConnectDomainEventCallback cb; + void *opaque; + if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) { + DEBUG0("Calling domain event callback (no queue)"); + if(cb) + cb(conn,dom,event,opaque);
Needs to call virDomainFree(dom) to release the object.
+ } +} + +static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) +{ + virDomainPtr dom; + int event; + virConnectDomainEventCallback cb; + void *opaque; + struct private_data *priv = conn->privateData; + + if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) + { + if( virDomainEventCallbackQueuePush(priv->domainEvents, dom, event, cb, opaque) < 0 ) { + DEBUG("%s", "Error adding event to queue"); + } + } +} + +/** remoteDomainEventFired: + * + * The callback for monitoring the remote socket + * for event data + */ +void remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, + int event ATTRIBUTE_UNUSED, + void *opaque) +{ + char buffer[REMOTE_MESSAGE_MAX]; + char buffer2[4]; + struct remote_message_header hdr; + XDR xdr; + int len; + + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + DEBUG("%s : Event fired", __FUNCTION__); + + /* Read and deserialise length word. */ + if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) + return; + + xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); + if (!xdr_int (&xdr, &len)) { + error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); + return; + } + xdr_destroy (&xdr); + + /* Length includes length word - adjust to real length to read. */ + len -= 4; + + if (len < 0 || len > REMOTE_MESSAGE_MAX) { + error (conn, VIR_ERR_RPC, _("packet received from server too large")); + return; + } + + /* Read reply header and what follows (either a ret or an error). */ + if (really_read (conn, priv, 0, buffer, len) == -1) { + error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); + return; + } + + /* Deserialise reply header. */ + xdrmem_create (&xdr, buffer, len, XDR_DECODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (conn, VIR_ERR_RPC, _("invalid header in event firing")); + return; + } + + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && + hdr.direction == REMOTE_MESSAGE) { + DEBUG0("Encountered an async event"); + remoteDomainProcessEvent(conn, &xdr); + } else { + DEBUG0("invalid proc in event firing"); + error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); + } +} + +void remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, + void *opaque) +{ + virDomainEventPtr domEvent; + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + DEBUG0("Flushing domain events"); + while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) { + DEBUG(" Flushing %p", domEvent); + if(domEvent->cb) + domEvent->cb(domEvent->dom->conn, + domEvent->dom, + domEvent->event, + domEvent->opaque); Needs to also call virDomainFree(domEvent->dom) to release the object.
+ VIR_FREE(domEvent); + }
And virEventUpdateTimeout to disable the timer again.
+}
Here's a small additive patch which takes care of the timer issue diff -r 99dad81d37dd src/remote_internal.c --- a/src/remote_internal.c Sun Oct 19 13:46:36 2008 -0400 +++ b/src/remote_internal.c Sun Oct 19 14:06:33 2008 -0400 @@ -704,7 +704,7 @@ } DEBUG0("Adding Timeout for remote event queue flushing"); - if ( (priv->eventFlushTimer = virEventAddTimeout(0, + if ( (priv->eventFlushTimer = virEventAddTimeout(-1, remoteDomainEventQueueFlush, conn)) < 0) { DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. continuing without events."); @@ -4504,6 +4504,7 @@ DEBUG0("Encountered an event while waiting for a response"); remoteDomainQueueEvent(conn, &xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); DEBUG0("Retrying read"); xdr_destroy (&xdr); @@ -5182,4 +5183,6 @@ domEvent->opaque); VIR_FREE(domEvent); } -} + + virEventUpdateTimeout(priv->eventFlushTimer, -1); +} Regards, Daniel -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|