
On Thu, Sep 14, 2017 at 02:41:12AM +0200, Wojtek Porczyk wrote:
The intended use is to ensure that the implementation is empty, which is one way to ensure that all connections were properly closed and file descriptors reclaimed.
Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirtaio.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-)
diff --git a/libvirtaio.py b/libvirtaio.py index 97a7f6c..1c432dd 100644 --- a/libvirtaio.py +++ b/libvirtaio.py @@ -269,10 +269,27 @@ class virEventAsyncIOImpl(object): self.descriptors = DescriptorDict(self) self.log = logging.getLogger(self.__class__.__name__)
+ # NOTE invariant: _finished.is_set() iff _pending == 0 + self._pending = 0 + self._finished = asyncio.Event(loop=loop) + self._finished.set() + def __repr__(self): return '<{} callbacks={} descriptors={}>'.format( type(self).__name__, self.callbacks, self.descriptors)
+ def _pending_inc(self): + '''Increase the count of pending affairs. Do not use directly.''' + self._pending += 1 + self._finished.clear() + + def _pending_dec(self): + '''Decrease the count of pending affairs. Do not use directly.''' + assert self._pending > 0 + self._pending -= 1 + if self._pending == 0: + self._finished.set() + def register(self): '''Register this instance as event loop implementation''' # pylint: disable=bad-whitespace @@ -293,7 +310,20 @@ class virEventAsyncIOImpl(object): This is a coroutine. ''' self.log.debug('ff_callback(iden=%d, opaque=...)', iden) - return libvirt.virEventInvokeFreeCallback(opaque) + ret = libvirt.virEventInvokeFreeCallback(opaque) + self._pending_dec() + return ret + + @asyncio.coroutine + def drain(self): + '''Wait for the implementation to become idle. + + This is a coroutine. + ''' + self.log.debug('drain()') + if self._pending: + yield from self._finished.wait() + self.log.debug('drain ended')
What is responsible for calling 'drain'?
def is_idle(self): '''Returns False if there are leftovers from a connection @@ -301,7 +331,7 @@ class virEventAsyncIOImpl(object): Those may happen if there are sematical problems while closing a connection. For example, not deregistered events before .close(). ''' - return not self.callbacks + return not self.callbacks and not self._pending
def _add_handle(self, fd, event, cb, opaque): '''Register a callback for monitoring file handle events @@ -324,6 +354,7 @@ class virEventAsyncIOImpl(object): fd, event, callback.iden) self.callbacks[callback.iden] = callback self.descriptors[fd].add_handle(callback) + self._pending_inc() return callback.iden
def _update_handle(self, watch, event): @@ -378,6 +409,7 @@ class virEventAsyncIOImpl(object): timeout, callback.iden) self.callbacks[callback.iden] = callback callback.update(timeout=timeout) + self._pending_inc() return callback.iden
def _update_timeout(self, timer, timeout):
Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|