This patch is for information only and should not be committed.
Signed-off-by: Adam Litke <agl(a)us.ibm.com>
---
blockPull-test.py | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 281 insertions(+), 0 deletions(-)
create mode 100644 blockPull-test.py
diff --git a/blockPull-test.py b/blockPull-test.py
new file mode 100644
index 0000000..a75e0d9
--- /dev/null
+++ b/blockPull-test.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python
+
+import sys
+import subprocess
+import time
+import unittest
+import re
+import threading
+import libvirt
+
+# Absolute paths of the qemu-img and virsh binaries that this script runs
+# through subprocess.  They point into the author's own source trees, so they
+# have to be adjusted before the tests can run on another machine.
+qemu_img_bin = "/home/aglitke/src/qemu/qemu-img"
+virsh_bin = "/home/aglitke/src/libvirt/tools/virsh"
+
+# Domain XML handed to conn.createXML() when a test starts its guest.
+# It describes a KVM guest with three virtio disks: vda and vdb are QED images
+# (/tmp/disk1.qed, /tmp/disk2.qed) and vdc is a raw image (/tmp/disk3.raw).
+# The <emulator> path points into the author's qemu build tree.
+dom_xml = """
+<domain type='kvm'>
+ <name>blockPull-test</name>
+ <memory>131072</memory>
+ <currentMemory>131072</currentMemory>
+ <vcpu>1</vcpu>
+ <os>
+ <type arch='x86_64' machine='pc-0.13'>hvm</type>
+ <boot dev='hd'/>
+ </os>
+ <features>
+ <acpi/>
+ <apic/>
+ <pae/>
+ </features>
+ <clock offset='utc'/>
+ <on_poweroff>destroy</on_poweroff>
+ <on_reboot>restart</on_reboot>
+ <on_crash>restart</on_crash>
+ <devices>
+ <emulator>/home/aglitke/src/qemu/x86_64-softmmu/qemu-system-x86_64</emulator>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='qed'/>
+ <source file='/tmp/disk1.qed' />
+ <target dev='vda' bus='virtio'/>
+ </disk>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='qed'/>
+ <source file='/tmp/disk2.qed' />
+ <target dev='vdb' bus='virtio'/>
+ </disk>
+ <disk type='file' device='disk'>
+ <driver name='qemu' type='raw'/>
+ <source file='/tmp/disk3.raw' />
+ <target dev='vdc' bus='virtio'/>
+ </disk>
+ <graphics type='vnc' port='-1' autoport='yes'/>
+ </devices>
+</domain>
+"""
+
+def qemu_img(*args):
+    """Run the qemu-img binary with the given command-line arguments.
+
+    The child's stdin and stdout are connected to /dev/null; its stderr is
+    inherited from the test process.
+
+    Returns the exit status reported by subprocess.call().
+    """
+    # The context manager closes the /dev/null handle once the child has
+    # finished; previously one handle was leaked per call.
+    with open('/dev/null', 'r+') as devnull:
+        return subprocess.call([qemu_img_bin] + list(args),
+                               stdin=devnull, stdout=devnull)
+
+def virsh(*args):
+    """Run the virsh binary with the given arguments and return its stdout.
+
+    Only stdout is captured; stderr is inherited from the test process.
+    """
+    # No /dev/null handle is opened here: the original opened one but never
+    # passed it to the child, so it was simply leaked.
+    proc = subprocess.Popen([virsh_bin] + list(args), stdout=subprocess.PIPE)
+    return proc.communicate()[0]
+
+def make_baseimage(name, size_mb):
+    """Write a zero-filled file with dd.
+
+    name    -- path of the file to write (passed to dd as of=)
+    size_mb -- number of 1M blocks to copy from /dev/zero
+
+    dd's stdin, stdout and stderr are all connected to /dev/null.
+    Returns the exit status reported by subprocess.call().
+    """
+    # Close the /dev/null handle when dd is done instead of leaking it.
+    with open('/dev/null', 'r+') as devnull:
+        return subprocess.call(
+            ['dd', 'if=/dev/zero', "of=%s" % name, 'bs=1M',
+             'count=%i' % size_mb],
+            stdin=devnull, stdout=devnull, stderr=devnull)
+
+def has_backing_file(path):
+    """Tell whether `qemu-img info` lists a backing file for the image.
+
+    Returns True when a line of the command's stdout starts with
+    "backing file:", and False otherwise.
+    """
+    info_cmd = [qemu_img_bin, "info", path]
+    info_text = subprocess.Popen(info_cmd,
+                                 stdout=subprocess.PIPE).communicate()[0]
+    return re.search("^backing file:", info_text, re.M) is not None
+
+class BlockPullTestCase(unittest.TestCase):
+    """Shared fixture for the block pull tests.
+
+    begin() creates the disk images, opens the libvirt connection and starts
+    the guest; end() destroys the guest and closes the connection.  Subclasses
+    choose when to call them (from setUp/tearDown or from the test itself).
+    """
+
+    # Class-level defaults so that end() is safe to call even when begin()
+    # raised part-way through or was never called at all.
+    conn = None
+    dom = None
+
+    def _error_handler(self, ctx, error, dummy=None):
+        """libvirt error callback that deliberately does nothing."""
+        # NOTE(review): presumably registered so that libvirt does not print
+        # the errors the tests provoke on purpose -- confirm.
+        pass
+
+    def create_disks(self, sparse):
+        """Create the backing files and the three guest disk images in /tmp.
+
+        sparse -- when true the two backing files are created with
+                  `qemu-img create`; otherwise they are written with dd via
+                  make_baseimage().
+
+        Sets self.disks to the paths of the three guest disks: two QED images
+        with a backing file each, and one raw image without one.
+        """
+        self.disks = ['/tmp/disk1.qed', '/tmp/disk2.qed', '/tmp/disk3.raw']
+        backing_files = ['/tmp/backing1.img', '/tmp/backing2.img']
+
+        for backing in backing_files:
+            if sparse:
+                qemu_img('create', '-f', 'raw', backing, '100M')
+            else:
+                make_baseimage(backing, 100)
+
+        # zip() stops after the two QED disks; the raw disk has no backing
+        # file and is created separately below.
+        for disk, backing in zip(self.disks, backing_files):
+            qemu_img('create', '-f', 'qed', '-o',
+                     'backing_file=%s' % backing, disk)
+        qemu_img('create', '-f', 'raw', self.disks[2], '100M')
+
+    def begin(self, sparse=True):
+        """Create the disks, connect to qemu:///system and start the guest.
+
+        sparse is forwarded to create_disks().
+        """
+        libvirt.registerErrorHandler(self._error_handler, None)
+        self.create_disks(sparse)
+        self.conn = libvirt.open('qemu:///system')
+        self.dom = self.conn.createXML(dom_xml, 0)
+
+    def end(self):
+        """Destroy the guest and close the connection, if they exist.
+
+        Anything begin() did not get around to creating is skipped, so a
+        failure inside begin() is not masked by an AttributeError raised from
+        tearDown().  The connection is closed even if destroying the guest
+        raises.
+        """
+        try:
+            if self.dom is not None:
+                self.dom.destroy()
+        finally:
+            self.dom = None
+            if self.conn is not None:
+                self.conn.close()
+                self.conn = None
+
+class TestBasicErrors(BlockPullTestCase):
+    """Check the error codes libvirt reports for invalid block job requests."""
+
+    def setUp(self):
+        self.begin()
+
+    def tearDown(self):
+        self.end()
+
+    def _assert_fails_with(self, expected_code, call, *args):
+        """Invoke call(*args) and assert that libvirt reports expected_code.
+
+        The code is read from the raised libvirtError.  If the call returns
+        without raising, it is read from the first element of the
+        connection's last error instead.
+        """
+        try:
+            call(*args)
+        except libvirt.libvirtError as e:
+            self.assertEqual(expected_code, e.get_error_code())
+        else:
+            e = self.conn.virConnGetLastError()
+            self.assertEqual(expected_code, e[0])
+
+    def test_bad_path(self):
+        """Pulling a path that is not one of the guest's disks is rejected."""
+        self._assert_fails_with(libvirt.VIR_ERR_INVALID_ARG,
+                                self.dom.blockPull, '/dev/null', 0, 0)
+
+    def test_abort_no_stream(self):
+        """Aborting when no job was started on the disk is rejected."""
+        self._assert_fails_with(libvirt.VIR_ERR_OPERATION_INVALID,
+                                self.dom.blockJobAbort, self.disks[0], 0)
+
+    def test_start_same_twice(self):
+        """A second pull on a disk that was just given one is rejected."""
+        # The first request is expected to succeed, so it is made outside the
+        # helper and any exception from it fails the test.
+        self.dom.blockPull(self.disks[0], 0, 0)
+        self._assert_fails_with(libvirt.VIR_ERR_OPERATION_FAILED,
+                                self.dom.blockPull, self.disks[0], 0, 0)
+
+    def test_unsupported_disk(self):
+        """Pulling the raw disk, which has no backing file, is rejected."""
+        self._assert_fails_with(libvirt.VIR_ERR_OPERATION_INVALID,
+                                self.dom.blockPull, self.disks[2], 0, 0)
+
+class TestBasicCommands(BlockPullTestCase):
+    """Exercise blockPull, blockJobInfo and blockJobAbort through the API.
+
+    Each test calls begin() itself because the tests need different kinds of
+    backing files.
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        self.end()
+
+    def test_start_stop(self):
+        """A started pull job is reported, and is gone after an abort."""
+        # NOTE(review): presumably sparse=False makes the job run long enough
+        # to still be active after the one-second sleep -- confirm.
+        self.begin(sparse=False)
+        self.dom.blockPull(self.disks[0], 0, 0)
+        time.sleep(1)
+        info = self.dom.blockJobInfo(self.disks[0], 0)
+        self.assertIsNot(None, info)
+        self.assertEqual(info['type'], libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL)
+        self.dom.blockJobAbort(self.disks[0], 0)
+        time.sleep(1)
+        self.assertIs(None, self.dom.blockJobInfo(self.disks[0], 0))
+
+    def test_whole_disk(self):
+        """After a pull completes the image no longer has a backing file."""
+        self.begin()
+        self.assertTrue(has_backing_file(self.disks[0]))
+        self.dom.blockPull(self.disks[0], 0, 0)
+        # Poll up to four times, one second apart, until no job is reported.
+        for _ in range(4):
+            if self.dom.blockJobInfo(self.disks[0], 0) is None:
+                break
+            time.sleep(1)
+        self.assertFalse(has_backing_file(self.disks[0]))
+
+    def test_two_disks_at_once(self):
+        """Pull jobs on both QED disks can run at the same time."""
+        self.begin()
+        pending = [0, 1]
+        for d in pending:
+            self.dom.blockPull(self.disks[d], 0, 0)
+
+        for _ in range(5):
+            # Build a new list on every pass.  Removing entries from the list
+            # while iterating over it, as was done before, skips the element
+            # that follows each removed one.
+            pending = [d for d in pending
+                       if self.dom.blockJobInfo(self.disks[d], 0) is not None]
+            if not pending:
+                break
+            time.sleep(1)
+        for d in range(2):
+            self.assertFalse(has_backing_file(self.disks[d]))
+
+class TestEvents(BlockPullTestCase):
+    """Check that a block job event is delivered when a pull finishes.
+
+    setUp() starts the event loop before the test calls begin(), so the
+    default event implementation is registered before the connection is
+    opened.
+    """
+
+    def eventLoopRun(self):
+        """Thread body: run libvirt's default event loop until told to stop."""
+        while self.do_events:
+            libvirt.virEventRunDefaultImpl()
+
+    def eventLoopStart(self):
+        """Register the default event implementation and start the thread."""
+        libvirt.virEventRegisterDefaultImpl()
+        # Set the flag before the thread exists so eventLoopRun() always
+        # finds it.
+        self.do_events = True
+        self.eventLoopThread = threading.Thread(target=self.eventLoopRun,
+                                                name="libvirtEventLoop")
+        # Daemon thread: it must not keep the interpreter alive at exit.
+        self.eventLoopThread.setDaemon(True)
+        self.eventLoopThread.start()
+
+    def eventLoopStop(self):
+        """Ask the event loop thread to leave its loop; does not wait for it."""
+        self.do_events = False
+
+    def setUp(self):
+        self.eventLoopStart()
+
+    def tearDown(self):
+        # Stop the loop here as well: if the test fails before reaching its
+        # own eventLoopStop() call the thread would otherwise keep running.
+        # Calling it twice is harmless, it only clears a flag.
+        self.eventLoopStop()
+        self.end()
+
+    @staticmethod
+    def recordBlockJobEvent(conn, dom, path, type, status, inst):
+        """Event callback: store the event details on the test instance.
+
+        inst is the opaque value given at registration time, which is the
+        running TestEvents instance.
+        """
+        inst.event = (dom, path, type, status)
+
+    def test_event_complete(self):
+        """A completed pull produces a COMPLETED event for the pulled disk."""
+        self.begin()
+        self.event = None
+        self.conn.domainEventRegisterAny(
+            self.dom, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB,
+            TestEvents.recordBlockJobEvent, self)
+        self.dom.blockPull(self.disks[0], 0, 0)
+        # Poll up to four times, one second apart, for the callback to fire.
+        for _ in range(4):
+            if self.event is not None:
+                break
+            time.sleep(1)
+        self.eventLoopStop()
+        self.assertIsNot(None, self.event)
+        self.assertFalse(has_backing_file(self.disks[0]))
+        self.assertEqual(self.event[1], self.disks[0])
+        self.assertEqual(self.event[2], libvirt.VIR_DOMAIN_BLOCK_JOB_TYPE_PULL)
+        self.assertEqual(self.event[3], libvirt.VIR_DOMAIN_BLOCK_JOB_COMPLETED)
+
+class TestVirsh(BlockPullTestCase):
+    """Drive block pull jobs through the virsh command-line tool.
+
+    Each test calls begin() itself; tearDown() always calls end().
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        self.end()
+
+    def test_blockpull(self):
+        """`virsh blockpull` removes the backing file of the first disk."""
+        self.begin()
+        target = self.disks[0]
+        virsh('blockpull', self.dom.name(), target)
+        # Up to four polls, one second apart, until no job is reported.
+        for _ in range(4):
+            job = self.dom.blockJobInfo(self.disks[0], 0)
+            if job is None:
+                break
+            time.sleep(1)
+        still_backed = has_backing_file(self.disks[0])
+        self.assertFalse(still_backed)
+
+    def test_job_abort(self):
+        """`virsh blockjob --abort` cancels a running pull job."""
+        self.begin(sparse=False)
+        self.dom.blockPull(self.disks[0], 0, 0)
+        time.sleep(1)
+        running = self.dom.blockJobInfo(self.disks[0], 0)
+        self.assertIsNot(None, running)
+        virsh('blockjob', self.dom.name(), '--abort', self.disks[0])
+        time.sleep(2)
+        after_abort = self.dom.blockJobInfo(self.disks[0], 0)
+        self.assertIs(None, after_abort)
+        # The pull was cancelled, so the image still has its backing file.
+        self.assertTrue(has_backing_file(self.disks[0]))
+
+    def test_job_info(self):
+        """Poll `virsh blockjob --info` after starting a pull with virsh."""
+        self.begin(sparse=False)
+        virsh('blockpull', self.dom.name(), self.disks[0])
+        # NOTE(review): the loop stops as soon as the output contains a
+        # "Block Pull:" line, and the backing-file check follows immediately.
+        # It looks like the condition may have been meant the other way round
+        # (wait until the job is gone) -- confirm.
+        for _ in range(9):
+            report = virsh('blockjob', self.dom.name(), '--info',
+                           self.disks[0])
+            if re.search("^Block Pull:", report, re.M) is not None:
+                break
+            time.sleep(1)
+        self.assertFalse(has_backing_file(self.disks[0]))
+
+# Run all test cases in this module when the file is executed as a script.
+if __name__ == '__main__':
+ unittest.main()
--
1.7.3