suites/libvirt-cim/cimtest/FilterList/01_enum.py | 71 ++
suites/libvirt-cim/cimtest/FilterList/02_assoc.py | 158 ++++++
suites/libvirt-cim/cimtest/FilterList/03_create.py | 199 ++++++++
suites/libvirt-cim/cimtest/FilterList/helper.py | 515 +++++++++++++++++++++
4 files changed, 943 insertions(+), 0 deletions(-)
# HG changeset patch
# User Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
# Date 1310498051 10800
# Node ID d9741a8b5eb7ccebf21d69f3cde72729bb60ad22
# Parent 779871660a3583dedf1652773196d07093ff26ff
[TEST] Adding tests for FilterLists
helper.py:
- Helper module with classes and functions used by all tests
01_enum.py:
- Enumerate FilterList instances and compare to results from libvirt
02_assoc.py:
- For each FilterList instance, call Associators to get respective
FilterEntries instances and compare to results from libvirt
03_create.py:
- Creates a new FilterList instance and a NestedFilterList associated
to this new instance. Creates a new domain and a new
AppliedFilterList instance associated to the NetworkPort instance of
that new domain.
Signed-off-by: Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
diff --git a/suites/libvirt-cim/cimtest/FilterList/01_enum.py
b/suites/libvirt-cim/cimtest/FilterList/01_enum.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/01_enum.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Enumerate
+#
+
+import sys
+import helper
+
+from CimTest.ReturnCodes import PASS, FAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+
+sup_types = ["KVM",]
+
+@do_main(sup_types)
+def main():
+ options = main.options
+
+ _test = helper.FilterListTest(options.ip, options.virt)
+
+ # Fetch current filters with libvirt
+ libvirt_filters = _test.libvirt_filter_lists()
+ if not libvirt_filters:
+ return FAIL
+
+ logger.info("libvirt filters:\n%s", libvirt_filters)
+
+ # Fetch current filters with libvirt-cim
+ cim_filters = _test.cim_filter_lists()
+ if not cim_filters:
+ # TODO: Add some filters of our own
+ return FAIL
+
+ logger.info("libvirt-cim filters:\n%s", cim_filters)
+
+ # Compare results
+ if len(libvirt_filters) != len(cim_filters):
+ logger.error("CIM filters list length is different than libvirt filters
list")
+ return FAIL
+
+ for f in libvirt_filters:
+ if f not in cim_filters:
+ logger.error("Filter %s, not found in CIM filters list", f)
+ return FAIL
+
+ return PASS
+# main
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/02_assoc.py
b/suites/libvirt-cim/cimtest/FilterList/02_assoc.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/02_assoc.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Associators
+#
+
+import sys
+import helper
+
+from CimTest.ReturnCodes import PASS, FAIL, XFAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+
+sup_types = ["KVM",]
+
+@do_main(sup_types)
+def main():
+ options = main.options
+
+ _test = helper.FilterListTest(options.ip, options.virt)
+
+ # Fetch current filters with libvirt
+ libvirt_filters = _test.libvirt_entries_in_filter_lists()
+ if not libvirt_filters:
+ return FAIL
+
+ #logger.info("libvirt filters:\n%s", libvirt_filters)
+
+ # Fetch current filters with libvirt-cim
+ cim_filters = _test.cim_entries_in_filter_lists()
+ if not cim_filters:
+ return FAIL
+
+ #logger.info("libvirt-cim filters:\n%s", cim_filters)
+
+ # Compare results
+ if len(libvirt_filters) != len(cim_filters):
+ logger.error("CIM filters list length and libvirt filters list
differ")
+ return FAIL
+
+ # Compare each result
+ for inst_name in cim_filters:
+ _cim_f = _test.GetInstance(inst_name)
+ try:
+ _key = (_cim_f["InstanceID"], _cim_f["Name"])
+ _vir_f = libvirt_filters[_key]
+ except KeyError, e:
+ logger.error(e)
+ return FAIL
+
+ logger.info("")
+ logger.info("Processing '%s' filter", _key[1])
+
+ # Check number of rules
+ n_vir_rules = len([e for e in _vir_f.getchildren() if e.tag in ["rule",
"filterref"]])
+
+ # process each element
+ instances = cim_filters[inst_name]
+ n_cim_rules = len(instances)
+
+ if n_cim_rules != n_vir_rules:
+ logger.error("Number of rules returned by libvirt (%d) and libvirt-cim
(%d) differ",
+ n_vir_rules, n_cim_rules)
+ return FAIL
+
+
+ # TODO: Create a new class to handle the filter parsing
+ def parse_filter(element, spaces=""):
+ logger.info("%s%s(%s)%s",
+ spaces,
+ element.tag,
+ element.attrib and element.attrib or "",
+ element.text and ": %s" % element.text or
"")
+
+ # Recurse to last element in tree
+ for e in element:
+ parse_filter(e, "%s " % spaces)
+
+ if element.tag == "filterref":
+ name = element.get("filter")
+ try:
+ i = [inst for inst in instances if inst["Name"] ==
name][0]
+ logger.info("%s* MATCH: Instance: %s", spaces, i)
+ instances.remove(i)
+ return
+ except:
+ raise Exception("No CIM Instance matching this rule was
found")
+ elif element.tag == "rule":
+ if not instances:
+ raise Exception("No CIM Instance matching this rule was
found")
+
+ rule = helper.FilterRule(element)
+
+ # Find matching instance
+ logger.info("%s* %s", spaces, rule)
+ for i in instances:
+ props = ""
+ for p in i.properties.keys():
+ props = "%s '%s':'%s'" % (props, p,
i[p])
+ logger.info("%s* %s(%s)", spaces, i.classname, props)
+
+ matches, msg = rule.matches(i)
+ if msg:
+ logger.info("%s* %s: %s", spaces, matches and
"MATCH" or "DON'T MATCH", msg)
+
+ if matches:
+ instances.remove(i)
+ return
+
+ # No matching instance
+ raise Exception("No CIM instance matching rule found")
+ else:
+ # Unexpected tag, ignore by now
+ pass
+ # parse_filter
+
+ try:
+ parse_filter(_vir_f)
+ except Exception, e:
+ logger.error("Error parsing filter '%s': %s", _vir_f.tag,
e)
+ return FAIL
+
+ # Check for leftovers
+ for i in instances:
+ props = ""
+ for p in i.properties.keys():
+ props = "%s '%s':'%s'" % (props, p, i[p])
+
+ logger.error("Could NOT find match for instance %s : {%s}",
i.classname, props)
+ return FAIL
+ # end for inst_name in cim_filters
+
+ logger.info("====End of test====")
+ return PASS
+# main
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/03_create.py
b/suites/libvirt-cim/cimtest/FilterList/03_create.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/03_create.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Create
+#
+
+import sys
+import helper
+
+import pywbem
+
+from CimTest.ReturnCodes import PASS, FAIL, XFAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+from XenKvmLib.vxml import get_class
+from VirtLib.utils import run_remote
+
+sup_types = ["KVM",]
+
+domain = None
+
+def get_filter_inst_and_inst_name(name):
+ try:
+ _filters = test.libvirt_filter_lists()
+ _id = test.id_for_filter_name(_filters, name)
+ except Exception, e:
+ # TODO: define a filter of our own
+ logger.error("'%s' filter list not found in libvirt:\n%s",
name, e)
+ raise
+
+ # Retrieve instance name for "clean-traffic"
+ try:
+ inst_name = test.FindInstanceName(name)
+ except Exception, e:
+ logger.error("'%s' filter list not found in libvirt-cim\n%s",
name, e)
+ raise
+
+ # Retrieve instance for "clean-traffic"
+ inst = test.GetInstance(inst_name)
+
+ if not inst:
+ logger.error("Unable to retrieve instance for '%s' filter
list", name)
+ raise Exception()
+ elif inst["InstanceID"] != _id:
+ logger.error("'%s' ids from libvirt and libvirt-cim differ",
name)
+ raise Exception()
+
+ return inst, inst_name
+# get_filter_inst_and_inst_name
+
+
+def create_filter_list(name):
+ # Get "clean-traffic" filter instance and instance name
+ clean, clean_name = get_filter_inst_and_inst_name("clean-traffic")
+
+ # Check if filter list already exist then delete it
+ try:
+ inst_name = test.FindInstanceName(name)
+ test.wbem.DeleteInstance(inst_name)
+ logger.info("Instance with name '%s' already exists.
Deleting.", name)
+ except:
+ logger.info("No previous Instance with name '%s' found.",
name)
+
+ # Create a new FilterList instance based on name parameter
+ global flist_name
+ logger.info("Creating FilterList '%s'", name)
+ flist_name = test.CreateFilterListInstance(name)
+ flist = test.GetInstance(flist_name)
+
+ # A NestedFilterList instance will add the "clean-traffic" filter
+ # as an entry of the newly created FilterList
+ logger.info("Creating NestedFilterList instance")
+ nested_name = test.CreateFilterListInstance(None, "KVM_NestedFilterList",
+ {"Antecedent":flist_name,
+ "Dependent":clean_name})
+
+ logger.info("Got NestedFilterList name '%s'", nested_name)
+ #nested = test.GetInstance(nested_name)
+ #logger.info("Got NestedFilterList '%s'", nested)
+
+ # Check if results match
+ _id, _name = [f for f in test.libvirt_filter_lists() if f[1] == name][0]
+ elements = test.libvirt_filter_dumpxml(_id)
+ filterref = [e for e in elements if e.tag == "filterref"][0]
+ if clean["Name"] != filterref.get("filter"):
+ raise Exception("NestedFilterList name and libvirt filter don't
match")
+
+ logger.info("NestedFilterList created successfuly")
+ return flist, flist_name
+# create_filter_list
+
+
+def get_nwport_inst_and_inst_name(domain_name):
+ try:
+ inst_name = test.FindInstanceName(domain_name, "SystemName",
+ "KVM_NetworkPort")
+ inst = test.GetInstance(inst_name)
+ except Exception, e:
+ logger.error("Unable to get NetworkPort instance name for '%s'
domain", domain_name)
+ raise
+
+ return inst, inst_name
+#get_nwport_inst_and_inst_name
+
+
+def cleanup():
+ try:
+ # Destroy filter list
+ test.wbem.DeleteInstance(flist_name)
+ except Exception, e:
+ logger.error("Error deleting filter list: %s", e)
+
+ try:
+ # Destroy domain
+ if domain:
+ domain.destroy()
+ domain.undefine()
+ except Exception, e:
+ logger.error("Error destroying domain: %s", e)
+# cleanup
+
+
+@do_main(sup_types)
+def main():
+ result = XFAIL
+ options = main.options
+
+ test_flist = "cimtest-filterlist"
+
+ global test
+ test = helper.FilterListTest(options.ip, options.virt)
+
+ try:
+ # Create a new FilterList instance
+ flist, flist_name = create_filter_list(test_flist)
+
+ # Create a new domain (VM)
+ domain_name = "cimtest-filterlist-domain"
+ global domain
+ domain = helper.CIMDomain(domain_name, test.virt, test.server)
+ domain.define()
+
+ # Get NetworkPort instance and instance name for defined domain
+ nwport, nwport_name = get_nwport_inst_and_inst_name(domain_name)
+
+ # An AppliedFilterList Instance will apply the filter to the network
+ # port of the defined domain
+ test.CreateFilterListInstance(None, "KVM_AppliedFilterList",
+ {"Antecedent":nwport_name,
+ "Dependent":flist_name})
+ except Exception, e:
+ logger.error("Caught exception: %s", e)
+ result = FAIL
+
+ # Check results
+
+ # Cleanup
+ cleanup()
+
+ # Leftovers?
+ try:
+ inst = test.FindInstance(test_flist)
+ logger.error("Leftovers in CIM FilterLists: %s", inst)
+ result = FAIL
+ except IndexError:
+ pass
+
+ try:
+ filt = [f for f in test.libvirt_filter_lists() if f[1] == test_flist][0]
+ logger.error("Leftovers in libvirt filters: %s", filt)
+ result = FAIL
+ except IndexError:
+ pass
+
+ return result
+# main
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/helper.py
b/suites/libvirt-cim/cimtest/FilterList/helper.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/helper.py
@@ -0,0 +1,515 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima(a)br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#
+
+import pywbem
+import libvirt
+
+import CimTest
+from CimTest.Globals import logger
+
+import XenKvmLib
+from XenKvmLib.xm_virt_util import virt2uri
+from XenKvmLib.classes import get_typed_class
+from XenKvmLib.vxml import get_class
+
+import VirtLib
+from VirtLib.utils import run_remote
+
+try:
+ from xml.etree import cElementTree as ElementTree
+except:
+ from xml.etree import ElementTree
+
+
+class BaseTestObject(object):
+
+ def __init__(self, server, virt):
+ self.server = server
+ self.virt = virt
+ self.typed_class = get_typed_class(virt, self.cim_basename)
+ self.uri = virt2uri(virt)
+ self.user = CimTest.Globals.CIM_USER
+ self.passwd = CimTest.Globals.CIM_PASS
+ self.namespace = CimTest.Globals.CIM_NS
+ self.wbem = pywbem.WBEMConnection("http://%s" % server,
+ (self.user, self.passwd),
+ self.namespace)
+ self.wbem.debug = True
+ # __init__
+
+ def EnumerateInstances(self, typed_class=None):
+ if not typed_class:
+ typed_class = self.typed_class
+
+ return self.wbem.EnumerateInstances(typed_class)
+ # EnumerateInstances
+
+ def EnumerateInstanceNames(self, typed_class=None):
+ if not typed_class:
+ typed_class = self.typed_class
+
+ return self.wbem.EnumerateInstanceNames(typed_class)
+ # EnumerateInstanceNames
+
+ def __assoc_args(self, assoc_class=None, result_class=None):
+ kargs = {}
+ if assoc_class:
+ kargs["AssocClass"] = assoc_class
+
+ if result_class:
+ kargs["ResultClass"] = result_class
+
+ return kargs
+ # __assoc_args
+
+ def Associators(self, typed_class=None, assoc_class=None, result_class=None):
+ if not typed_class:
+ typed_class = self.typed_class
+
+ kargs = self.__assoc_args(assoc_class, result_class)
+
+ if kargs:
+ return self.wbem.Associators(typed_class, **kargs)
+
+ return self.wbem.Associators(typed_class)
+ # Associators
+
+ def AssociatorNames(self, typed_class=None, assoc_class=None, result_class=None):
+ if not typed_class:
+ typed_class = self.typed_class
+
+ kargs = self.__assoc_args(assoc_class, result_class)
+
+ if kargs:
+ return self.wbem.Associators(typed_class, **kargs)
+
+ return self.wbem.AssociatorNamess(typed_class)
+ # AssociatorNames
+
+ def GetInstance(self, inst_name):
+ return self.wbem.GetInstance(inst_name)
+ # GetInstance
+
+ def FindInstance(self, inst_name, attr="Name", class_name=None):
+ if not class_name:
+ class_name = self.typed_class
+
+ if isinstance(inst_name, str) or isinstance(inst_name, unicode):
+ _insts = self.EnumerateInstances(class_name)
+ return [i for i in _insts if i[attr] == inst_name][0]
+
+ return self.GetInstance(self, inst_name)
+ # FindInstance
+
+ def FindInstanceName(self, _name, attr="Name", class_name=None):
+ if not class_name:
+ class_name = self.typed_class
+
+ _inst_names = self.EnumerateInstanceNames(class_name)
+ return [i for i in _inst_names if i[attr] == _name][0]
+ # FindInstanceName
+
+ def CreateFilterListInstance(self, name, class_name=None, props={}):
+ if not class_name:
+ class_name = self.typed_class
+
+ if name:
+ props["Name"] = name
+
+ logger.info("Creating Instance of %s", class_name)
+ inst = pywbem.CIMInstance(class_name, props)
+ return self.wbem.CreateInstance(inst)
+ # CreateFilterListInstance
+
+ def DumpWBEMDebug(self):
+ logger.info("*** Begin WBEM Debug ***")
+ logger.info(" * Last raw request\n'%s'",
self.wbem.last_raw_request)
+ logger.info(" * Last raw reply\n'%s'",
self.wbem.last_raw_reply)
+
+ logger.info(" * Last request\n'%s'", self.wbem.last_request)
+ logger.info(" * Last reply\n'%s'", self.wbem.last_reply)
+ logger.info("*** End WBEM Debug ***")
+ # DumpWBEMDebug
+# BaseTestObject
+
+
+class CIMDomain(object):
+
+ def __init__(self, name, virt, server):
+ self.name = name
+ self.server = server
+ self.virt = virt
+ self._domain = get_class(virt)(name)
+ #__init__
+
+ def define(self):
+ return self._domain.cim_define(self.server)
+ # define
+
+ def start(self):
+ return self._domain.cim_start(self.server)
+ # start
+
+ def shutdown(self):
+ return self._domain.cim_shutdown(self.server)
+ # shutdown
+
+ def undefine(self):
+ return self._domain.undefine(self.server)
+ # undefine
+
+ def destroy(self):
+ return self._domain.cim_destroy(self.server)
+ #destroy
+# CIMDomain
+
+
+class FilterListTest(BaseTestObject):
+ cim_basename = "FilterList"
+
+ def __init__(self, server, virt):
+ BaseTestObject.__init__(self, server, virt)
+ # __init__
+
+ def libvirt_filter_lists(self):
+ cmd = "virsh -q -c %s nwfilter-list 2>/dev/null" % self.uri
+ ret, filters = run_remote(self.server, cmd)
+ if ret:
+ logger.error("Error listing existing filters")
+ return None
+
+ filters = filters.split("\n")
+ l = []
+ for f in filters:
+ # Append a tuple of (id, name) to list
+ t = tuple(a for a in f.strip().split() if a)
+ l.append(t)
+
+ return l
+ # libvirt_filter_lists
+
+ def cim_filter_lists(self):
+ _instances = self.EnumerateInstances()
+ l = []
+ for i in _instances:
+ try:
+ # Append a tuple of (id, name) to list
+ l.append((i["InstanceId"], i["Name"]))
+ except KeyError:
+ logger.error("'InstanceID', 'Name' properties not
found in Instance %s", i)
+ return None
+
+ return l
+ # cim_filter_lists
+
+ def id_for_filter_name(self, _list, _name):
+ if isinstance(_list[0], tuple):
+ return [t[0] for t in _list if t[1] == _name][0]
+ elif isinstance(_list[0], CIMInstanceName):
+ return self.GetInstance(inst_name)["InstanceID"]
+ raise AttributeError("Expecting list of either tuple or
CIMInstanceName")
+ # id_for_filter_name
+
+ def name_for_filter_id(self, _list, _id):
+ if isinstance(_list[0], tuple):
+ return [t[1] for t in _list if t[0] == _id][0]
+ elif isinstance(_list[0], CIMInstance):
+ return [i for i in _list if i["InstanceID"] ==
_id][0]["Name"]
+ raise AttributeError("Expecting list of either tuple or CIMInstance")
+ # name_for_filter_id
+
+ def libvirt_filter_dumpxml(self, uuid):
+ cmd = "virsh -q -c %s nwfilter-dumpxml %s 2>/dev/null" % (self.uri,
uuid)
+ ret, out = run_remote(self.server, cmd)
+ if ret:
+ logger.error("Error executing nwfilter-dumpxml")
+ return None
+
+ # Remove all unecessary spaces and new lines
+ _xml = "".join([a.strip() for a in out.split("\n") if a])
+ return ElementTree.fromstring(_xml)
+ # libvirt_filter_dumpxml
+
+ def libvirt_entries_in_filter_lists(self):
+ filters = self.libvirt_filter_lists()
+
+ d = {}
+ for f in filters:
+ root = self.libvirt_filter_dumpxml(f[0])
+ if not root:
+ return None
+
+ d[f] = root
+
+ return d
+ # libvirt_entries_in_filter_lists
+
+ def cim_entries_in_filter_lists(self):
+ d = {}
+
+ _names = self.EnumerateInstanceNames()
+ for n in _names:
+ l = []
+ l.extend(self.Associators(n, result_class="CIM_FilterEntryBase"))
+ l.extend(self.Associators(n, assoc_class="KVM_NestedFilterList"))
+ d[n] = l
+
+ return d
+ # cim_entries_in_filter_lists
+
+ def libvirt_entries_in_filter_list(self, _name, _id=None):
+ _id_name = (_id, _name)
+
+ if not _id:
+ try:
+ _id_name = (self.id_for_filter_name(d.keys(), _name), _name)
+ except IndexError:
+ return None
+ elif not _name:
+ try:
+ _id_name = (_id, self.name_for_filter_id(d.keys(), _id))
+ except IndexError:
+ return None
+
+ return self.libvirt_filter_dumpxml(_id_name[0])
+ # libvirt_entries_in_filter_list
+
+ def cim_entries_in_filter_list(self, _name, _id=None):
+ _inst_name = None
+
+ if not _id:
+ try:
+ _inst_name = self.GetInstanceName(_name)
+ except IndexError:
+ return None
+ elif not _name:
+ try:
+ _inst = self.GetInstance(_id, "InstanceID")
+ _inst_name = self.GetInstanceName(_inst["Name"])
+ except IndexError:
+ return None
+
+ return self.Associators(_inst_name,
result_class="CIM_FilterEntryBase")
+ # cim_entries_in_filter_list
+# FilterListTest
+
+
+class FilterRule(object):
+
+ __directions = {"in" : "1",
+ "out" : "2",
+ "inout": "3",}
+
+ __versions = {"ip" : "4",
+ "ipv6": "6",}
+
+ __actions = {"accept" : "1",
+ "deny" : "2",
+ "drop" : "2",}
+
+ __baserule_map = {"action" : "Action",
+ "direction" : "Direction",
+ "priority" : "Priority",}
+
+ __iprule_map = {"version" : "HdrIPVersion",
+ "" : "HdrFlowLabel",
+ "srcipaddr" : "HdrSrcAddress",
+ "dstipaddr" : "HdrDestAddress",
+ "srcipmask" : "HdrSrcMask",
+ "dstipmask" : "HdrDestMask",
+ "srcipto" : "HdrSrcAddressEndOfRange",
+ "dstipto" : "HdrDestAddressEndOfRange",
+ "srcportstart": "HdrSrcPortStart",
+ "dstportstart": "HdrDestPortStart",
+ "srcportend" : "HdrSrcPortEnd",
+ "dstportend" : "HdrDestPortEnd",
+ "" : "HdrDSCP",
+ "" : "HdrProtocolID",}
+
+ __hdr8021rule_map = {"srcmacaddr": "HdrSrcMACAddr8021",
+ "dstmacaddr": "HdrDestMACAddr8021",
+ "srcmacmask": "HdrSrcMACMask8021",
+ "dstmacmask": "HdrDestMACMask8021",
+ "" : "HdrPriorityValue8021",
+ "" : "HdrVLANID8021",
+ "protocolid": "HdrProtocolID8021",}
+
+ ### FIXME Add to proper rule map
+ """
+ "": "HealthState",
+ "": "StatusDescriptions",
+ "": "Generation",
+ "": "CommunicationStatus",
+ "": "SystemName",
+ "": "DetailedStatus",
+ "": "Caption",
+ "": "OperationalStatus",
+ "": "SystemCreationClassName",
+ "": "Status",
+ "": "Description",
+ "": "InstallDate",
+ "": "CreationClassName",
+ "": "PrimaryStatus",
+ "": "ElementName",
+ "": "Name",
+ "": "IsNegated",
+ "": "InstanceID",
+ "": "OperatingStatus",
+ """
+
+ __basenames = {None : "FilterEntry",
+ "ip" : "IPHeadersFilter",
+ "ipv6": "IPHeadersFilter",
+ "tcp" : "IPHeadersFilter",
+ "udp" : "IPHeadersFilter",
+ "igmp": "IPHeadersFilter",
+ "icmp": "IPHeadersFilter",
+ "mac" : "Hdr8021Filter",
+ "arp" : "Hdr8021Filter",
+ "rarp": "Hdr8021Filter",}
+
+ __rulemaps = {"FilterEntry" : __baserule_map,
+ "IPHeadersFilter": dict(__baserule_map, **__iprule_map),
+ "Hdr8021Filter" : dict(__baserule_map,
**__hdr8021rule_map),}
+
+ def __init__(self, element):
+ self.__dict = element.attrib
+ self.__type = None
+
+ for e in element:
+ self.__dict = dict(self.__dict, **e.attrib)
+ if not self.__type:
+ self.__type = e.tag
+
+ try:
+ self.basename = self.__basenames[self.__type]
+ self.rulemap = self.__rulemaps[self.basename]
+ except KeyError:
+ self.basename = None
+ self.rulemap = None
+ # __init__
+
+ def __getattr__(self, key):
+ if key == "direction":
+ return self.__directions[self.__dict[key]]
+ elif key == "version":
+ if self.__type and "ip" in self.__type:
+ return self.__versions[self.__type]
+ elif key == "action":
+ return self.__actions[self.__dict[key]]
+ elif key == "type":
+ return self.__type
+
+ try:
+ return self.__dict[key]
+ except KeyError:
+ return None
+ # __getattr__
+
+ def __repr__(self):
+ return "FilterRule(type=%s, attributes=%s)" % (self.__type,
self.__dict)
+ # __repr__
+
+ def __addr_to_list(self, val, base, sep):
+ return [long(v, base) for v in val.split(sep)]
+ # __addr_to_list
+
+ def __cidr_to_list(self, val):
+ int_val = int(val, 10)
+ int_val = (0xffffffff >> (32 - int_val)) << (32 - int_val)
+ o1 = (int_val & 0xff000000) >> 24
+ o2 = (int_val & 0x00ff0000) >> 16
+ o3 = (int_val & 0x0000ff00) >> 8
+ o4 = int_val & 0x000000ff
+ return [o1, o2, o3, o4]
+ # __cidr_to_list
+
+ def matches(self, instance):
+ # Classname
+ if not self.basename or self.basename not in instance.classname:
+ return (False, "Classname '%s' does not match instance
'%s'" % (self.basename, instance.classname))
+
+ # IP Version
+ if self.version:
+ prop_name = self.rulemap["version"]
+ try:
+ inst_version = str(instance[prop_name])
+ except KeyError:
+ inst_version = None
+
+ if self.version != inst_version:
+ return (False, "IP version '%s' does not match instance
'%s'" % (self.version, inst_version))
+
+ # Other properties
+ for key in self.__dict:
+ try:
+ inst_key = self.rulemap[key]
+ except KeyError:
+ inst_key = None
+
+ if not inst_key:
+ # logger.info("No match for rule attribute '%s'", key)
+ continue
+
+ # convert the property value to string
+ prop = instance.properties[inst_key]
+ val = self.__getattr__(key)
+ if val.startswith("0x"):
+ inst_val = hex(int(prop.value))
+ else:
+ inst_val = str(prop.value)
+
+ # Handle special cases
+ if inst_val != "None":
+ # Netmask?
+ if "mask" in key:
+ if "." in val:
+ val = self.__addr_to_list(val, base, sep)
+ else:
+ # Assume CIDR
+ val = self.__cidr_to_list(val)
+ inst_val = prop.value
+ # Address?
+ elif "addr" in key:
+ if val.startswith("$"):
+ # Can't translate address starting with '$'
+ logger.info("Assuming matching address for '%s:%s'
and '%s:%s'", key, val, inst_key,inst_val)
+ continue
+ elif prop.is_array and prop.value:
+ sep = "."
+ base = 10
+ if ":" in val:
+ sep = ":"
+ base = 16
+
+ val = self.__addr_to_list(val, base, sep)
+ inst_val = prop.value
+ # if inst_val != None
+
+ if inst_val != val:
+ return (False, "Values for '%s':'%s' and
'%s':'%s' don't match" % (key, val, inst_key, inst_val))
+
+ return (True, "Found matching CIM Instance: %s" % instance)
+ # matches
+# FilterRule
+