[libvirt] [PATCH v2 0/2] libvirt-python: libvirtaio

Hello, libvirt-list, This is second attempt at merging libvirtaio, an event loop implementation which dispatches the callbacks via asyncio's event loop. The first patch fixes the bug around freeing opaque objects [1][2], and the second one is the actual implementation. Since v1 series, as per Daniel Berrange's notes, the second patch has licence comment changed to LGPL-2.1+ and there is no import into main libvirt module. The first patch is unchanged. [1] https://www.redhat.com/archives/libvir-list/2017-January/msg00863.html [2] https://bugzilla.redhat.com/show_bug.cgi?id=1433028 Wojtek Porczyk (2): Allow for ff callbacks to be called by custom event implementations Add asyncio event loop implementation libvirt-override.c | 36 ++--- libvirt-override.py | 39 +++++ libvirt-python.spec.in | 1 + libvirtaio.py | 401 +++++++++++++++++++++++++++++++++++++++++++++++++ sanitytest.py | 5 +- setup.py | 12 ++ 6 files changed, 468 insertions(+), 26 deletions(-) create mode 100644 libvirtaio.py -- pozdrawiam / best regards _.-._ Wojtek Porczyk .-^' '^-. Invisible Things Lab |'-.-^-.-'| | | | | I do not fear computers, | '-.-' | I fear lack of them. '-._ : ,-' -- Isaac Asimov `^-^-_>

