This is usable only on Python >= 3.4 (or 3.3 with the out-of-tree asyncio);
however, it should be harmless for anyone with older Python versions.
In the simplest case, to have the callbacks queued on the default loop:
>> import libvirtaio
>> libvirtaio.virEventRegisterAsyncIOImpl()
The function is not present on non-compatible platforms.
Signed-off-by: Wojtek Porczyk <woju(a)invisiblethingslab.com>
---
libvirt-python.spec.in | 1 +
libvirtaio.py | 401 +++++++++++++++++++++++++++++++++++++++++++++++++
sanitytest.py | 2 +-
setup.py | 12 ++
4 files changed, 415 insertions(+), 1 deletion(-)
create mode 100644 libvirtaio.py
diff --git a/libvirt-python.spec.in b/libvirt-python.spec.in
index 3021ebd..0ee535e 100644
--- a/libvirt-python.spec.in
+++ b/libvirt-python.spec.in
@@ -86,6 +86,7 @@ rm -f %{buildroot}%{_libdir}/python*/site-packages/*egg-info
%defattr(-,root,root)
%doc ChangeLog AUTHORS NEWS README COPYING COPYING.LESSER examples/
%{_libdir}/python3*/site-packages/libvirt.py*
+%{_libdir}/python3*/site-packages/libvirtaio.py*
%{_libdir}/python3*/site-packages/libvirt_qemu.py*
%{_libdir}/python3*/site-packages/libvirt_lxc.py*
%{_libdir}/python3*/site-packages/__pycache__/libvirt.cpython-*.py*
diff --git a/libvirtaio.py b/libvirtaio.py
new file mode 100644
index 0000000..8428f71
--- /dev/null
+++ b/libvirtaio.py
@@ -0,0 +1,401 @@
+#
+# libvirtaio -- asyncio adapter for libvirt
+# Copyright (C) 2017 Wojtek Porczyk <woju(a)invisiblethingslab.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+'''Libvirt event loop implementation using asyncio
+
+Register the implementation of default loop:
+
+ >>> import libvirtaio
+ >>> libvirtaio.virEventRegisterAsyncIOImpl()
+
+.. seealso::
+
+    https://libvirt.org/html/libvirt-libvirt-event.html
+'''
+
+__author__ = 'Wojtek Porczyk <woju(a)invisiblethingslab.com>'
+__license__ = 'LGPL-2.1+'
+__all__ = ['virEventAsyncIOImpl', 'virEventRegisterAsyncIOImpl']
+
+import asyncio
+import itertools
+import logging
+import warnings
+
+import libvirt
+
+try:
+ from asyncio import ensure_future
+except ImportError:
+ from asyncio import async as ensure_future
+
+
+class Callback(object):
+    '''Base class for holding callback
+
+    :param virEventAsyncIOImpl impl: the implementation in which we run
+    :param cb: the callback itself
+    :param opaque: the opaque tuple passed by libvirt
+
+    On construction the instance takes the next identifier from a counter
+    shared by all callbacks and stores itself in ``impl.callbacks`` under
+    that identifier.
+    '''
+    # pylint: disable=too-few-public-methods
+
+    # Class-level counter: subclasses share it, so identifiers never collide
+    # between different kinds of callbacks.
+    _iden_counter = itertools.count()
+
+    def __init__(self, impl, cb, opaque, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.iden = next(self._iden_counter)
+        self.impl = impl
+        self.cb = cb
+        self.opaque = opaque
+
+        # The message is only formatted when the assertion fails, i.e. when
+        # the key exists, so the lookup in the message cannot raise KeyError.
+        assert self.iden not in self.impl.callbacks, \
+            'found {} callback: {!r}'.format(
+                self.iden, self.impl.callbacks[self.iden])
+        self.impl.callbacks[self.iden] = self
+
+    def __repr__(self):
+        return '<{} iden={}>'.format(self.__class__.__name__, self.iden)
+
+    def close(self):
+        '''Schedule *ff* callback'''
+        self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
+        self.impl.schedule_ff_callback(self.opaque)
+
+#
+# file descriptors
+#
+
+class Descriptor(object):
+    '''Manager of one file descriptor
+
+    :param virEventAsyncIOImpl impl: the implementation in which we run
+    :param int fd: the file descriptor
+
+    Holds the callbacks attached to the descriptor (keyed by their ``iden``)
+    and keeps the loop's reader/writer registration in sync with them.
+    '''
+    def __init__(self, impl, fd):
+        self.impl = impl
+        self.fd = fd
+        self.callbacks = {}
+
+    def _handle(self, event):
+        '''Dispatch the event to the descriptors
+
+        :param int event: The event (from libvirt's constants) being dispatched
+        '''
+        # Iterate over a snapshot: a callback may add or remove handles on
+        # this very descriptor, and mutating the dict while iterating over
+        # it raises RuntimeError.
+        for callback in list(self.callbacks.values()):
+            if callback.iden not in self.callbacks:
+                # removed by a callback invoked earlier in this dispatch
+                continue
+            if callback.event is not None and callback.event & event:
+                callback.cb(callback.iden, self.fd, event, callback.opaque)
+
+    def update(self):
+        '''Register or unregister callbacks at event loop
+
+        This should be called after change of any ``.event`` in callbacks.
+        '''
+        # It seems like loop.add_{reader,writer} can be run multiple times
+        # and will still register the callback only once. Likewise,
+        # remove_{reader,writer} may be run even if the reader/writer
+        # is not registered (and will just return False).
+
+        # For the edge case of empty callbacks, any() returns False.
+        if any(callback.event & ~(
+                    libvirt.VIR_EVENT_HANDLE_READABLE |
+                    libvirt.VIR_EVENT_HANDLE_WRITABLE)
+                for callback in self.callbacks.values()):
+            warnings.warn(
+                'The only event supported are VIR_EVENT_HANDLE_READABLE '
+                'and VIR_EVENT_HANDLE_WRITABLE',
+                UserWarning)
+
+        # Watch for readability as long as at least one callback asks for it.
+        if any(callback.event & libvirt.VIR_EVENT_HANDLE_READABLE
+                for callback in self.callbacks.values()):
+            self.impl.loop.add_reader(
+                self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_READABLE)
+        else:
+            self.impl.loop.remove_reader(self.fd)
+
+        # Same for writability.
+        if any(callback.event & libvirt.VIR_EVENT_HANDLE_WRITABLE
+                for callback in self.callbacks.values()):
+            self.impl.loop.add_writer(
+                self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
+        else:
+            self.impl.loop.remove_writer(self.fd)
+
+    def add_handle(self, callback):
+        '''Add a callback to the descriptor
+
+        :param FDCallback callback: the callback to add
+        :rtype: None
+
+        After adding the callback, it is immediately watched.
+        '''
+        self.callbacks[callback.iden] = callback
+        self.update()
+
+    def remove_handle(self, iden):
+        '''Remove a callback from the descriptor
+
+        :param int iden: the identifier of the callback
+        :returns: the callback
+        :rtype: FDCallback
+        :raises KeyError: if no callback with this identifier is attached
+
+        After removing the callback, the descriptor may be unwatched, if there
+        are no more handles for it.
+        '''
+        callback = self.callbacks.pop(iden)
+        self.update()
+        return callback
+
+    def close(self):
+        '''Drop all callbacks and stop watching the descriptor'''
+        self.callbacks.clear()
+        # with no callbacks left, update() removes both reader and writer
+        self.update()
+
+class DescriptorDict(dict):
+    '''Mapping from file descriptor to its :class:`Descriptor`
+
+    virEventAsyncIOImpl keeps its descriptors in an instance of this class.
+    Looking up a file descriptor that is not present yet creates a
+    Descriptor for it, stores it and returns it.
+    '''
+    def __init__(self, impl):
+        super().__init__()
+        self.impl = impl
+
+    def __missing__(self, fd):
+        # Called by dict.__getitem__ when *fd* has no entry yet.
+        self[fd] = created = Descriptor(self.impl, fd)
+        return created
+
+class FDCallback(Callback):
+    '''Watcher callback bound to one file descriptor
+
+    :param Descriptor descriptor: the descriptor manager
+    :param int event: bitset of events on which to fire the callback
+    '''
+    # pylint: disable=too-few-public-methods
+
+    def __init__(self, *args, descriptor, event, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.event = event
+        self.descriptor = descriptor
+
+    def __repr__(self):
+        fields = (
+            self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
+        return '<{} iden={} fd={} event={}>'.format(*fields)
+
+    def update(self, *, event):
+        '''Store the new event set and let the descriptor fix its watchers'''
+        self.event = event
+        self.descriptor.update()
+
+#
+# timeouts
+#
+
+class TimeoutCallback(Callback):
+    '''Callback for timer
+
+    :param int timeout: interval in milliseconds; ``0`` fires on every loop
+        iteration, a negative value means the timer is stopped
+    '''
+    def __init__(self, *args, timeout, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.timeout = timeout
+        # the task running _timer(), or None while the timer is stopped
+        self._task = None
+
+    def __repr__(self):
+        return '<{} iden={} timeout={}>'.format(
+            self.__class__.__name__, self.iden, self.timeout)
+
+    @asyncio.coroutine
+    def _timer(self):
+        '''An actual timer running on the event loop.
+
+        This is a coroutine.
+        '''
+        while True:
+            # Deliberately no assertion on self.timeout here: the callback
+            # invoked below may disable its own timer (update(timeout=-1)),
+            # which cancels this task while it is still running. The
+            # cancellation is only delivered at the next yield, so the loop
+            # must be able to reach it with a negative timeout.
+            try:
+                if self.timeout > 0:
+                    timeout = self.timeout * 1e-3
+                    self.impl.log.debug('sleeping %r', timeout)
+                    yield from asyncio.sleep(timeout)
+                else:
+                    # scheduling timeout for next loop iteration
+                    yield
+
+            except asyncio.CancelledError:
+                self.impl.log.debug('timer %d cancelled', self.iden)
+                break
+
+            self.cb(self.iden, self.opaque)
+            self.impl.log.debug('timer %r callback ended', self.iden)
+
+    def update(self, *, timeout=None):
+        '''Start or stop the timer, possibly updating timeout
+
+        :param timeout: the new timeout, or None to keep the current one
+        '''
+        if timeout is not None:
+            self.timeout = timeout
+
+        if self.timeout >= 0 and self._task is None:
+            self.impl.log.debug('timer %r start', self.iden)
+            self._task = ensure_future(self._timer(),
+                loop=self.impl.loop)
+
+        elif self.timeout < 0 and self._task is not None:
+            self.impl.log.debug('timer %r stop', self.iden)
+            self._task.cancel()  # pylint: disable=no-member
+            self._task = None
+
+    def close(self):
+        '''Stop the timer and call ff callback'''
+        self.timeout = -1
+        self.update()
+        super().close()
+
+#
+# main implementation
+#
+
+class virEventAsyncIOImpl(object):
+    '''Libvirt event adapter to asyncio.
+
+    :param loop: asyncio's event loop
+
+    If *loop* is not specified, the current (or default) event loop is used.
+    '''
+
+    def __init__(self, *, loop=None):
+        self.loop = loop or asyncio.get_event_loop()
+        # all live callbacks (handles and timers), keyed by their iden
+        self.callbacks = {}
+        # per-fd managers; an entry is created on first lookup of an fd
+        self.descriptors = DescriptorDict(self)
+        self.log = logging.getLogger(self.__class__.__name__)
+
+    def register(self):
+        '''Register this instance as event loop implementation
+
+        :returns: this instance
+        '''
+        # pylint: disable=bad-whitespace
+        self.log.debug('register()')
+        libvirt.virEventRegisterImpl(
+            self._add_handle, self._update_handle, self._remove_handle,
+            self._add_timeout, self._update_timeout, self._remove_timeout)
+        return self
+
+    def schedule_ff_callback(self, opaque):
+        '''Schedule a ff callback from one of the handles or timers'''
+        self.loop.call_soon(libvirt.virEventExecuteFFCallback, opaque)
+
+    def is_idle(self):
+        '''Returns False if there are leftovers from a connection
+
+        Those may happen if there are semantic problems while closing
+        a connection. For example, not deregistered events before .close().
+        '''
+        return not self.callbacks
+
+    def _add_handle(self, fd, event, cb, opaque):
+        '''Register a callback for monitoring file handle events
+
+        :param int fd: file descriptor to listen on
+        :param int event: bitset of events on which to fire the callback
+        :param cb: the callback to be called when an event occurs
+        :param opaque: user data to pass to the callback
+        :rtype: int
+        :returns: handle watch number to be used for updating and
+            unregistering for events
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFunc
+        '''
+        self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
+                fd, event, cb, opaque)
+        callback = FDCallback(self, cb, opaque,
+                descriptor=self.descriptors[fd], event=event)
+        self.callbacks[callback.iden] = callback
+        self.descriptors[fd].add_handle(callback)
+        return callback.iden
+
+    def _update_handle(self, watch, event):
+        '''Change event set for a monitored file handle
+
+        :param int watch: file descriptor watch to modify
+        :param int event: new events to listen on
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
+        '''
+        self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
+        return self.callbacks[watch].update(event=event)
+
+    def _remove_handle(self, watch):
+        '''Unregister a callback from a file handle.
+
+        :param int watch: file descriptor watch to stop listening on
+        :returns: None
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
+        '''
+        self.log.debug('remove_handle(watch=%d)', watch)
+        callback = self.callbacks.pop(watch)
+        # self.descriptors is a dict keyed by fd and has no remove_handle();
+        # the removal has to go through the descriptor this callback is
+        # bound to. The call is kept outside the assert so that it is not
+        # dropped when assertions are disabled (python -O).
+        removed = callback.descriptor.remove_handle(watch)
+        assert callback is removed
+        callback.close()
+
+    def _add_timeout(self, timeout, cb, opaque):
+        '''Register a callback for a timer event
+
+        :param int timeout: the timeout to monitor
+        :param cb: the callback to call when timeout has expired
+        :param opaque: user data to pass to the callback
+        :rtype: int
+        :returns: a timer value
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
+        '''
+        self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
+                timeout, cb, opaque)
+        callback = TimeoutCallback(self, cb, opaque, timeout=timeout)
+        self.callbacks[callback.iden] = callback
+        # starts the timer right away unless the timeout is negative
+        callback.update()
+        return callback.iden
+
+    def _update_timeout(self, timer, timeout):
+        '''Change frequency for a timer
+
+        :param int timer: the timer to modify
+        :param int timeout: the new timeout value in ms
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
+        '''
+        self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
+        return self.callbacks[timer].update(timeout=timeout)
+
+    def _remove_timeout(self, timer):
+        '''Unregister a callback for a timer
+
+        :param int timer: the timer to remove
+        :returns: None
+
+        .. seealso::
+
+            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
+        '''
+        self.log.debug('remove_timeout(timer=%d)', timer)
+        callback = self.callbacks.pop(timer)
+        callback.close()
+
+def virEventRegisterAsyncIOImpl(*, loop=None):
+    '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
+
+    :param loop: asyncio's event loop, passed on to virEventAsyncIOImpl
+    :returns: the registered implementation object; in normal usage it can
+        safely be discarded
+    '''
+    impl = virEventAsyncIOImpl(loop=loop)
+    return impl.register()
diff --git a/sanitytest.py b/sanitytest.py
index 6548831..53a739f 100644
--- a/sanitytest.py
+++ b/sanitytest.py
@@ -350,7 +350,7 @@ for klass in gotfunctions:
for func in sorted(gotfunctions[klass]):
# These are pure python methods with no C APi
if func in ["connect", "getConnect", "domain",
"getDomain",
- "virEventRegisterAsyncIOImpl"]:
+ "virEventRegisterAsyncIOImpl",
"virEventExecuteFFCallback"]:
continue
key = "%s.%s" % (klass, func)
diff --git a/setup.py b/setup.py
index 120ddd5..bac9010 100755
--- a/setup.py
+++ b/setup.py
@@ -14,6 +14,7 @@ import sys
import os
import os.path
import re
+import shutil
import time
MIN_LIBVIRT = "0.9.11"
@@ -50,6 +51,12 @@ def have_libvirt_lxc():
except DistutilsExecError:
return False
+def have_libvirtaio():
+    """Tell whether the libvirtaio module should be built and installed"""
+    # libvirtaio needs asyncio, which relies on the "yield from" syntax.
+    # asyncio ships with the standard library since 3.4, and an out-of-tree
+    # version exists for 3.3, hence the lower bound.
+    minimum = (3, 3)
+    return sys.version_info >= minimum
+
def get_pkgconfig_data(args, mod, required=True):
"""Run pkg-config to and return content associated with
it"""
f = os.popen("%s %s %s" % (get_pkgcfg(), " ".join(args), mod))
@@ -124,6 +131,9 @@ def get_module_lists():
c_modules.append(modulelxc)
py_modules.append("libvirt_lxc")
+ if have_libvirtaio():
+ py_modules.append("libvirtaio")
+
return c_modules, py_modules
@@ -141,6 +151,8 @@ class my_build(build):
self.spawn([sys.executable, "generator.py", "libvirt-qemu",
apis[1]])
if have_libvirt_lxc():
self.spawn([sys.executable, "generator.py",
"libvirt-lxc", apis[2]])
+ if have_libvirtaio():
+ shutil.copy('libvirtaio.py', 'build')
build.run(self)
--
2.5.5