[libvirt] [PATCH 2/3] libvirtaio: add allow for moving callbacks to other event loop
The virEvent implementation is tied to a particular loop. When spinning another loop, the callbacks have to be moved to another implementation, so they will have a chance to be invoked, should they be scheduled. If not, file descriptors will be leaking. Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/libvirtaio.py b/libvirtaio.py index fc868bd..d161cd1 100644 --- a/libvirtaio.py +++ b/libvirtaio.py @@ -195,9 +195,10 @@ class FDCallback(Callback): return '<{} iden={} fd={} event={}>'.format( self.__class__.__name__, self.iden, self.descriptor.fd, self.event) - def update(self, event): + def update(self, event=None): '''Update the callback and fix descriptor's watchers''' - self.event = event + if event is not None: + self.event = event self.descriptor.update() # @@ -238,20 +239,21 @@ class TimeoutCallback(Callback): self.cb(self.iden, self.opaque) self.impl.log.debug('timer %r callback ended', self.iden) - def update(self, timeout): + def update(self, timeout=None): '''Start or the timer, possibly updating timeout''' - self.timeout = timeout - - if self.timeout >= 0 and self._task is None: - self.impl.log.debug('timer %r start', self.iden) - self._task = ensure_future(self._timer(), - loop=self.impl.loop) + if timeout is not None: + self.timeout = timeout - elif self.timeout < 0 and self._task is not None: + if self._task is not None: self.impl.log.debug('timer %r stop', self.iden) self._task.cancel() # pylint: disable=no-member self._task = None + if self.timeout >= 0: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + def close(self): '''Stop the timer and call ff callback''' super(TimeoutCallback, self).close() @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object): self.callbacks = {} self.descriptors = DescriptorDict(self) self.log = logging.getLogger(self.__class__.__name__) + self.pending_tasks = set() def register(self): '''Register this instance as event loop implementation''' @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object): self._add_timeout, self._update_timeout, self._remove_timeout) return self + def takeover(self, other): + '''Take over other implementation, probably registered on another loop + + :param virEventAsyncIOImpl other: other implementation to be taken over + ''' + self.log.warning('%r taking over %r', self, other) + + while other.callbacks: + iden, callback = other.callbacks.popitem() + self.log.debug(' takeover %d %r', iden, callback) + assert callback.iden == iden + callback.impl = self + self.callbacks[iden] = callback + + if isinstance(callback, FDCallback): + fd = callback.descriptor.fd + assert callback is other.descriptors[fd].remove_handle(iden) + self.descriptors[fd].add_handle(callback) + def schedule_ff_callback(self, iden, opaque): '''Schedule a ff callback from one of the handles or timers''' - ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + self.pending_tasks.add(fut) + fut.add_done_callback(self.pending_tasks.remove) @asyncio.coroutine def _ff_callback(self, iden, opaque): @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object): self.log.debug('ff_callback(iden=%d, opaque=...)', iden) return libvirt.virEventInvokeFreeCallback(opaque) + @asyncio.coroutine + def drain(self): + self.log.debug('drain()') + if self.pending_tasks: + yield from asyncio.wait(self.pending_tasks, loop=self.loop) + def is_idle(self): '''Returns False if there are leftovers from a connection Those may happen if there are sematical problems while closing a connection. For example, not deregistered events before .close(). ''' - return not self.callbacks + return not self.callbacks and not self.pending_tasks def _add_handle(self, fd, event, cb, opaque): '''Register a callback for monitoring file handle events @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object): callback = self.callbacks.pop(timer) callback.close() + +_current_impl = None def virEventRegisterAsyncIOImpl(loop=None): '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop The implementation object is returned, but in normal usage it can safely be discarded. ''' - return virEventAsyncIOImpl(loop=loop).register() + global _current_impl + impl = virEventAsyncIOImpl(loop=loop) + impl.register() + if _current_impl is not None: + impl.takeover(_current_impl) + _current_impl = impl + return impl -- 2.9.4
On Thu, Aug 31, 2017 at 09:40:23PM +0200, Wojtek Porczyk wrote:
The virEvent implementation is tied to a particular loop. When spinning another loop, the callbacks have to be moved to another implementation, so they will have a chance to be invoked, should they be scheduled. If not, file descriptors will be leaking.
Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 13 deletions(-)
diff --git a/libvirtaio.py b/libvirtaio.py index fc868bd..d161cd1 100644 --- a/libvirtaio.py +++ b/libvirtaio.py @@ -195,9 +195,10 @@ class FDCallback(Callback): return '<{} iden={} fd={} event={}>'.format( self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
- def update(self, event): + def update(self, event=None): '''Update the callback and fix descriptor's watchers''' - self.event = event + if event is not None: + self.event = event self.descriptor.update()
# @@ -238,20 +239,21 @@ class TimeoutCallback(Callback): self.cb(self.iden, self.opaque) self.impl.log.debug('timer %r callback ended', self.iden)
- def update(self, timeout): + def update(self, timeout=None): '''Start or the timer, possibly updating timeout''' - self.timeout = timeout - - if self.timeout >= 0 and self._task is None: - self.impl.log.debug('timer %r start', self.iden) - self._task = ensure_future(self._timer(), - loop=self.impl.loop) + if timeout is not None: + self.timeout = timeout
- elif self.timeout < 0 and self._task is not None: + if self._task is not None: self.impl.log.debug('timer %r stop', self.iden) self._task.cancel() # pylint: disable=no-member self._task = None
+ if self.timeout >= 0: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + def close(self): '''Stop the timer and call ff callback''' super(TimeoutCallback, self).close() @@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object): self.callbacks = {} self.descriptors = DescriptorDict(self) self.log = logging.getLogger(self.__class__.__name__) + self.pending_tasks = set()
def register(self): '''Register this instance as event loop implementation''' @@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object): self._add_timeout, self._update_timeout, self._remove_timeout) return self
+ def takeover(self, other): + '''Take over other implementation, probably registered on another loop + + :param virEventAsyncIOImpl other: other implementation to be taken over + ''' + self.log.warning('%r taking over %r', self, other) + + while other.callbacks: + iden, callback = other.callbacks.popitem() + self.log.debug(' takeover %d %r', iden, callback) + assert callback.iden == iden + callback.impl = self + self.callbacks[iden] = callback + + if isinstance(callback, FDCallback): + fd = callback.descriptor.fd + assert callback is other.descriptors[fd].remove_handle(iden) + self.descriptors[fd].add_handle(callback) + def schedule_ff_callback(self, iden, opaque): '''Schedule a ff callback from one of the handles or timers''' - ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop) + self.pending_tasks.add(fut) + fut.add_done_callback(self.pending_tasks.remove)
@asyncio.coroutine def _ff_callback(self, iden, opaque): @@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object): self.log.debug('ff_callback(iden=%d, opaque=...)', iden) return libvirt.virEventInvokeFreeCallback(opaque)
+ @asyncio.coroutine + def drain(self): + self.log.debug('drain()') + if self.pending_tasks: + yield from asyncio.wait(self.pending_tasks, loop=self.loop) + def is_idle(self): '''Returns False if there are leftovers from a connection
Those may happen if there are sematical problems while closing a connection. For example, not deregistered events before .close(). ''' - return not self.callbacks + return not self.callbacks and not self.pending_tasks
def _add_handle(self, fd, event, cb, opaque): '''Register a callback for monitoring file handle events @@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object): callback = self.callbacks.pop(timer) callback.close()
+ +_current_impl = None def virEventRegisterAsyncIOImpl(loop=None): '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
The implementation object is returned, but in normal usage it can safely be discarded. ''' - return virEventAsyncIOImpl(loop=loop).register() + global _current_impl + impl = virEventAsyncIOImpl(loop=loop) + impl.register() + if _current_impl is not None: + impl.takeover(_current_impl) + _current_impl = impl + return impl
IIUC, you are trying to make it possible to register multiple event loop impls. This is *not* supported usage of libvirt. You must call 'virEventRegisterImpl' before opening any connection, and once called you are forbidden to call it again. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
On Fri, Sep 01, 2017 at 10:08:18AM +0100, Daniel P. Berrange wrote:
IIUC, you are trying to make it possible to register multiple event loop impls. This is *not* supported usage of libvirt. You must call 'virEventRegisterImpl' before opening any connection, and once called you are forbidden to call it again.
Yes, that's correct. Can't I do it, even after I close all the connections? Why then libvirt_virEventRegisterImpl (libvirt-python/libvirt-override.c:5434) seems to accomodate for running it second time? The reason for this is we have separate event loop for each test case, but the whole suite runs in a single process. The Impl has to use the new loop for each test. Would it be better to just substitute the loop in a long-lived Impl instance? -- pozdrawiam / best regards _.-._ Wojtek Porczyk .-^' '^-. Invisible Things Lab |'-.-^-.-'| | | | | I do not fear computers, | '-.-' | I fear lack of them. '-._ : ,-' -- Isaac Asimov `^-^-_>
On Fri, Sep 01, 2017 at 01:51:17PM +0200, Wojtek Porczyk wrote:
On Fri, Sep 01, 2017 at 10:08:18AM +0100, Daniel P. Berrange wrote:
IIUC, you are trying to make it possible to register multiple event loop impls. This is *not* supported usage of libvirt. You must call 'virEventRegisterImpl' before opening any connection, and once called you are forbidden to call it again.
Yes, that's correct. Can't I do it, even after I close all the connections?
That would be racy because some cleanup is liable to happen asynchronously.
Why then libvirt_virEventRegisterImpl (libvirt-python/libvirt-override.c:5434) seems to accomodate for running it second time?
That's bogus code that we should remove - in fact the C library should simply ignore any subsequent virEventRegisterImpl API calls after the first one.
The reason for this is we have separate event loop for each test case, but the whole suite runs in a single process. The Impl has to use the new loop for each test. Would it be better to just substitute the loop in a long-lived Impl instance?
Replacing 'loop' would achieve the same effet i guess, but you must ensure there are no callbacks still registered by libvirt before doing that. Generally the expectation is that you register an event loop and then run it forever until the process exits. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
On Fri, Sep 01, 2017 at 01:16:51PM +0100, Daniel P. Berrange wrote:
On Fri, Sep 01, 2017 at 01:51:17PM +0200, Wojtek Porczyk wrote:
On Fri, Sep 01, 2017 at 10:08:18AM +0100, Daniel P. Berrange wrote:
IIUC, you are trying to make it possible to register multiple event loop impls. This is *not* supported usage of libvirt. You must call 'virEventRegisterImpl' before opening any connection, and once called you are forbidden to call it again.
Yes, that's correct. Can't I do it, even after I close all the connections?
That would be racy because some cleanup is liable to happen asynchronously.
What cleanup? Some cleanup in C-level libvirt client, or expressed as callbacks and therefore visible to implementation itself? The latter is largely remedied by drain().
Why then libvirt_virEventRegisterImpl (libvirt-python/libvirt-override.c:5434) seems to accomodate for running it second time?
That's bogus code that we should remove - in fact the C library should simply ignore any subsequent virEventRegisterImpl API calls after the first one.
Thanks for clarification. I understand that it wasn't the intended way and it's not meant to be supported and hard to test. But it works OK so far. It only begs the question, how much of libvirt-override.c is also deemed to be "bogus code". :)
The reason for this is we have separate event loop for each test case, but the whole suite runs in a single process. The Impl has to use the new loop for each test. Would it be better to just substitute the loop in a long-lived Impl instance?
Replacing 'loop' would achieve the same effet i guess, but you must ensure there are no callbacks still registered by libvirt before doing that.
Well, if someone correctly unregisters any event handlers and matches any references to connection with apropriate number of virConnectionClose()s, there should be no callbacks registered and no descriptors watched. There are other libraries which behave in similar way. Python throws a Warning when closing a non-empty loop (wrt descriptors and pending tasks), which we use as convenient way to ensure that nothing there remains. That's how we currently do it in tests anyway.
Generally the expectation is that you register an event loop and then run it forever until the process exits.
TBH asyncio explicitly disclaims that guarantee, and there are different recommendations as to how people shoud run their loops. Some those include running one loop per thread. Some non-stdlib loop implementations even allow running loop inside the loop for various X toolkit related reasons. If this patch is controversial, should I submit a series without this patch, because other two are unrelated? -- pozdrawiam / best regards _.-._ Wojtek Porczyk .-^' '^-. Invisible Things Lab |'-.-^-.-'| | | | | I do not fear computers, | '-.-' | I fear lack of them. '-._ : ,-' -- Isaac Asimov `^-^-_>
participants (2)
-
Daniel P. Berrange -
Wojtek Porczyk