[libvirt] Help needed: simple python api to obtain events

Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python? I'm looking for a python module with no API calls (or maybe just a few). I'd like to register callback functions with virConnect.domainEventRegister(), and want them to be called in the context of a specialized (automatically created?) thread. I don't mind if the module has two API calls instead: register() and startPollThread().
I learned that it takes me far too long to do it on my own, and that just modifying events-python.py (attached) leads to miserable outcomes: "libvir: Domain error : invalid domain pointer in virDomainFree", and a deadlocked python:
  #0 0x00000037fe40d174 in __lll_lock_wait () from /lib64/libpthread.so.0
  #1 0x00000037fe408aca in _L_lock_1034 () from /lib64/libpthread.so.0
  #2 0x00000037fe40898c in pthread_mutex_lock () from /lib64/libpthread.so.0
  #3 0x0000003cce04db84 in remoteDomainEventFired ()
  #4 0x00002b4cc96e1612 in libvirt_virEventInvokeHandleCallback ()
Help most welcome,
Dan.

On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
After discussions on IRC, it turns out that this is being seen with the libvirt 0.6.3 release, which had known-broken events code in the python bindings. It did not do correct locking, or reference counting. The current libvirt python works with Dan's demo program.
Daniel
-- 
|: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :|
|: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|
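To tell whether a given installation is the release named above, the version integer reported by the bindings can be decoded. This is a minimal Python 3 sketch, assuming the usual libvirt encoding of major * 1,000,000 + minor * 1,000 + release. The names decode_version(), is_known_broken() and KNOWN_BROKEN are hypothetical helpers, not part of libvirt, and 0.6.3 is flagged only because it is the one release this thread names; the thread does not say which release carried the fix.

```python
def decode_version(number):
    """Split a libvirt-style version integer into (major, minor, release)."""
    major, rest = divmod(number, 1000000)
    minor, release = divmod(rest, 1000)
    return (major, minor, release)


# Assumption: 0.6.3 is the only release identified as broken in this thread.
KNOWN_BROKEN = (0, 6, 3)


def is_known_broken(number):
    """True if the encoded version is exactly the release named as broken."""
    return decode_version(number) == KNOWN_BROKEN


# With the bindings installed, the integer would typically come from
# libvirt.getVersion(); 6003 is hard-coded here so the sketch runs
# without libvirt.
example = 6003
result = (decode_version(example), is_known_broken(example))
```

Comparing decoded tuples rather than raw integers keeps the check readable, and tuples compare element by element if a range test is ever needed.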

On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that it shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
I propose we add this example to examples/domain-events/events-py instead of the code we currently have
Daniel

#!/usr/bin/python -u
#
#
#
#################################################################################
# Start off by implementing a general purpose event loop for anyone's use
#################################################################################

import sys
import getopt
import os
import libvirt
import select
import errno
import time
import threading

class virEventLoopPure:
    class virEventLoopPureHandle:
        def __init__(self, handle, fd, events, cb, opaque):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            self.opaque = opaque

        def get_id(self):
            return self.handle

        def get_fd(self):
            return self.fd

        def get_events(self):
            return self.events

        def set_events(self, events):
            self.events = events

        def dispatch(self, events):
            self.cb(self.handle, self.fd, events, self.opaque[0], self.opaque[1])

    class virEventLoopPureTimer:
        def __init__(self, timer, interval, cb, opaque):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            self.opaque = opaque
            self.lastfired = 0

        def get_id(self):
            return self.timer

        def get_interval(self):
            return self.interval

        def set_interval(self, interval):
            self.interval = interval

        def get_last_fired(self):
            return self.lastfired

        def set_last_fired(self, now):
            self.lastfired = now

        def dispatch(self):
            self.cb(self.timer, self.opaque[0], self.opaque[1])

    def __init__(self, debug=False):
        self.debugOn = debug
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def debug(self, msg):
        if self.debugOn:
            print msg

    def next_timeout(self):
        next = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            if interval < 0:
                continue
            if next == 0 or (last + interval) < next:
                next = last + interval

        return next

    def get_handle_by_fd(self, fd):
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    def get_handle_by_id(self, handleID):
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time())
            if now >= next:
                sleep = 0
            else:
                sleep = next - now

        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)

        for (fd, revents) in events:
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time())
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since scheduler timeslice
            # means we could be ever so slightly early
            if now >= (want-20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

    def run_loop(self):
        self.quit = False
        while not self.quit:
            self.run_once()

    def interrupt(self):
        os.write(self.pipetrick[1], 'c')

    def add_handle(self, fd, events, cb, opaque):
        handleID = self.nextHandleID + 1
        self.nextHandleID = self.nextHandleID + 1

        h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))

        return handleID

    def add_timer(self, interval, cb, opaque):
        timerID = self.nextTimerID + 1
        self.nextTimerID = self.nextTimerID + 1

        h = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
        self.timers.append(h)
        self.interrupt()

        self.debug("Add timer %d interval %d" % (timerID, interval))

        return timerID

    def update_handle(self, handleID, events):
        h = self.get_handle_by_id(handleID)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))

    def update_timer(self, timerID, interval):
        for h in self.timers:
            if h.get_id() == timerID:
                h.set_interval(interval)
                self.interrupt()

                self.debug("Update timer %d interval %d" % (timerID, interval))
                break

    def remove_handle(self, handleID):
        handles = []
        for h in self.handles:
            if h.get_id() == handleID:
                self.poll.unregister(h.get_fd())
                self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
            else:
                handles.append(h)
        self.handles = handles
        self.interrupt()

    def remove_timer(self, timerID):
        timers = []
        for h in self.timers:
            if h.get_id() != timerID:
                timers.append(h)
        self.debug("Remove timer %d" % timerID)
        self.timers = timers
        self.interrupt()

    def events_to_poll(self, events):
        ret = 0
        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP
        return ret

    def events_from_poll(self, events):
        ret = 0
        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP
        return ret


###########################################################################
# Now glue an instance of the general event loop into libvirt's event loop
###########################################################################

eventLoop = virEventLoopPure(debug=False)
eventLoopThread = None

def virEventAddHandleImpl(fd, events, cb, opaque):
    global eventLoop
    return eventLoop.add_handle(fd, events, cb, opaque)

def virEventUpdateHandleImpl(handleID, events):
    global eventLoop
    return eventLoop.update_handle(handleID, events)

def virEventRemoveHandleImpl(handleID):
    global eventLoop
    return eventLoop.remove_handle(handleID)

def virEventAddTimerImpl(interval, cb, opaque):
    global eventLoop
    return eventLoop.add_timer(interval, cb, opaque)

def virEventUpdateTimerImpl(timerID, interval):
    global eventLoop
    return eventLoop.update_timer(timerID, interval)

def virEventRemoveTimerImpl(timerID):
    global eventLoop
    return eventLoop.remove_timer(timerID)

def virEventLoopPureRegister():
    libvirt.virEventRegisterImpl(virEventAddHandleImpl,
                                 virEventUpdateHandleImpl,
                                 virEventRemoveHandleImpl,
                                 virEventAddTimerImpl,
                                 virEventUpdateTimerImpl,
                                 virEventRemoveTimerImpl)

def virEventLoopPureRun():
    global eventLoop
    eventLoop.run_loop()

def virEventLoopPureStart():
    global eventLoopThread
    virEventLoopPureRegister()
    eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop")
    eventLoopThread.setDaemon(True)
    eventLoopThread.start()


##########################################################################
# Everything that now follows is a simple demo of domain lifecycle events
##########################################################################
def eventToString(event):
    eventStrings = ( "Added",
                     "Removed",
                     "Started",
                     "Suspended",
                     "Resumed",
                     "Stopped",
                     "Saved",
                     "Restored" )
    return eventStrings[event]

def myDomainEventCallback1 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def myDomainEventCallback2 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def usage():
    print "usage: "+os.path.basename(sys.argv[0])+" [uri]"
    print " uri will default to qemu:///system"

def main():
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"] )
    except getopt.GetoptError, err:
        # print help information and exit:
        print str(err) # will print something like "option -a not recognized"
        usage()
        sys.exit(2)
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()

    if len(sys.argv) > 1:
        uri = sys.argv[1]
    else:
        uri = "qemu:///system"

    print "Using uri:" + uri

    # Run a background thread with the event loop
    virEventLoopPureStart()

    vc = libvirt.open(uri)

    # Close connection on exit (to test cleanup paths)
    old_exitfunc = getattr(sys, 'exitfunc', None)
    def exit():
        print "Closing " + str(vc)
        vc.close()
        if (old_exitfunc): old_exitfunc()
    sys.exitfunc = exit

    #Add 2 callbacks to prove this works with more than just one
    vc.domainEventRegister(myDomainEventCallback1,None)
    vc.domainEventRegister(myDomainEventCallback2,None)

    # The rest of your app would go here normally, but for sake
    # of demo we'll just go to sleep. The other option is to
    # run the event loop in your main thread if your app is
    # totally event based.
    while 1:
        time.sleep(1)

if __name__ == "__main__":
    main()

On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that it shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
I propose we add this example to examples/domain-events/events-py instead of the code we currently have
ACK. I would just add a short comment about the pipe trick, which may be familiar to us (in C) but looks a bit strange when you come from Python.
Daniel
-- 
Daniel Veillard      | libxml Gnome XML XSLT toolkit http://xmlsoft.org/
daniel@veillard.com  | Rpmfind RPM search engine http://rpmfind.net/
http://veillard.com/ | virtualization library http://libvirt.org/
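For readers coming from Python, the pipe trick amounts to this: a thread blocked in poll() cannot see changes made by other threads, so one end of an anonymous pipe is registered with poll() and any other thread writes a single byte to the other end to wake the sleeper. Below is a minimal Python 3 sketch, separate from the demo program (which is Python 2). The names wait_for_wakeup() and waker() are hypothetical and used only for illustration, and select.poll() is assumed to be available (it is not on Windows).

```python
import os
import select
import threading
import time


def wait_for_wakeup(timeout_ms):
    """Sleep in poll() until another thread writes to the pipe.

    Returns (woken, elapsed_seconds).
    """
    read_fd, write_fd = os.pipe()
    poller = select.poll()
    poller.register(read_fd, select.POLLIN)

    def waker():
        # Stand-in for "another thread added a handle or timer".
        time.sleep(0.05)
        os.write(write_fd, b"c")

    thread = threading.Thread(target=waker)
    start = time.time()
    thread.start()

    # Without the write above, this call would block for timeout_ms.
    events = poller.poll(timeout_ms)
    elapsed = time.time() - start

    woken = False
    for fd, revents in events:
        if fd == read_fd and revents & select.POLLIN:
            # Drain the byte so that a later poll() can sleep again.
            os.read(read_fd, 1)
            woken = True

    thread.join()
    os.close(read_fd)
    os.close(write_fd)
    return woken, elapsed
```

Calling wait_for_wakeup(5000) should return well before the five-second timeout, because the single byte written by the second thread makes the read end readable and poll() returns at once.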

On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that it shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure looks nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() being measured in seconds, not milliseconds.
I propose we add this example to examples/domain-events/events-py instead of the code we currently have
Daniel
    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time())
Maybe you don't care about being up to one second late,
            if now >= next:
                sleep = 0
            else:
                sleep = next - now
        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)
        for (fd, revents) in events:
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue
            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))
        now = int(time.time())
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue
            want = t.get_last_fired() + interval
            # Deduct 20ms, since scheduler timeslice
            # means we could be ever so slightly early
but here you compare apples and oranges: now is in seconds, while the 20 is meant as milliseconds.
            if now >= (want-20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote:
On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that it shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure looks nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() being measured in seconds, not milliseconds.
Oops, that's a nice mistake :-) I will try to add some documentation for what it's doing too.
Daniel
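The unit mismatch is easy to see in isolation: time.time() returns seconds as a float, while the timeout passed to poll() and the timer intervals are in milliseconds. Below is a minimal Python 3 sketch of the same arithmetic done consistently in milliseconds. The names now_ms(), poll_timeout_ms() and timer_is_due() are hypothetical and are not functions from the demo program; the 20 ms slack is the margin mentioned in the demo's own comment.

```python
import time


def now_ms():
    """Current wall-clock time as integer milliseconds."""
    return int(time.time() * 1000)


def poll_timeout_ms(next_due_ms, now):
    """Timeout to hand to poll().

    -1 means block indefinitely (no timer pending), 0 means do not sleep.
    """
    if next_due_ms <= 0:
        return -1
    if now >= next_due_ms:
        return 0
    return next_due_ms - now


def timer_is_due(last_fired_ms, interval_ms, now, slack_ms=20):
    """True once 'now' is within slack_ms of the timer's expiry."""
    return now >= last_fired_ms + interval_ms - slack_ms
```

With every quantity in milliseconds, a timer last fired at 1000 with an interval of 500 counts as due from 1480 onwards. With "now" in seconds, subtracting 20 would shift the expiry by twenty seconds instead of twenty milliseconds.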

On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote:
On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that it shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure looks nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() being measured in seconds, not milliseconds.
Here's a version with more docs & the time in milliseconds

#!/usr/bin/python -u
#
#
#
#################################################################################
# Start off by implementing a general purpose event loop for anyone's use
#################################################################################

import sys
import getopt
import os
import libvirt
import select
import errno
import time
import threading

#
# This general purpose event loop will support waiting for file handle
# I/O and errors events, as well as scheduling repeatable timers with
# a fixed interval.
#
# It is a pure python implementation based around the poll() API
#
class virEventLoopPure:
    # This class contains the data we need to track for a
    # single file handle
    class virEventLoopPureHandle:
        def __init__(self, handle, fd, events, cb, opaque):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            self.opaque = opaque

        def get_id(self):
            return self.handle

        def get_fd(self):
            return self.fd

        def get_events(self):
            return self.events

        def set_events(self, events):
            self.events = events

        def dispatch(self, events):
            self.cb(self.handle, self.fd, events, self.opaque[0], self.opaque[1])

    # This class contains the data we need to track for a
    # single periodic timer
    class virEventLoopPureTimer:
        def __init__(self, timer, interval, cb, opaque):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            self.opaque = opaque
            self.lastfired = 0

        def get_id(self):
            return self.timer

        def get_interval(self):
            return self.interval

        def set_interval(self, interval):
            self.interval = interval

        def get_last_fired(self):
            return self.lastfired

        def set_last_fired(self, now):
            self.lastfired = now

        def dispatch(self):
            self.cb(self.timer, self.opaque[0], self.opaque[1])

    def __init__(self, debug=False):
        self.debugOn = debug
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        # The event loop can be used from multiple threads at once.
        # Specifically while the main thread is sleeping in poll()
        # waiting for events to occur, another thread may come along
        # and add/update/remove a file handle, or timer. When this
        # happens we need to interrupt the poll() sleep in the other
        # thread, so that it'll see the file handle / timer changes.
        #
        # Using OS level signals for this is very unreliable and
        # hard to implement correctly. Thus we use the real classic
        # "self pipe" trick. An anonymous pipe, with one end registered
        # with the event loop for input events. When we need to force
        # the main thread out of a poll() sleep, we simply write a
        # single byte of data to the other end of the pipe.
        self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def debug(self, msg):
        if self.debugOn:
            print msg

    # Calculate when the next timeout is due to occur, returning
    # the absolute timestamp for the next timeout, or 0 if there is
    # no timeout due
    def next_timeout(self):
        next = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            if interval < 0:
                continue
            if next == 0 or (last + interval) < next:
                next = last + interval

        return next

    # Lookup a virEventLoopPureHandle object based on file descriptor
    def get_handle_by_fd(self, fd):
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    # Lookup a virEventLoopPureHandle object based on its event loop ID
    def get_handle_by_id(self, handleID):
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    # This is the heart of the event loop, performing one single
    # iteration. It asks when the next timeout is due, and then
    # calculates the maximum amount of time it is able to sleep
    # for in poll() pending file handle events.
    #
    # It then goes into the poll() sleep.
    #
    # When poll() returns, there will be zero or more file handle
    # events which need to be dispatched to registered callbacks.
    # It may also be time to fire some periodic timers.
    #
    # Due to the coarse granularity of scheduler timeslices, if
    # we ask for a sleep of 500ms in order to satisfy a timer, we
    # may return up to 1 scheduler timeslice early. So even though
    # our sleep timeout was reached, the registered timer may not
    # technically be at its expiry point. This leads to us going
    # back around the loop with a crazy 5ms sleep. So when checking
    # if timeouts are due, we allow a margin of 20ms, to avoid
    # these pointless repeated tiny sleeps.
    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time() * 1000)
            if now >= next:
                sleep = 0
            else:
                sleep = next - now

        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)

        # Dispatch any file handle events that occurred
        for (fd, revents) in events:
            # See if the event was from the self-pipe
            # telling us to wake up. If so, then discard
            # the data and just continue
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time() * 1000)
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since scheduler timeslice
            # means we could be ever so slightly early
            if now >= (want-20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

    # Actually run the event loop forever
    def run_loop(self):
        self.quit = False
        while not self.quit:
            self.run_once()

    def interrupt(self):
        os.write(self.pipetrick[1], 'c')

    # Registers a new file handle 'fd', monitoring for 'events' (libvirt
    # event constants), firing the callback cb() when an event occurs.
    # Returns a unique integer identifier for this handle, that should be
    # used to later update/remove it
    def add_handle(self, fd, events, cb, opaque):
        handleID = self.nextHandleID + 1
        self.nextHandleID = self.nextHandleID + 1

        h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))

        return handleID

    # Registers a new timer with periodic expiry at 'interval' ms,
    # firing cb() each time the timer expires. If 'interval' is -1,
    # then the timer is registered, but not enabled
    # Returns a unique integer identifier for this handle, that should be
    # used to later update/remove it
    def add_timer(self, interval, cb, opaque):
        timerID = self.nextTimerID + 1
        self.nextTimerID = self.nextTimerID + 1

        h = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
        self.timers.append(h)
        self.interrupt()

        self.debug("Add timer %d interval %d" % (timerID, interval))

        return timerID

    # Change the set of events to be monitored on the file handle
    def update_handle(self, handleID, events):
        h = self.get_handle_by_id(handleID)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))

    # Change the periodic frequency of the timer
    def update_timer(self, timerID, interval):
        for h in self.timers:
            if h.get_id() == timerID:
                h.set_interval(interval)
                self.interrupt()

                self.debug("Update timer %d interval %d" % (timerID, interval))
                break

    # Stop monitoring for events on the file handle
    def remove_handle(self, handleID):
        handles = []
        for h in self.handles:
            if h.get_id() == handleID:
                self.poll.unregister(h.get_fd())
                self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
            else:
                handles.append(h)
        self.handles = handles
        self.interrupt()

    # Stop firing the periodic timer
    def remove_timer(self, timerID):
        timers = []
        for h in self.timers:
            if h.get_id() != timerID:
                timers.append(h)
        self.debug("Remove timer %d" % timerID)
        self.timers = timers
        self.interrupt()

    # Convert from libvirt event constants, to poll() events constants
    def events_to_poll(self, events):
        ret = 0
        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP
        return ret

    # Convert from poll() event constants, to libvirt events constants
    def events_from_poll(self, events):
        ret = 0
        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP
        return ret


###########################################################################
# Now glue an instance of the general event loop into libvirt's event loop
###########################################################################

# This single global instance of the event loop will be used for
# monitoring libvirt events
eventLoop = virEventLoopPure(debug=False)

# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
eventLoopThread = None

# These next set of 6 methods are the glue between the official
# libvirt events API, and our particular impl of the event loop
#
# There is no reason why the 'virEventLoopPure' has to be used.
# An application could easily make these 6 glue methods hook into
# another event loop such as GLib's, or something like the python
# Twisted event framework.
def virEventAddHandleImpl(fd, events, cb, opaque):
    global eventLoop
    return eventLoop.add_handle(fd, events, cb, opaque)

def virEventUpdateHandleImpl(handleID, events):
    global eventLoop
    return eventLoop.update_handle(handleID, events)

def virEventRemoveHandleImpl(handleID):
    global eventLoop
    return eventLoop.remove_handle(handleID)

def virEventAddTimerImpl(interval, cb, opaque):
    global eventLoop
    return eventLoop.add_timer(interval, cb, opaque)

def virEventUpdateTimerImpl(timerID, interval):
    global eventLoop
    return eventLoop.update_timer(timerID, interval)

def virEventRemoveTimerImpl(timerID):
    global eventLoop
    return eventLoop.remove_timer(timerID)

# This tells libvirt what event loop implementation it
# should use
def virEventLoopPureRegister():
    libvirt.virEventRegisterImpl(virEventAddHandleImpl,
                                 virEventUpdateHandleImpl,
                                 virEventRemoveHandleImpl,
                                 virEventAddTimerImpl,
                                 virEventUpdateTimerImpl,
                                 virEventRemoveTimerImpl)

# Directly run the event loop in the current thread
def virEventLoopPureRun():
    global eventLoop
    eventLoop.run_loop()

# Spawn a background thread to run the event loop
def virEventLoopPureStart():
    global eventLoopThread
    virEventLoopPureRegister()
    eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop")
    eventLoopThread.setDaemon(True)
    eventLoopThread.start()


##########################################################################
# Everything that now follows is a simple demo of domain lifecycle events
##########################################################################
def eventToString(event):
    eventStrings = ( "Added",
                     "Removed",
                     "Started",
                     "Suspended",
                     "Resumed",
                     "Stopped",
                     "Saved",
                     "Restored" )
    return eventStrings[event]

def myDomainEventCallback1 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def myDomainEventCallback2 (conn, dom, event, detail, opaque):
    print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)

def usage():
    print "usage: "+os.path.basename(sys.argv[0])+" [uri]"
    print " uri will default to qemu:///system"

def main():
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"] )
    except getopt.GetoptError, err:
        # print help information and exit:
        print str(err) # will print something like "option -a not recognized"
        usage()
        sys.exit(2)
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()

    if len(sys.argv) > 1:
        uri = sys.argv[1]
    else:
        uri = "qemu:///system"

    print "Using uri:" + uri

    # Run a background thread with the event loop
    virEventLoopPureStart()

    vc = libvirt.open(uri)

    # Close connection on exit (to test cleanup paths)
    old_exitfunc = getattr(sys, 'exitfunc', None)
    def exit():
        print "Closing " + str(vc)
        vc.close()
        if (old_exitfunc): old_exitfunc()
    sys.exitfunc = exit

    # Add 2 callbacks to prove this works with more than just one
    vc.domainEventRegister(myDomainEventCallback1,None)
    vc.domainEventRegister(myDomainEventCallback2,None)

    # The rest of your app would go here normally, but for sake
    # of demo we'll just go to sleep. The other option is to
    # run the event loop in your main thread if your app is
    # totally event based.
    while 1:
        time.sleep(1)

if __name__ == "__main__":
    main()

-- 
|: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :|
|: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|
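
For readers who want to see the "self pipe" trick from the comments above in isolation, here is a minimal sketch, not taken from the posted example. It is written for Python 3, and the `Waker` class, its method names and the timings are hypothetical, chosen only for illustration. One thread blocks in poll() on the read end of a pipe; any other thread can wake it by writing a single byte to the write end.

```python
import os
import select
import threading
import time


class Waker:
    """Hypothetical helper: wake a thread blocked in poll() via a pipe."""

    def __init__(self):
        # Read end is watched by poll(); write end is used to interrupt it.
        self.read_fd, self.write_fd = os.pipe()
        self.poller = select.poll()
        self.poller.register(self.read_fd, select.POLLIN)

    def interrupt(self):
        # Any thread may call this; the value of the byte is irrelevant.
        os.write(self.write_fd, b"c")

    def wait(self, timeout_ms):
        """Return True if woken through the pipe, False on timeout."""
        woken = False
        for fd, _revents in self.poller.poll(timeout_ms):
            if fd == self.read_fd:
                os.read(fd, 1)  # discard the wake-up byte
                woken = True
        return woken


waker = Waker()
# Another thread interrupts the sleep shortly after it starts.
threading.Timer(0.05, waker.interrupt).start()

start = time.monotonic()
woken = waker.wait(5000)  # would block for 5 seconds without the wake-up
elapsed = time.monotonic() - start
print("woken=%s after %.2fs" % (woken, elapsed))
```

In the posted loop, add_handle(), add_timer() and the update/remove methods call interrupt() for the same reason: the thread sitting in poll() has to come back around and notice the changed handle and timer lists.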

On Fri, Oct 09, 2009 at 09:58:14AM +0100, Daniel P. Berrange wrote:
On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote:
On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that is shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure look nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() measured in seconds, not milliseconds.
Here's a version with more docs & the time in milliseconds
ACK ! Thanks !

Daniel

-- 
Daniel Veillard | libxml Gnome XML XSLT toolkit http://xmlsoft.org/
daniel@veillard.com | Rpmfind RPM search engine http://rpmfind.net/
http://veillard.com/ | virtualization library http://libvirt.org/

On Fri, Oct 09, 2009 at 09:58:14AM +0100, Daniel P. Berrange wrote:
On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote:
On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that is shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure look nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() measured in seconds, not milliseconds.
Here's a version with more docs & the time in milliseconds
#!/usr/bin/python -u
#
#
#
#################################################################################
# Start off by implementing a general purpose event loop for anyone's use
#################################################################################

import sys
import getopt
import os
import libvirt
import select
import errno
import time
import threading

#
# This general purpose event loop will support waiting for file handle
# I/O and errors events, as well as scheduling repeatable timers with
# a fixed interval.
#
# It is a pure python implementation based around the poll() API
#
class virEventLoopPure:
    # This class contains the data we need to track for a
    # single file handle
    class virEventLoopPureHandle:
        def __init__(self, handle, fd, events, cb, opaque):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            self.opaque = opaque

        def get_id(self):
            return self.handle

        def get_fd(self):
            return self.fd

        def get_events(self):
            return self.events

        def set_events(self, events):
            self.events = events

        def dispatch(self, events):
            self.cb(self.handle,
                    self.fd,
                    events,
                    self.opaque[0],
                    self.opaque[1])

    # This class contains the data we need to track for a
    # single periodic timer
    class virEventLoopPureTimer:
        def __init__(self, timer, interval, cb, opaque):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            self.opaque = opaque
            self.lastfired = 0

        def get_id(self):
            return self.timer

        def get_interval(self):
            return self.interval

        def set_interval(self, interval):
            self.interval = interval

        def get_last_fired(self):
            return self.lastfired

        def set_last_fired(self, now):
            self.lastfired = now

        def dispatch(self):
            self.cb(self.timer,
                    self.opaque[0],
                    self.opaque[1])

    def __init__(self, debug=False):
        self.debugOn = debug
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        # The event loop can be used from multiple threads at once.
        # Specifically while the main thread is sleeping in poll()
        # waiting for events to occur, another thread may come along
        # and add/update/remove a file handle, or timer. When this
        # happens we need to interrupt the poll() sleep in the other
        # thread, so that it'll see the file handle / timer changes.
        #
        # Using OS level signals for this is very unreliable and
        # hard to implement correctly. Thus we use the real classic
        # "self pipe" trick. An anonymous pipe, with one end registered
        # with the event loop for input events. When we need to force
        # the main thread out of a poll() sleep, we simply write a
        # single byte of data to the other end of the pipe.
        self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def debug(self, msg):
        if self.debugOn:
            print msg

    # Calculate when the next timeout is due to occur, returning
    # the absolute timestamp for the next timeout, or 0 if there is
    # no timeout due
    def next_timeout(self):
        next = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            if interval < 0:
                continue
            if next == 0 or (last + interval) < next:
                next = last + interval

        return next

    # Lookup a virEventLoopPureHandle object based on file descriptor
    def get_handle_by_fd(self, fd):
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    # Lookup a virEventLoopPureHandle object based on its event loop ID
    def get_handle_by_id(self, handleID):
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    # This is the heart of the event loop, performing one single
    # iteration. It asks when the next timeout is due, and then
    # calculates the maximum amount of time it is able to sleep
    # for in poll() pending file handle events.
    #
    # It then goes into the poll() sleep.
    #
    # When poll() returns, there will be zero or more file handle
    # events which need to be dispatched to registered callbacks.
    # It may also be time to fire some periodic timers.
    #
    # Due to the coarse granularity of scheduler timeslices, if
    # we ask for a sleep of 500ms in order to satisfy a timer, we
    # may return up to 1 scheduler timeslice early. So even though
    # our sleep timeout was reached, the registered timer may not
    # technically be at its expiry point. This leads to us going
    # back around the loop with a crazy 5ms sleep. So when checking
    # if timeouts are due, we allow a margin of 20ms, to avoid
    # these pointless repeated tiny sleeps.
    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time() * 1000)
            if now >= next:
                sleep = 0
            else:
                sleep = next - now

        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)

Thanks! but there's still a units problem here (should be sleep/1000.)

 | poll(...)
 |     poll([timeout=-1[, maxevents=-1]]) -> [(fd, events), (...)]
 |
 |     Wait for events on the epoll file descriptor for a maximum time of timeout
 |     in seconds (as float). -1 makes poll wait indefinitely.
 |     Up to maxevents are returned to the caller.

        # Dispatch any file handle events that occurred
        for (fd, revents) in events:
            # See if the event was from the self-pipe
            # telling us to wake up. If so, then discard
            # the data and just continue
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time() * 1000)
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since scheduler timeslice
            # means we could be ever so slightly early
            if now >= (want-20):
                self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                t.set_last_fired(now)
                t.dispatch()

    # Actually run the event loop forever
    def run_loop(self):
        self.quit = False
        while not self.quit:
            self.run_once()

    def interrupt(self):
        os.write(self.pipetrick[1], 'c')

On Fri, Oct 09, 2009 at 02:01:13PM +0200, Dan Kenigsberg wrote:
On Fri, Oct 09, 2009 at 09:58:14AM +0100, Daniel P. Berrange wrote:
On Fri, Oct 09, 2009 at 10:19:08AM +0200, Dan Kenigsberg wrote:
On Wed, Oct 07, 2009 at 12:59:39PM +0100, Daniel P. Berrange wrote:
On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining libvirt events in python?
I decided to re-write the demo program so that is shows a serious production kwalitee event loop implementation that can be used in real world applications. I think you'll find this much nicer :-)
It sure look nicer (though I still don't get the hows and whys). However, it seems that you have an issue with python's time.time() measured in seconds, not milliseconds.
Here's a version with more docs & the time in milliseconds
    def run_once(self):
        sleep = -1
        next = self.next_timeout()
        self.debug("Next timeout due at %d" % next)
        if next > 0:
            now = int(time.time() * 1000)
            if now >= next:
                sleep = 0
            else:
                sleep = next - now

        self.debug("Poll with a sleep of %d" % sleep)
        events = self.poll.poll(sleep)

Thanks! but there's still a units problem here (should be sleep/1000. )
I fixed that too and pushed the example update to GIT

Daniel
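
A note for anyone adapting the loop: the help text quoted earlier in the thread describes poll() on a select.epoll object, which takes its timeout in seconds as a float. The loop as posted creates a select.poll() object, and the Python documentation gives that timeout in milliseconds. Which unit is right therefore depends on which of the two objects is in use. The sketch below is not part of the pushed example; it is written for Python 3, and the helper name `poll_timeout_ms` and the 200 ms deadline are made up for illustration. It times both calls against a pipe that never becomes readable.

```python
import os
import select
import time


def poll_timeout_ms(next_due_ms, now_ms):
    """Hypothetical helper mirroring the sleep calculation in run_once().

    Returns -1 (block forever) when no timer is armed, 0 when the
    deadline has already passed, else the milliseconds remaining.
    """
    if next_due_ms <= 0:
        return -1
    return max(0, next_due_ms - now_ms)


# Nothing is ever written to this pipe, so every poll below times out.
read_fd, write_fd = os.pipe()

poller = select.poll()
poller.register(read_fd, select.POLLIN)

now_ms = int(time.time() * 1000)
timeout_ms = poll_timeout_ms(now_ms + 200, now_ms)

start = time.monotonic()
events = poller.poll(timeout_ms)  # select.poll: timeout in milliseconds
elapsed = time.monotonic() - start
print("select.poll  waited %.2fs" % elapsed)

# select.epoll exists on Linux only; its poll() takes seconds instead.
if hasattr(select, "epoll"):
    ep = select.epoll()
    ep.register(read_fd, select.EPOLLIN)
    start = time.monotonic()
    ep_events = ep.poll(timeout_ms / 1000.0)
    ep_elapsed = time.monotonic() - start
    ep.close()
    print("select.epoll waited %.2fs" % ep_elapsed)
```

Both calls should wait roughly a fifth of a second. Passing a value already divided by 1000 to select.poll() would instead make the loop spin with near-zero sleeps.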
participants (4)
- Dan Kenigsberg
- Daniel P. Berrange
- Daniel Veillard
- Florian VICHOT