The intended use is to ensure that the implementation is empty, which is
one way to ensure that all connections were properly closed and file
descriptors reclaimed.
Signed-off-by: Wojtek Porczyk <woju(a)invisiblethingslab.com>
---
libvirtaio.py | 36 ++++++++++++++++++++++++++++++++++--
1 file changed, 34 insertions(+), 2 deletions(-)
diff --git a/libvirtaio.py b/libvirtaio.py
index 97a7f6c..1c432dd 100644
--- a/libvirtaio.py
+++ b/libvirtaio.py
@@ -269,10 +269,27 @@ class virEventAsyncIOImpl(object):
self.descriptors = DescriptorDict(self)
self.log = logging.getLogger(self.__class__.__name__)
+ # NOTE invariant: _finished.is_set() iff _pending == 0
+ self._pending = 0
+ self._finished = asyncio.Event(loop=loop)
+ self._finished.set()
+
def __repr__(self):
return '<{} callbacks={} descriptors={}>'.format(
type(self).__name__, self.callbacks, self.descriptors)
+ def _pending_inc(self):
+ '''Increase the count of pending affairs. Do not use directly.'''
+ self._pending += 1
+ self._finished.clear()
+
+ def _pending_dec(self):
+ '''Decrease the count of pending affairs. Do not use directly.'''
+ assert self._pending > 0
+ self._pending -= 1
+ if self._pending == 0:
+ self._finished.set()
+
def register(self):
'''Register this instance as event loop implementation'''
# pylint: disable=bad-whitespace
@@ -293,7 +310,20 @@ class virEventAsyncIOImpl(object):
This is a coroutine.
'''
self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
- return libvirt.virEventInvokeFreeCallback(opaque)
+ ret = libvirt.virEventInvokeFreeCallback(opaque)
+ self._pending_dec()
+ return ret
+
+ @asyncio.coroutine
+ def drain(self):
+ '''Wait for the implementation to become idle.
+
+ This is a coroutine.
+ '''
+ self.log.debug('drain()')
+ if self._pending:
+ yield from self._finished.wait()
+ self.log.debug('drain ended')
def is_idle(self):
'''Returns False if there are leftovers from a connection
@@ -301,7 +331,7 @@ class virEventAsyncIOImpl(object):
Those may happen if there are sematical problems while closing
a connection. For example, not deregistered events before .close().
'''
- return not self.callbacks
+ return not self.callbacks and not self._pending
def _add_handle(self, fd, event, cb, opaque):
'''Register a callback for monitoring file handle events
@@ -324,6 +354,7 @@ class virEventAsyncIOImpl(object):
fd, event, callback.iden)
self.callbacks[callback.iden] = callback
self.descriptors[fd].add_handle(callback)
+ self._pending_inc()
return callback.iden
def _update_handle(self, watch, event):
@@ -378,6 +409,7 @@ class virEventAsyncIOImpl(object):
timeout, callback.iden)
self.callbacks[callback.iden] = callback
callback.update(timeout=timeout)
+ self._pending_inc()
return callback.iden
def _update_timeout(self, timer, timeout):
--
2.9.4