*** Please do not consider for merging, example tests only ***
Here are the testcases I am currently running to verify the correctness of this
API. These are continuing to evolve.
Signed-off-by: Adam Litke <agl(a)us.ibm.com>
---
blockPull-test.py | 301 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 301 insertions(+), 0 deletions(-)
create mode 100755 blockPull-test.py
diff --git a/blockPull-test.py b/blockPull-test.py
new file mode 100755
index 0000000..d9ccfb6
--- /dev/null
+++ b/blockPull-test.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python
+
+import sys
+import subprocess
+import time
+import unittest
+import re
+import threading
+import libvirt
+
+qemu_img_bin = "/home/aglitke/src/qemu/qemu-img"
+virsh_bin = "/home/aglitke/src/libvirt/tools/virsh"
+
+dom_xml = """
+<domain type='kvm'>
+ <name>blockPull-test</name>
+ <memory>131072</memory>
+ <currentMemory>131072</currentMemory>
+ <vcpu>1</vcpu>
+ <os>
+ <type arch='x86_64' machine='pc-0.13'>hvm</type>
+ <boot dev='hd'/>
+ </os>
+ <features>
+ <acpi/>
+ <apic/>
+ <pae/>
+ </features>
+ <clock offset='utc'/>
+ <on_poweroff>destroy</on_poweroff>
+ <on_reboot>restart</on_reboot>
+ <on_crash>restart</on_crash>
+ <devices>
+
<emulator>/home/aglitke/src/qemu/x86_64-softmmu/qemu-system-x86_64</emulator>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='qed'/>
+ <source file='/tmp/disk1.qed' />
+ <target dev='vda' bus='virtio'/>
+ </disk>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='qed'/>
+ <source file='/tmp/disk2.qed' />
+ <target dev='vdb' bus='virtio'/>
+ </disk>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='raw'/>
+ <source file='/tmp/disk3.raw' />
+ <target dev='vdc' bus='virtio'/>
+ </disk>
+ <graphics type='vnc' port='-1' autoport='yes'/>
+ </devices>
+</domain>
+"""
+
+def qemu_img(*args):
+ global qemu_img_bin
+
+ devnull = open('/dev/null', 'r+')
+ return subprocess.call([qemu_img_bin] + list(args), stdin=devnull, stdout=devnull)
+
+def virsh(*args):
+ global virsh_bin
+
+ devnull = open('/dev/null', 'r+')
+ return subprocess.call([virsh_bin] + list(args),
+ stdin=devnull, stdout=devnull, stderr=devnull)
+
+def make_baseimage(name, size_mb):
+ devnull = open('/dev/null', 'r+')
+ return subprocess.call(['dd', 'if=/dev/zero', "of=%s" %
name, 'bs=1M',
+ 'count=%i' % size_mb], stdin=devnull, stdout=devnull,
stderr=devnull)
+
+def has_backing_file(path):
+ global qemu_img_bin
+ p1 = subprocess.Popen([qemu_img_bin, "info", path],
+ stdout=subprocess.PIPE).communicate()[0]
+ matches = re.findall("^backing file:", p1, re.M)
+ if len(matches) > 0:
+ return True
+ return False
+
+class BlockPullTestCase(unittest.TestCase):
+ def _error_handler(self, ctx, error, dummy=None):
+ pass
+
+ def create_disks(self, sparse):
+ self.disks = [ '/tmp/disk1.qed', '/tmp/disk2.qed',
'/tmp/disk3.raw' ]
+ if sparse:
+ qemu_img('create', '-f', 'raw',
'/tmp/backing1.img', '100M')
+ qemu_img('create', '-f', 'raw',
'/tmp/backing2.img', '100M')
+ else:
+ make_baseimage('/tmp/backing1.img', 100)
+ make_baseimage('/tmp/backing2.img', 100)
+ qemu_img('create', '-f', 'qed', '-o',
'backing_file=/tmp/backing1.img,copy_on_read=on', self.disks[0])
+ qemu_img('create', '-f', 'qed', '-o',
'backing_file=/tmp/backing2.img,copy_on_read=on', self.disks[1])
+ qemu_img('create', '-f', 'raw', self.disks[2],
'100M')
+
+ def begin(self, sparse=True):
+ global dom_xml
+
+ libvirt.registerErrorHandler(self._error_handler, None)
+ self.create_disks(sparse)
+ self.conn = libvirt.open('qemu:///system')
+ self.dom = self.conn.createXML(dom_xml, 0)
+
+ def end(self):
+ self.dom.destroy()
+ self.conn.close()
+
+class TestBasicErrors(BlockPullTestCase):
+ def setUp(self):
+ self.begin()
+
+ def tearDown(self):
+ self.end()
+
+ def test_bad_path(self):
+ try:
+ self.dom.blockPull('/dev/null', 0)
+ except libvirt.libvirtError, e:
+ self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e.get_error_code())
+ else:
+ e = self.conn.virConnGetLastError()
+ self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e[0])
+
+ def test_abort_no_stream(self):
+ try:
+ self.dom.blockPullAbort(self.disks[0], 0)
+ except libvirt.libvirtError, e:
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code())
+ else:
+ e = self.conn.virConnGetLastError()
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0])
+
+ def test_start_same_twice(self):
+ self.dom.blockPullAll(self.disks[0], 0)
+ try:
+ self.dom.blockPullAll(self.disks[0], 0)
+ except libvirt.libvirtError, e:
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e.get_error_code())
+ else:
+ e = self.conn.virConnGetLastError()
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e[0])
+
+ def test_unsupported_disk(self):
+ try:
+ self.dom.blockPull(self.disks[2], 0)
+ except libvirt.libvirtError, e:
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code())
+ else:
+ e = self.conn.virConnGetLastError()
+ self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0])
+
+class TestBasicCommands(BlockPullTestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ self.end()
+
+ def test_start_stop(self):
+ self.begin(sparse=False)
+ self.dom.blockPullAll(self.disks[0], 0)
+ time.sleep(1)
+ self.assertIsNot(None, self.dom.blockPullInfo(self.disks[0], 0))
+ self.dom.blockPullAbort(self.disks[0], 0)
+ time.sleep(1)
+ self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+
+ # Restarting the stream as incremental should not show up in
+ # blockPullInfo
+ self.dom.blockPull(self.disks[0], 0)
+ self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+
+ def test_whole_disk(self):
+ self.begin()
+ self.assertTrue(has_backing_file(self.disks[0]))
+ self.dom.blockPullAll(self.disks[0], 0)
+ for i in xrange(1, 5):
+ if self.dom.blockPullInfo(self.disks[0], 0) is None:
+ break
+ time.sleep(1)
+ self.assertFalse(has_backing_file(self.disks[0]))
+
+ def test_incremental(self):
+ self.begin()
+ last_cur = -1
+ while True:
+ info = self.dom.blockPull(self.disks[0], 0)
+ if info is None or info['cur'] == info['end']:
+ break
+ self.assertGreater(info['cur'], last_cur)
+ last_cur = info['cur']
+ self.assertFalse(has_backing_file(self.disks[0]))
+
+ def test_interleaved(self):
+ self.begin()
+ disk_list = range(2)
+ while True:
+ for i in disk_list:
+ info = self.dom.blockPull(self.disks[i], 0)
+ if info is None or info['cur'] == info['end']:
+ disk_list.remove(i)
+ if len(disk_list) == 0:
+ break
+ for i in range(2):
+ self.assertFalse(has_backing_file(self.disks[i]))
+
+ def test_two_disks_at_once(self):
+ self.begin()
+ disk_list = range(2)
+ for d in disk_list:
+ self.dom.blockPullAll(self.disks[d], 0)
+
+ for i in xrange(5):
+ for d in disk_list:
+ info = self.dom.blockPullInfo(self.disks[d], 0)
+ if info is None:
+ disk_list.remove(d)
+ if len(disk_list) == 0:
+ break
+ time.sleep(1)
+ for d in range(2):
+ self.assertFalse(has_backing_file(self.disks[d]))
+
+class TestEvents(BlockPullTestCase):
+ def eventLoopRun(self):
+ while self.do_events:
+ libvirt.virEventRunDefaultImpl()
+
+ def eventLoopStart(self):
+ libvirt.virEventRegisterDefaultImpl()
+ self.eventLoopThread = threading.Thread(target=self.eventLoopRun,
name="libvirtEventLoop")
+ self.eventLoopThread.setDaemon(True)
+ self.do_events = True
+ self.eventLoopThread.start()
+
+ def eventLoopStop(self):
+ self.do_events = False
+
+ def setUp(self):
+ self.eventLoopStart()
+
+ def tearDown(self):
+ self.end()
+
+ @staticmethod
+ def recordBlockPullEvent(conn, dom, path, status, inst):
+ inst.event = (dom, path, status)
+
+ def test_event_complete(self):
+ self.begin()
+ self.event = None
+ self.conn.domainEventRegisterAny(self.dom,
libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_PULL,
+ TestEvents.recordBlockPullEvent, self)
+ self.dom.blockPullAll(self.disks[0], 0)
+ for i in xrange(1, 5):
+ if self.event is not None:
+ break
+ time.sleep(1)
+ self.eventLoopStop()
+ self.assertIsNot(None, self.event)
+ self.assertFalse(has_backing_file(self.disks[0]))
+ self.assertEqual(self.event[1], self.disks[0])
+ self.assertEqual(self.event[2], libvirt.VIR_DOMAIN_BLOCK_PULL_COMPLETED)
+
+class TestVirsh(BlockPullTestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ self.end()
+
+ def test_blockpull(self):
+ self.begin()
+ info1 = self.dom.blockPull(self.disks[0], 0)
+ virsh('blockpull', self.dom.name(), self.disks[0])
+ info2 = self.dom.blockPull(self.disks[0], 0)
+ self.assertGreater(info2['cur'], info1['cur'])
+
+ def test_blockpullall(self):
+ self.begin()
+ virsh('blockpullall', self.dom.name(), self.disks[0])
+ for i in xrange(1, 5):
+ if self.dom.blockPullInfo(self.disks[0], 0) is None:
+ break
+ time.sleep(1)
+ self.assertFalse(has_backing_file(self.disks[0]))
+
+ def test_blockpullabort(self):
+ self.begin(sparse=False)
+ self.dom.blockPullAll(self.disks[0], 0)
+ time.sleep(1)
+ self.assertIsNot(None, self.dom.blockPullInfo(self.disks[0], 0))
+ virsh('blockpullall', self.dom.name(), '--abort', self.disks[0])
+ time.sleep(1)
+ self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+ self.assertTrue(has_backing_file(self.disks[0]))
+
+if __name__ == '__main__':
+ unittest.main()
--
1.7.3