The documentation says:
If the opaque user data requires free'ing when the handle is unregistered, then a 2nd callback can be supplied for this purpose. This callback needs to be invoked from a clean stack. If 'ff' callbacks are invoked directly from the virEventRemoveHandleFunc they will likely deadlock in libvirt.
And they did deadlock. In removeTimeout too. Now we supply a custom function to pick it from the opaque blob and fire. Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirt-override.c | 36 ++++++++++++------------------------ libvirt-override.py | 39 +++++++++++++++++++++++++++++++++++++++ sanitytest.py | 5 +++-- 3 files changed, 54 insertions(+), 26 deletions(-) diff --git a/libvirt-override.c b/libvirt-override.c index 9e40f00..37f7ee2 100644 --- a/libvirt-override.c +++ b/libvirt-override.c @@ -5223,6 +5223,9 @@ libvirt_virEventAddHandleFunc(int fd, VIR_PY_TUPLE_SET_GOTO(pyobj_args, 3, cb_args, cleanup); + /* If changing contents of the opaque object, please also change + * virEventExecuteFFCallback() in libvirt-override.py + */ VIR_PY_TUPLE_SET_GOTO(cb_args, 0, libvirt_virEventHandleCallbackWrap(cb), cleanup); VIR_PY_TUPLE_SET_GOTO(cb_args, 1, libvirt_virVoidPtrWrap(opaque), cleanup); VIR_PY_TUPLE_SET_GOTO(cb_args, 2, libvirt_virFreeCallbackWrap(ff), cleanup); @@ -5292,20 +5295,11 @@ libvirt_virEventRemoveHandleFunc(int watch) VIR_PY_TUPLE_SET_GOTO(pyobj_args, 0, libvirt_intWrap(watch), cleanup); result = PyEval_CallObject(removeHandleObj, pyobj_args); - if (!result) { + if (result) { + retval = 0; + } else { PyErr_Print(); PyErr_Clear(); - } else if (!PyTuple_Check(result) || PyTuple_Size(result) != 3) { - DEBUG("%s: %s must return opaque obj registered with %s" - "to avoid leaking libvirt memory\n", - __FUNCTION__, NAME(removeHandle), NAME(addHandle)); - } else { - opaque = PyTuple_GetItem(result, 1); - ff = PyTuple_GetItem(result, 2); - cff = PyvirFreeCallback_Get(ff); - if (cff) - (*cff)(PyvirVoidPtr_Get(opaque)); - retval = 0; } cleanup: @@ -5350,6 +5344,9 @@ libvirt_virEventAddTimeoutFunc(int timeout, VIR_PY_TUPLE_SET_GOTO(pyobj_args, 2, cb_args, cleanup); + /* If changing contents of the opaque object, please also change + * virEventExecuteFFCallback() in libvirt-override.py + */ VIR_PY_TUPLE_SET_GOTO(cb_args, 0, libvirt_virEventTimeoutCallbackWrap(cb), cleanup); VIR_PY_TUPLE_SET_GOTO(cb_args, 1, libvirt_virVoidPtrWrap(opaque), cleanup); VIR_PY_TUPLE_SET_GOTO(cb_args, 2, libvirt_virFreeCallbackWrap(ff), cleanup); @@ -5416,20 +5413,11 @@ libvirt_virEventRemoveTimeoutFunc(int timer) VIR_PY_TUPLE_SET_GOTO(pyobj_args, 0, libvirt_intWrap(timer), cleanup); result = PyEval_CallObject(removeTimeoutObj, pyobj_args); - if (!result) { + if (result) { + retval = 0; + } else { PyErr_Print(); PyErr_Clear(); - } else if (!PyTuple_Check(result) || PyTuple_Size(result) != 3) { - DEBUG("%s: %s must return opaque obj registered with %s" - "to avoid leaking libvirt memory\n", - __FUNCTION__, NAME(removeTimeout), NAME(addTimeout)); - } else { - opaque = PyTuple_GetItem(result, 1); - ff = PyTuple_GetItem(result, 2); - cff = PyvirFreeCallback_Get(ff); - if (cff) - (*cff)(PyvirVoidPtr_Get(opaque)); - retval = 0; } cleanup: diff --git a/libvirt-override.py b/libvirt-override.py index 63f8ecb..3d09d63 100644 --- a/libvirt-override.py +++ b/libvirt-override.py @@ -16,6 +16,7 @@ except ImportError: if str(cyg_e).count("No module named"): raise lib_e +import ctypes import types # The root of all libvirt errors. @@ -211,3 +212,41 @@ def virEventAddTimeout(timeout, cb, opaque): ret = libvirtmod.virEventAddTimeout(timeout, cbData) if ret == -1: raise libvirtError ('virEventAddTimeout() failed') return ret + + +# +# a caller for the ff callbacks for custom event loop implementations +# + +ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p +ctypes.pythonapi.PyCapsule_GetPointer.argtypes = ( + ctypes.py_object, ctypes.c_char_p) + +_virFreeCallback = ctypes.CFUNCTYPE(None, ctypes.c_void_p) + +def virEventExecuteFFCallback(opaque): + """ + Execute callback which frees the opaque buffer + + @opaque: the opaque object passed to addHandle or addTimeout + + WARNING: This function should not be called from any call by libvirt's + core. It will most probably cause deadlock in C-level libvirt code. + Instead it should be scheduled and called from implementation's stack. + + See https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFunc + for more information. + + This function is not dependent on any event loop implementation. + """ + # The opaque object is really a 3-tuple, which contains a the real opaque + # pointer and the ff callback, both of which are inside PyCapsules. If not + # specified, the ff may be None. See libvirt-override.c. + + dummy, caps_opaque, caps_ff = opaque + ff = _virFreeCallback(ctypes.pythonapi.PyCapsule_GetPointer( + caps_ff, "virFreeCallback".encode("ascii"))) + if ff: + real_opaque = ctypes.pythonapi.PyCapsule_GetPointer( + caps_opaque, "void*".encode("ascii")) + ff(real_opaque) diff --git a/sanitytest.py b/sanitytest.py index a140ba2..6548831 100644 --- a/sanitytest.py +++ b/sanitytest.py @@ -345,11 +345,12 @@ for name in sorted(finalklassmap): # Phase 6: Validate that every python API has a corresponding C API for klass in gotfunctions: - if klass == "libvirtError": + if klass in ("libvirtError", "virEventAsyncIOImpl"): continue for func in sorted(gotfunctions[klass]): # These are pure python methods with no C APi - if func in ["connect", "getConnect", "domain", "getDomain"]: + if func in ["connect", "getConnect", "domain", "getDomain", + "virEventRegisterAsyncIOImpl"]: continue key = "%s.%s" % (klass, func) -- 2.5.5

This is usable only on python >= 3.4 (or 3.3 with out-of-tree asyncio), however it should be harmless for anyone with older python versions. In simplest case, to have the callbacks queued on the default loop: >>> import libvirtaio >>> libvirtaio.virEventRegisterAsyncIOImpl() The function is not present on non-compatible platforms. Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirt-python.spec.in | 1 + libvirtaio.py | 401 +++++++++++++++++++++++++++++++++++++++++++++++++ sanitytest.py | 2 +- setup.py | 12 ++ 4 files changed, 415 insertions(+), 1 deletion(-) create mode 100644 libvirtaio.py diff --git a/libvirt-python.spec.in b/libvirt-python.spec.in index 3021ebd..0ee535e 100644 --- a/libvirt-python.spec.in +++ b/libvirt-python.spec.in @@ -86,6 +86,7 @@ rm -f %{buildroot}%{_libdir}/python*/site-packages/*egg-info %defattr(-,root,root) %doc ChangeLog AUTHORS NEWS README COPYING COPYING.LESSER examples/ %{_libdir}/python3*/site-packages/libvirt.py* +%{_libdir}/python3*/site-packages/libvirtaio.py* %{_libdir}/python3*/site-packages/libvirt_qemu.py* %{_libdir}/python3*/site-packages/libvirt_lxc.py* %{_libdir}/python3*/site-packages/__pycache__/libvirt.cpython-*.py* diff --git a/libvirtaio.py b/libvirtaio.py new file mode 100644 index 0000000..8428f71 --- /dev/null +++ b/libvirtaio.py @@ -0,0 +1,401 @@ +# +# libvirtaio -- asyncio adapter for libvirt +# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, see +# <http://www.gnu.org/licenses/>. +# + +'''Libvirt event loop implementation using asyncio + +Register the implementation of default loop: + + >>> import libvirtaio + >>> libvirtaio.virEventRegisterAsyncIOImpl() + +.. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html +''' + +__author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>' +__license__ = 'LGPL-2.1+' +__all__ = ['virEventAsyncIOImpl', 'virEventRegisterAsyncIOImpl'] + +import asyncio +import itertools +import logging +import warnings + +import libvirt + +try: + from asyncio import ensure_future +except ImportError: + from asyncio import async as ensure_future + + +class Callback(object): + '''Base class for holding callback + + :param virEventAsyncIOImpl impl: the implementation in which we run + :param cb: the callback itself + :param opaque: the opaque tuple passed by libvirt + ''' + # pylint: disable=too-few-public-methods + + _iden_counter = itertools.count() + + def __init__(self, impl, cb, opaque, *args, **kwargs): + super().__init__(*args, **kwargs) + self.iden = next(self._iden_counter) + self.impl = impl + self.cb = cb + self.opaque = opaque + + assert self.iden not in self.impl.callbacks, \ + 'found {} callback: {!r}'.format( + self.iden, self.impl.callbacks[self.iden]) + self.impl.callbacks[self.iden] = self + + def __repr__(self): + return '<{} iden={}>'.format(self.__clas__.__name__, self.iden) + + def close(self): + '''Schedule *ff* callback''' + self.impl.log.debug('callback %d close(), scheduling ff', self.iden) + self.impl.schedule_ff_callback(self.opaque) + +# +# file descriptors +# + +class Descriptor(object): + '''Manager of one file descriptor + + :param virEventAsyncIOImpl impl: the implementation in which we run + :param int fd: the file descriptor + ''' + def __init__(self, impl, fd): + self.impl = impl + self.fd = fd + self.callbacks = {} + + def _handle(self, event): + '''Dispatch the event to the descriptors + + :param int event: The event (from libvirt's constants) being dispatched + ''' + for callback in self.callbacks.values(): + if callback.event is not None and callback.event & event: + callback.cb(callback.iden, self.fd, event, callback.opaque) + + def update(self): + '''Register or unregister callbacks at event loop + + This should be called after change of any ``.event`` in callbacks. + ''' + # It seems like loop.add_{reader,writer} can be run multiple times + # and will still register the callback only once. Likewise, + # remove_{reader,writer} may be run even if the reader/writer + # is not registered (and will just return False). + + # For the edge case of empty callbacks, any() returns False. + if any(callback.event & ~( + libvirt.VIR_EVENT_HANDLE_READABLE | + libvirt.VIR_EVENT_HANDLE_WRITABLE) + for callback in self.callbacks.values()): + warnings.warn( + 'The only event supported are VIR_EVENT_HANDLE_READABLE ' + 'and VIR_EVENT_HANDLE_WRITABLE', + UserWarning) + + if any(callback.event & libvirt.VIR_EVENT_HANDLE_READABLE + for callback in self.callbacks.values()): + self.impl.loop.add_reader( + self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_READABLE) + else: + self.impl.loop.remove_reader(self.fd) + + if any(callback.event & libvirt.VIR_EVENT_HANDLE_WRITABLE + for callback in self.callbacks.values()): + self.impl.loop.add_writer( + self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_WRITABLE) + else: + self.impl.loop.remove_writer(self.fd) + + def add_handle(self, callback): + '''Add a callback to the descriptor + + :param FDCallback callback: the callback to add + :rtype: None + + After adding the callback, it is immediately watched. + ''' + self.callbacks[callback.iden] = callback + self.update() + + def remove_handle(self, iden): + '''Remove a callback from the descriptor + + :param int iden: the identifier of the callback + :returns: the callback + :rtype: FDCallback + + After removing the callback, the descriptor may be unwatched, if there + are no more handles for it. + ''' + callback = self.callbacks.pop(iden) + self.update() + return callback + + def close(self): + '''''' + self.callbacks.clear() + self.update() + +class DescriptorDict(dict): + '''Descriptors collection + + This is used internally by virEventAsyncIOImpl to hold descriptors. + ''' + def __init__(self, impl): + super().__init__() + self.impl = impl + + def __missing__(self, fd): + descriptor = Descriptor(self.impl, fd) + self[fd] = descriptor + return descriptor + +class FDCallback(Callback): + '''Callback for file descriptor (watcher) + + :param Descriptor descriptor: the descriptor manager + :param int event: bitset of events on which to fire the callback + ''' + # pylint: disable=too-few-public-methods + + def __init__(self, *args, descriptor, event, **kwargs): + super().__init__(*args, **kwargs) + self.descriptor = descriptor + self.event = event + + def __repr__(self): + return '<{} iden={} fd={} event={}>'.format( + self.__class__.__name__, self.iden, self.descriptor.fd, self.event) + + def update(self, *, event): + '''Update the callback and fix descriptor's watchers''' + self.event = event + self.descriptor.update() + +# +# timeouts +# + +class TimeoutCallback(Callback): + '''Callback for timer''' + def __init__(self, *args, timeout, **kwargs): + super().__init__(*args, **kwargs) + self.timeout = timeout + self._task = None + + def __repr__(self): + return '<{} iden={} timeout={}>'.format( + self.__class__.__name__, self.iden, self.timeout) + + @asyncio.coroutine + def _timer(self): + '''An actual timer running on the event loop. + + This is a coroutine. + ''' + while True: + assert self.timeout >= 0, \ + 'invalid timeout {} for running timer'.format(self.timeout) + + try: + if self.timeout > 0: + timeout = self.timeout * 1e-3 + self.impl.log.debug('sleeping %r', timeout) + yield from asyncio.sleep(timeout) + else: + # scheduling timeout for next loop iteration + yield + + except asyncio.CancelledError: + self.impl.log.debug('timer %d cancelled', self.iden) + break + + self.cb(self.iden, self.opaque) + self.impl.log.debug('timer %r callback ended', self.iden) + + def update(self, *, timeout=None): + '''Start or the timer, possibly updating timeout''' + if timeout is not None: + self.timeout = timeout + + if self.timeout >= 0 and self._task is None: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + + elif self.timeout < 0 and self._task is not None: + self.impl.log.debug('timer %r stop', self.iden) + self._task.cancel() # pylint: disable=no-member + self._task = None + + def close(self): + '''Stop the timer and call ff callback''' + self.timeout = -1 + self.update() + super().close() + +# +# main implementation +# + +class virEventAsyncIOImpl(object): + '''Libvirt event adapter to asyncio. + + :param loop: asyncio's event loop + + If *loop* is not specified, the current (or default) event loop is used. + ''' + + def __init__(self, *, loop=None): + self.loop = loop or asyncio.get_event_loop() + self.callbacks = {} + self.descriptors = DescriptorDict(self) + self.log = logging.getLogger(self.__class__.__name__) + + def register(self): + '''Register this instance as event loop implementation''' + # pylint: disable=bad-whitespace + self.log.debug('register()') + libvirt.virEventRegisterImpl( + self._add_handle, self._update_handle, self._remove_handle, + self._add_timeout, self._update_timeout, self._remove_timeout) + return self + + def schedule_ff_callback(self, opaque): + '''Schedule a ff callback from one of the handles or timers''' + self.loop.call_soon(libvirt.virEventExecuteFFCallback, opaque) + + def is_idle(self): + '''Returns False if there are leftovers from a connection + + Those may happen if there are sematical problems while closing + a connection. For example, not deregistered events before .close(). + ''' + return not self.callbacks + + def _add_handle(self, fd, event, cb, opaque): + '''Register a callback for monitoring file handle events + + :param int fd: file descriptor to listen on + :param int event: bitset of events on which to fire the callback + :param cb: the callback to be called when an event occurrs + :param opaque: user data to pass to the callback + :rtype: int + :returns: handle watch number to be used for updating and unregistering for events + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFun... + ''' + self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)', + fd, event, cb, opaque) + callback = FDCallback(self, cb, opaque, + descriptor=self.descriptors[fd], event=event) + self.callbacks[callback.iden] = callback + self.descriptors[fd].add_handle(callback) + return callback.iden + + def _update_handle(self, watch, event): + '''Change event set for a monitored file handle + + :param int watch: file descriptor watch to modify + :param int event: new events to listen on + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc + ''' + self.log.debug('update_handle(watch=%d, event=%d)', watch, event) + return self.callbacks[watch].update(event=event) + + def _remove_handle(self, watch): + '''Unregister a callback from a file handle. + + :param int watch: file descriptor watch to stop listening on + :returns: None (see source for explanation) + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc + ''' + self.log.debug('remove_handle(watch=%d)', watch) + callback = self.callbacks.pop(watch) + assert callback is self.descriptors.remove_handle(watch) + callback.close() + + def _add_timeout(self, timeout, cb, opaque): + '''Register a callback for a timer event + + :param int timeout: the timeout to monitor + :param cb: the callback to call when timeout has expired + :param opaque: user data to pass to the callback + :rtype: int + :returns: a timer value + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc + ''' + self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)', + timeout, cb, opaque) + callback = TimeoutCallback(self, cb, opaque, timeout=timeout) + self.callbacks[callback.iden] = callback + callback.update() + return callback.iden + + def _update_timeout(self, timer, timeout): + '''Change frequency for a timer + + :param int timer: the timer to modify + :param int timeout: the new timeout value in ms + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFun... + ''' + self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout) + return self.callbacks[timer].update(timeout=timeout) + + def _remove_timeout(self, timer): + '''Unregister a callback for a timer + + :param int timer: the timer to remove + :returns: None (see source for explanation) + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFun... + ''' + self.log.debug('remove_timeout(timer=%d)', timer) + callback = self.callbacks.pop(timer) + callback.close() + +def virEventRegisterAsyncIOImpl(*, loop=None): + '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop + + The implementation object is returned, but in normal usage it can safely be + discarded. + ''' + return virEventAsyncIOImpl(loop=loop).register() diff --git a/sanitytest.py b/sanitytest.py index 6548831..53a739f 100644 --- a/sanitytest.py +++ b/sanitytest.py @@ -350,7 +350,7 @@ for klass in gotfunctions: for func in sorted(gotfunctions[klass]): # These are pure python methods with no C APi if func in ["connect", "getConnect", "domain", "getDomain", - "virEventRegisterAsyncIOImpl"]: + "virEventRegisterAsyncIOImpl", "virEventExecuteFFCallback"]: continue key = "%s.%s" % (klass, func) diff --git a/setup.py b/setup.py index 120ddd5..bac9010 100755 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ import sys import os import os.path import re +import shutil import time MIN_LIBVIRT = "0.9.11" @@ -50,6 +51,12 @@ def have_libvirt_lxc(): except DistutilsExecError: return False +def have_libvirtaio(): + # This depends on asyncio, which in turn depends on "yield from" syntax. + # The asyncio module itself is in standard library since 3.4, but there is + # an out-of-tree version compatible with 3.3. + return sys.version_info >= (3, 3) + def get_pkgconfig_data(args, mod, required=True): """Run pkg-config to and return content associated with it""" f = os.popen("%s %s %s" % (get_pkgcfg(), " ".join(args), mod)) @@ -124,6 +131,9 @@ def get_module_lists(): c_modules.append(modulelxc) py_modules.append("libvirt_lxc") + if have_libvirtaio(): + py_modules.append("libvirtaio") + return c_modules, py_modules @@ -141,6 +151,8 @@ class my_build(build): self.spawn([sys.executable, "generator.py", "libvirt-qemu", apis[1]]) if have_libvirt_lxc(): self.spawn([sys.executable, "generator.py", "libvirt-lxc", apis[2]]) + if have_libvirtaio(): + shutil.copy('libvirtaio.py', 'build') build.run(self) -- 2.5.5

On Fri, Mar 17, 2017 at 02:35:53PM +0100, Wojtek Porczyk wrote:
+class Callback(object): + '''Base class for holding callback + + :param virEventAsyncIOImpl impl: the implementation in which we run + :param cb: the callback itself + :param opaque: the opaque tuple passed by libvirt + ''' + # pylint: disable=too-few-public-methods + + _iden_counter = itertools.count() + + def __init__(self, impl, cb, opaque, *args, **kwargs): + super().__init__(*args, **kwargs) + self.iden = next(self._iden_counter) + self.impl = impl + self.cb = cb + self.opaque = opaque + + assert self.iden not in self.impl.callbacks, \ + 'found {} callback: {!r}'.format( + self.iden, self.impl.callbacks[self.iden]) + self.impl.callbacks[self.iden] = self + + def __repr__(self): + return '<{} iden={}>'.format(self.__clas__.__name__, self.iden)
This looks like it should be 'self.__class__'
+class TimeoutCallback(Callback): + '''Callback for timer''' + def __init__(self, *args, timeout, **kwargs): + super().__init__(*args, **kwargs) + self.timeout = timeout + self._task = None + + def __repr__(self): + return '<{} iden={} timeout={}>'.format( + self.__class__.__name__, self.iden, self.timeout) + + @asyncio.coroutine + def _timer(self): + '''An actual timer running on the event loop. + + This is a coroutine. + ''' + while True: + assert self.timeout >= 0, \ + 'invalid timeout {} for running timer'.format(self.timeout)
When I test, this assert always fires. It seems that when we call 'close', setting timeout==-1, this _timer coroutine continues for one more iteration before CancelledError is triggered.
+ + try: + if self.timeout > 0: + timeout = self.timeout * 1e-3 + self.impl.log.debug('sleeping %r', timeout) + yield from asyncio.sleep(timeout) + else: + # scheduling timeout for next loop iteration + yield + + except asyncio.CancelledError: + self.impl.log.debug('timer %d cancelled', self.iden) + break + + self.cb(self.iden, self.opaque) + self.impl.log.debug('timer %r callback ended', self.iden) + + def update(self, *, timeout=None): + '''Start or the timer, possibly updating timeout''' + if timeout is not None: + self.timeout = timeout +
Using timeout=None as the default looks wrong to me - It should be either mandatory, or -1 IMHO.
+ if self.timeout >= 0 and self._task is None: + self.impl.log.debug('timer %r start', self.iden) + self._task = ensure_future(self._timer(), + loop=self.impl.loop) + + elif self.timeout < 0 and self._task is not None: + self.impl.log.debug('timer %r stop', self.iden) + self._task.cancel() # pylint: disable=no-member + self._task = None + + def close(self): + '''Stop the timer and call ff callback''' + self.timeout = -1 + self.update() + super().close()
diff --git a/setup.py b/setup.py index 120ddd5..bac9010 100755 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ import sys import os import os.path import re +import shutil import time
MIN_LIBVIRT = "0.9.11" @@ -50,6 +51,12 @@ def have_libvirt_lxc(): except DistutilsExecError: return False
+def have_libvirtaio(): + # This depends on asyncio, which in turn depends on "yield from" syntax. + # The asyncio module itself is in standard library since 3.4, but there is + # an out-of-tree version compatible with 3.3. + return sys.version_info >= (3, 3) + def get_pkgconfig_data(args, mod, required=True): """Run pkg-config to and return content associated with it""" f = os.popen("%s %s %s" % (get_pkgcfg(), " ".join(args), mod)) @@ -124,6 +131,9 @@ def get_module_lists(): c_modules.append(modulelxc) py_modules.append("libvirt_lxc")
+ if have_libvirtaio(): + py_modules.append("libvirtaio") + return c_modules, py_modules
@@ -141,6 +151,8 @@ class my_build(build): self.spawn([sys.executable, "generator.py", "libvirt-qemu", apis[1]]) if have_libvirt_lxc(): self.spawn([sys.executable, "generator.py", "libvirt-lxc", apis[2]]) + if have_libvirtaio(): + shutil.copy('libvirtaio.py', 'build')
build.run(self)
We also need to add libvirtaio.py to MANIFEST.in to ensure it gets into the dist Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://entangle-photo.org -o- http://search.cpan.org/~danberr/ :|

On Fri, Mar 17, 2017 at 02:35:53PM +0100, Wojtek Porczyk wrote:
This is usable only on python >= 3.4 (or 3.3 with out-of-tree asyncio), however it should be harmless for anyone with older python versions.
In simplest case, to have the callbacks queued on the default loop:
>>> import libvirtaio >>> libvirtaio.virEventRegisterAsyncIOImpl()
The function is not present on non-compatible platforms.
Signed-off-by: Wojtek Porczyk <woju@invisiblethingslab.com> --- libvirt-python.spec.in | 1 + libvirtaio.py | 401 +++++++++++++++++++++++++++++++++++++++++++++++++ sanitytest.py | 2 +- setup.py | 12 ++ 4 files changed, 415 insertions(+), 1 deletion(-) create mode 100644 libvirtaio.py
+ def _remove_handle(self, watch): + '''Unregister a callback from a file handle. + + :param int watch: file descriptor watch to stop listening on + :returns: None (see source for explanation) + + .. seealso:: + https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc + ''' + self.log.debug('remove_handle(watch=%d)', watch) + callback = self.callbacks.pop(watch) + assert callback is self.descriptors.remove_handle(watch) + callback.close()
This code contains a bug - self.descriptors needs to be indexed by 'fd'. We also need to delete the hash entry when the callbacks are empty. IOW, I think the code needs to be: callback = self.callbacks.pop(watch) fd = callback.descriptor.fd assert callback is self.descriptors[fd].remove_handle(watch) if len(self.descriptors[fd].callbacks) == 0: del self.descriptors[fd] callback.close() I'm going to send a patch series containing this fix & some other changes / fixes. Would appreciate if you can confirm my updated code still works from your pov. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://entangle-photo.org -o- http://search.cpan.org/~danberr/ :|

On Tue, Apr 04, 2017 at 03:27:49PM +0100, Daniel P. Berrange wrote:
I'm going to send a patch series containing this fix & some other changes / fixes. Would appreciate if you can confirm my updated code still works from your pov.
Thanks! I'll look into this tomorrow. -- pozdrawiam / best regards _.-._ Wojtek Porczyk .-^' '^-. Invisible Things Lab |'-.-^-.-'| | | | | I do not fear computers, | '-.-' | I fear lack of them. '-._ : ,-' -- Isaac Asimov `^-^-_>

On Fri, Mar 17, 2017 at 02:35:15PM +0100, Wojtek Porczyk wrote:
This is second attempt at merging libvirtaio, an event loop implementation which dispatches the callbacks via asyncio's event loop.
Hi, libvirt-list, Any progress on this? Or is there some sub-list or maintainer, who I should Cc or contact directly? -- pozdrawiam / best regards _.-._ Wojtek Porczyk .-^' '^-. Invisible Things Lab |'-.-^-.-'| | | | | I do not fear computers, | '-.-' | I fear lack of them. '-._ : ,-' -- Isaac Asimov `^-^-_>

On Fri, Mar 24, 2017 at 09:25:11PM +0100, Wojtek Porczyk wrote:
On Fri, Mar 17, 2017 at 02:35:15PM +0100, Wojtek Porczyk wrote:
This is second attempt at merging libvirtaio, an event loop implementation which dispatches the callbacks via asyncio's event loop.
Hi, libvirt-list,
Any progress on this? Or is there some sub-list or maintainer, who I should Cc or contact directly?
Sorry, I've been away on vacation for 2 weeks so wasn't able to look at it before now. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://entangle-photo.org -o- http://search.cpan.org/~danberr/ :|
participants (2)
-
Daniel P. Berrange
-
Wojtek Porczyk