On Tue, Oct 06, 2009 at 07:04:29PM +0200, Dan Kenigsberg wrote:
Would someone help me have a shrink-wrapped solution for obtaining
libvirt
events in python?
I decided to re-write the demo program so that is shows a serious
production kwalitee event loop implementation that can be used in
real world applications. I think you'll find this much nicer :-)
I propose we add this example to examples/domain-events/events-py
instead of the code we currently have
Daniel
#!/usr/bin/python -u
#
#
#
#################################################################################
# Start off by implementing a general purpose event loop for anyones use
#################################################################################
import sys
import getopt
import os
import libvirt
import select
import errno
import time
import threading
class virEventLoopPure:
class virEventLoopPureHandle:
def __init__(self, handle, fd, events, cb, opaque):
self.handle = handle
self.fd = fd
self.events = events
self.cb = cb
self.opaque = opaque
def get_id(self):
return self.handle
def get_fd(self):
return self.fd
def get_events(self):
return self.events
def set_events(self, events):
self.events = events
def dispatch(self, events):
self.cb(self.handle,
self.fd,
events,
self.opaque[0],
self.opaque[1])
class virEventLoopPureTimer:
def __init__(self, timer, interval, cb, opaque):
self.timer = timer
self.interval = interval
self.cb = cb
self.opaque = opaque
self.lastfired = 0
def get_id(self):
return self.timer
def get_interval(self):
return self.interval
def set_interval(self, interval):
self.interval = interval
def get_last_fired(self):
return self.lastfired
def set_last_fired(self, now):
self.lastfired = now
def dispatch(self):
self.cb(self.timer,
self.opaque[0],
self.opaque[1])
def __init__(self, debug=False):
self.debugOn = debug
self.poll = select.poll()
self.pipetrick = os.pipe()
self.nextHandleID = 1
self.nextTimerID = 1
self.handles = []
self.timers = []
self.quit = False
self.debug("Self pipe watch %d write %d" %(self.pipetrick[0],
self.pipetrick[1]))
self.poll.register(self.pipetrick[0], select.POLLIN)
def debug(self, msg):
if self.debugOn:
print msg
def next_timeout(self):
next = 0
for t in self.timers:
last = t.get_last_fired()
interval = t.get_interval()
if interval < 0:
continue
if next == 0 or (last + interval) < next:
next = last + interval
return next
def get_handle_by_fd(self, fd):
for h in self.handles:
if h.get_fd() == fd:
return h
return None
def get_handle_by_id(self, handleID):
for h in self.handles:
if h.get_id() == handleID:
return h
return None
def run_once(self):
sleep = -1
next = self.next_timeout()
self.debug("Next timeout due at %d" % next)
if next > 0:
now = int(time.time())
if now >= next:
sleep = 0
else:
sleep = next - now
self.debug("Poll with a sleep of %d" % sleep)
events = self.poll.poll(sleep)
for (fd, revents) in events:
if fd == self.pipetrick[0]:
data = os.read(fd, 1)
continue
h = self.get_handle_by_fd(fd)
if h:
self.debug("Dispatch fd %d handle %d events %d" % (fd,
h.get_id(), revents))
h.dispatch(self.events_from_poll(revents))
now = int(time.time())
for t in self.timers:
interval = t.get_interval()
if interval < 0:
continue
want = t.get_last_fired() + interval
# Deduct 20ms, since schedular timeslice
# means we could be ever so slightly early
if now >= (want-20):
self.debug("Dispatch timer %d now %s want %s" % (t.get_id(),
str(now), str(want)))
t.set_last_fired(now)
t.dispatch()
def run_loop(self):
self.quit = False
while not self.quit:
self.run_once()
def interrupt(self):
os.write(self.pipetrick[1], 'c')
def add_handle(self, fd, events, cb, opaque):
handleID = self.nextHandleID + 1
self.nextHandleID = self.nextHandleID + 1
h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
self.handles.append(h)
self.poll.register(fd, self.events_to_poll(events))
self.interrupt()
self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))
return handleID
def add_timer(self, interval, cb, opaque):
timerID = self.nextTimerID + 1
self.nextTimerID = self.nextTimerID + 1
h = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
self.timers.append(h)
self.interrupt()
self.debug("Add timer %d interval %d" % (timerID, interval))
return timerID
def update_handle(self, handleID, events):
h = self.get_handle_by_id(handleID)
if h:
h.set_events(events)
self.poll.unregister(h.get_fd())
self.poll.register(h.get_fd(), self.events_to_poll(events))
self.interrupt()
self.debug("Update handle %d fd %d events %d" % (handleID,
h.get_fd(), events))
def update_timer(self, timerID, interval):
for h in self.timers:
if h.get_id() == timerID:
h.set_interval(interval);
self.interrupt()
self.debug("Update timer %d interval %d" % (timerID,
interval))
break
def remove_handle(self, handleID):
handles = []
for h in self.handles:
if h.get_id() == handleID:
self.poll.unregister(h.get_fd())
self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
else:
handles.append(h)
self.handles = handles
self.interrupt()
def remove_timer(self, timerID):
timers = []
for h in self.timers:
if h.get_id() != timerID:
timers.append(h)
self.debug("Remove timer %d" % timerID)
self.timers = timers
self.interrupt()
def events_to_poll(self, events):
ret = 0
if events & libvirt.VIR_EVENT_HANDLE_READABLE:
ret |= select.POLLIN
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
ret |= select.POLLOUT
if events & libvirt.VIR_EVENT_HANDLE_ERROR:
ret |= select.POLLERR;
if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
ret |= select.POLLHUP;
return ret
def events_from_poll(self, events):
ret = 0;
if events & select.POLLIN:
ret |= libvirt.VIR_EVENT_HANDLE_READABLE;
if events & select.POLLOUT:
ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE;
if events & select.POLLNVAL:
ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
if events & select.POLLERR:
ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
if events & select.POLLHUP:
ret |= libvirt.VIR_EVENT_HANDLE_HANGUP;
return ret;
###########################################################################
# Now glue an instance of the general event loop into libvirt's event loop
###########################################################################
eventLoop = virEventLoopPure(debug=False)
eventLoopThread = None
def virEventAddHandleImpl(fd, events, cb, opaque):
global eventLoop
return eventLoop.add_handle(fd, events, cb, opaque)
def virEventUpdateHandleImpl(handleID, events):
global eventLoop
return eventLoop.update_handle(handleID, events)
def virEventRemoveHandleImpl(handleID):
global eventLoop
return eventLoop.remove_handle(handleID)
def virEventAddTimerImpl(interval, cb, opaque):
global eventLoop
return eventLoop.add_timer(interval, cb, opaque)
def virEventUpdateTimerImpl(timerID, interval):
global eventLopo
return eventLoop.update_timer(timerID, interval)
def virEventRemoveTimerImpl(timerID):
global eventLoop
return eventLoop.remove_timer(timerID)
def virEventLoopPureRegister():
libvirt.virEventRegisterImpl(virEventAddHandleImpl,
virEventUpdateHandleImpl,
virEventRemoveHandleImpl,
virEventAddTimerImpl,
virEventUpdateTimerImpl,
virEventRemoveTimerImpl)
def virEventLoopPureRun():
global eventLoop
eventLoop.run_loop()
def virEventLoopPureStart():
global eventLoopThread
virEventLoopPureRegister()
eventLoopThread = threading.Thread(target=virEventLoopPureRun,
name="libvirtEventLoop")
eventLoopThread.setDaemon(True)
eventLoopThread.start()
##########################################################################
# Everything that now follows is a simple demo of domain lifecycle events
##########################################################################
def eventToString(event):
eventStrings = ( "Added",
"Removed",
"Started",
"Suspended",
"Resumed",
"Stopped",
"Saved",
"Restored" );
return eventStrings[event];
def myDomainEventCallback1 (conn, dom, event, detail, opaque):
print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d" % (dom.name(),
dom.ID(), eventToString(event), detail)
def myDomainEventCallback2 (conn, dom, event, detail, opaque):
print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(),
dom.ID(), eventToString(event), detail)
def usage():
print "usage: "+os.path.basename(sys.argv[0])+" [uri]"
print " uri will default to qemu:///system"
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], "h", ["help"] )
except getopt.GetoptError, err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
if len(sys.argv) > 1:
uri = sys.argv[1]
else:
uri = "qemu:///system"
print "Using uri:" + uri
# Run a background thread with the event loop
virEventLoopPureStart()
vc = libvirt.open(uri)
# Close connection on exit (to test cleanup paths)
old_exitfunc = getattr(sys, 'exitfunc', None)
def exit():
print "Closing " + str(vc)
vc.close()
if (old_exitfunc): old_exitfunc()
sys.exitfunc = exit
#Add 2 callbacks to prove this works with more than just one
vc.domainEventRegister(myDomainEventCallback1,None)
vc.domainEventRegister(myDomainEventCallback2,None)
# The rest of your app would go here normally, but for sake
# of demo we'll just go to sleep. The other option is to
# run the event loop in your main thread if your app is
# totally event based.
while 1:
time.sleep(1)
if __name__ == "__main__":
main()
--
|: Red Hat, Engineering, London -o-
http://people.redhat.com/berrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org -o-
http://ovirt.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|