On Thu, Aug 31, 2017 at 09:40:23PM +0200, Wojtek Porczyk wrote:
The virEvent implementation is tied to a particular loop. When spinning
another loop, the callbacks have to be moved to another implementation,
so they will have a chance to be invoked, should they be scheduled. If
not, file descriptors will be leaking.
Signed-off-by: Wojtek Porczyk <woju(a)invisiblethingslab.com>
---
libvirtaio.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++------------
1 file changed, 51 insertions(+), 13 deletions(-)
diff --git a/libvirtaio.py b/libvirtaio.py
index fc868bd..d161cd1 100644
--- a/libvirtaio.py
+++ b/libvirtaio.py
@@ -195,9 +195,10 @@ class FDCallback(Callback):
return '<{} iden={} fd={} event={}>'.format(
self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
- def update(self, event):
+ def update(self, event=None):
'''Update the callback and fix descriptor's watchers'''
- self.event = event
+ if event is not None:
+ self.event = event
self.descriptor.update()
#
@@ -238,20 +239,21 @@ class TimeoutCallback(Callback):
self.cb(self.iden, self.opaque)
self.impl.log.debug('timer %r callback ended', self.iden)
- def update(self, timeout):
+ def update(self, timeout=None):
'''Start or the timer, possibly updating timeout'''
- self.timeout = timeout
-
- if self.timeout >= 0 and self._task is None:
- self.impl.log.debug('timer %r start', self.iden)
- self._task = ensure_future(self._timer(),
- loop=self.impl.loop)
+ if timeout is not None:
+ self.timeout = timeout
- elif self.timeout < 0 and self._task is not None:
+ if self._task is not None:
self.impl.log.debug('timer %r stop', self.iden)
self._task.cancel() # pylint: disable=no-member
self._task = None
+ if self.timeout >= 0:
+ self.impl.log.debug('timer %r start', self.iden)
+ self._task = ensure_future(self._timer(),
+ loop=self.impl.loop)
+
def close(self):
'''Stop the timer and call ff callback'''
super(TimeoutCallback, self).close()
@@ -274,6 +276,7 @@ class virEventAsyncIOImpl(object):
self.callbacks = {}
self.descriptors = DescriptorDict(self)
self.log = logging.getLogger(self.__class__.__name__)
+ self.pending_tasks = set()
def register(self):
'''Register this instance as event loop implementation'''
@@ -284,9 +287,30 @@ class virEventAsyncIOImpl(object):
self._add_timeout, self._update_timeout, self._remove_timeout)
return self
+ def takeover(self, other):
+ '''Take over other implementation, probably registered on another loop
+
+ :param virEventAsyncIOImpl other: other implementation to be taken over
+ '''
+ self.log.warning('%r taking over %r', self, other)
+
+ while other.callbacks:
+ iden, callback = other.callbacks.popitem()
+ self.log.debug(' takeover %d %r', iden, callback)
+ assert callback.iden == iden
+ callback.impl = self
+ self.callbacks[iden] = callback
+
+ if isinstance(callback, FDCallback):
+ fd = callback.descriptor.fd
+ assert callback is other.descriptors[fd].remove_handle(iden)
+ self.descriptors[fd].add_handle(callback)
+
def schedule_ff_callback(self, iden, opaque):
'''Schedule a ff callback from one of the handles or timers'''
- ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
+ fut = ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
+ self.pending_tasks.add(fut)
+ fut.add_done_callback(self.pending_tasks.remove)
@asyncio.coroutine
def _ff_callback(self, iden, opaque):
@@ -297,13 +321,19 @@ class virEventAsyncIOImpl(object):
self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
return libvirt.virEventInvokeFreeCallback(opaque)
+ @asyncio.coroutine
+ def drain(self):
+ self.log.debug('drain()')
+ if self.pending_tasks:
+ yield from asyncio.wait(self.pending_tasks, loop=self.loop)
+
def is_idle(self):
'''Returns False if there are leftovers from a connection
Those may happen if there are sematical problems while closing
a connection. For example, not deregistered events before .close().
'''
- return not self.callbacks
+ return not self.callbacks and not self.pending_tasks
def _add_handle(self, fd, event, cb, opaque):
'''Register a callback for monitoring file handle events
@@ -403,10 +433,18 @@ class virEventAsyncIOImpl(object):
callback = self.callbacks.pop(timer)
callback.close()
+
+_current_impl = None
def virEventRegisterAsyncIOImpl(loop=None):
'''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
The implementation object is returned, but in normal usage it can safely be
discarded.
'''
- return virEventAsyncIOImpl(loop=loop).register()
+ global _current_impl
+ impl = virEventAsyncIOImpl(loop=loop)
+ impl.register()
+ if _current_impl is not None:
+ impl.takeover(_current_impl)
+ _current_impl = impl
+ return impl
IIUC, you are trying to make it possible to register multiple event
loop impls. This is *not* a supported usage of libvirt. You must
call 'virEventRegisterImpl' before opening any connection, and once
it has been called you are forbidden to call it again.
Regards,
Daniel
--
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|