
suites/libvirt-cim/cimtest/FilterList/01_enum.py | 71 ++ suites/libvirt-cim/cimtest/FilterList/02_assoc.py | 158 ++++++ suites/libvirt-cim/cimtest/FilterList/03_create.py | 199 ++++++++ suites/libvirt-cim/cimtest/FilterList/helper.py | 515 +++++++++++++++++++++ 4 files changed, 943 insertions(+), 0 deletions(-) # HG changeset patch # User Eduardo Lima (Etrunko) <eblima@br.ibm.com> # Date 1310498051 10800 # Node ID d9741a8b5eb7ccebf21d69f3cde72729bb60ad22 # Parent 779871660a3583dedf1652773196d07093ff26ff [TEST] Adding tests for FilterLists helper.py: - Helper module with classes and functions used by all tests 01_enum.py: - Enumerate FilterList instances and compare to results from libvirt 02_assoc.py: - For each FilterList instance, call Associators to get respective FilterEntries instances and compare to results from libvirt 03_create.py: - Creates a new FilterList instance and a NestedFilterList associated to this new instance. Creates a new domain and a new AppliedFilterList instance associated to the NetworkPort instance of that new domain. Signed-off-by: Eduardo Lima (Etrunko) <eblima@br.ibm.com> diff --git a/suites/libvirt-cim/cimtest/FilterList/01_enum.py b/suites/libvirt-cim/cimtest/FilterList/01_enum.py new file mode 100644 --- /dev/null +++ b/suites/libvirt-cim/cimtest/FilterList/01_enum.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# +# Copyright 2011 IBM Corp. +# +# Authors: +# Eduardo Lima (Etrunko) <eblima@br.ibm.com> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. 
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Enumerate
+#
+
+import sys
+import helper
+
+from CimTest.ReturnCodes import PASS, FAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+
+sup_types = ["KVM",]
+
+@do_main(sup_types)
+def main():
+    """Check that libvirt-cim enumerates the same filter lists as libvirt.
+
+    Returns FAIL when either side reports nothing, when the two lists have
+    different lengths, or when a libvirt (id, name) pair is absent from the
+    CIM list; PASS otherwise.
+    """
+    opts = main.options
+    tester = helper.FilterListTest(opts.ip, opts.virt)
+
+    # Reference data, obtained through virsh
+    from_libvirt = tester.libvirt_filter_lists()
+    if not from_libvirt:
+        return FAIL
+    logger.info("libvirt filters:\n%s", from_libvirt)
+
+    # Data under test, obtained through the CIM provider
+    from_cim = tester.cim_filter_lists()
+    if not from_cim:
+        # TODO: Add some filters of our own
+        return FAIL
+    logger.info("libvirt-cim filters:\n%s", from_cim)
+
+    # Both sides must report the same number of filters ...
+    if len(from_libvirt) != len(from_cim):
+        logger.error("CIM filters list length is different than libvirt filters list")
+        return FAIL
+
+    # ... and every libvirt filter must be known to CIM. Only the first
+    # missing one is reported.
+    missing = [f for f in from_libvirt if f not in from_cim]
+    if missing:
+        logger.error("Filter %s, not found in CIM filters list", missing[0])
+        return FAIL
+
+    return PASS
+# main
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/02_assoc.py b/suites/libvirt-cim/cimtest/FilterList/02_assoc.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/02_assoc.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima@br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Associators
+#
+
+import sys
+import helper
+
+from CimTest.ReturnCodes import PASS, FAIL, XFAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+
+sup_types = ["KVM",]
+
+
+def _format_props(inst):
+    """Return the properties of a CIM instance as " 'name':'value'" pairs."""
+    props = ""
+    for p in inst.properties.keys():
+        props = "%s '%s':'%s'" % (props, p, inst[p])
+    return props
+# _format_props
+
+
+# TODO: Create a new class to handle the filter parsing
+def _match_filter(element, instances, spaces=""):
+    """Log an XML element and pair it with one of the CIM instances.
+
+    Children are visited before the element itself. A "filterref" element
+    is paired with the instance whose Name equals its "filter" attribute,
+    a "rule" element with the first instance accepted by
+    helper.FilterRule.matches(). The paired instance is removed from
+    `instances`, so whatever is left after the walk had no counterpart in
+    libvirt. Elements with any other tag are only logged.
+
+    Raises Exception when a "filterref" or "rule" cannot be paired.
+    """
+    logger.info("%s%s(%s)%s",
+                spaces,
+                element.tag,
+                element.attrib and element.attrib or "",
+                element.text and ": %s" % element.text or "")
+
+    # Recurse to last element in tree
+    for e in element:
+        _match_filter(e, instances, "%s " % spaces)
+
+    if element.tag == "filterref":
+        name = element.get("filter")
+        try:
+            i = [inst for inst in instances if inst["Name"] == name][0]
+        except (IndexError, KeyError):
+            # Only "no such instance" is translated; anything else (for
+            # instance KeyboardInterrupt) must not be swallowed here.
+            raise Exception("No CIM Instance matching this rule was found")
+
+        logger.info("%s* MATCH: Instance: %s", spaces, i)
+        instances.remove(i)
+    elif element.tag == "rule":
+        if not instances:
+            raise Exception("No CIM Instance matching this rule was found")
+
+        rule = helper.FilterRule(element)
+
+        # Find matching instance
+        logger.info("%s* %s", spaces, rule)
+        for i in instances:
+            logger.info("%s* %s(%s)", spaces, i.classname, _format_props(i))
+
+            matches, msg = rule.matches(i)
+            if msg:
+                logger.info("%s* %s: %s", spaces, matches and "MATCH" or "DON'T MATCH", msg)
+
+            if matches:
+                # Safe although we are iterating: we leave the loop at once
+                instances.remove(i)
+                return
+
+        # No matching instance
+        raise Exception("No CIM instance matching rule found")
+    else:
+        # Unexpected tag, ignore by now
+        pass
+# _match_filter
+
+
+@do_main(sup_types)
+def main():
+    """Compare the entries of every filter list between libvirt and CIM.
+
+    For each FilterList instance name the associated CIM entries are
+    matched one by one against the "rule" and "filterref" children of the
+    libvirt XML of the same (InstanceID, Name). Returns PASS when every
+    filter matches completely, FAIL otherwise.
+    """
+    options = main.options
+
+    _test = helper.FilterListTest(options.ip, options.virt)
+
+    # Fetch current filters with libvirt
+    libvirt_filters = _test.libvirt_entries_in_filter_lists()
+    if not libvirt_filters:
+        return FAIL
+
+    #logger.info("libvirt filters:\n%s", libvirt_filters)
+
+    # Fetch current filters with libvirt-cim
+    cim_filters = _test.cim_entries_in_filter_lists()
+    if not cim_filters:
+        return FAIL
+
+    #logger.info("libvirt-cim filters:\n%s", cim_filters)
+
+    # Compare results
+    if len(libvirt_filters) != len(cim_filters):
+        logger.error("CIM filters list length and libvirt filters list differ")
+        return FAIL
+
+    # Compare each result
+    for inst_name in cim_filters:
+        _cim_f = _test.GetInstance(inst_name)
+        try:
+            _key = (_cim_f["InstanceID"], _cim_f["Name"])
+            _vir_f = libvirt_filters[_key]
+        except KeyError, e:
+            logger.error(e)
+            return FAIL
+
+        logger.info("")
+        logger.info("Processing '%s' filter", _key[1])
+
+        # Check number of rules
+        n_vir_rules = len([child for child in _vir_f
+                           if child.tag in ["rule", "filterref"]])
+
+        # process each element
+        instances = cim_filters[inst_name]
+        n_cim_rules = len(instances)
+
+        if n_cim_rules != n_vir_rules:
+            logger.error("Number of rules returned by libvirt (%d) and libvirt-cim (%d) differ",
+                         n_vir_rules, n_cim_rules)
+            return FAIL
+
+        try:
+            _match_filter(_vir_f, instances)
+        except Exception, e:
+            # Report the filter by name; the root tag is always "filter"
+            logger.error("Error parsing filter '%s': %s", _key[1], e)
+            return FAIL
+
+        # Check for leftovers
+        for i in instances:
+            logger.error("Could NOT find match for instance %s : {%s}",
+                         i.classname, _format_props(i))
+            return FAIL
+    # end for inst_name in cim_filters
+
+    logger.info("====End of test====")
+    return PASS
+# main
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/03_create.py
b/suites/libvirt-cim/cimtest/FilterList/03_create.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/03_create.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima@br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# CIMTest Filter Lists Create
+#
+
+import sys
+import helper
+
+import pywbem
+
+from CimTest.ReturnCodes import PASS, FAIL, XFAIL
+from CimTest.Globals import logger
+from XenKvmLib.const import do_main
+from XenKvmLib.vxml import get_class
+from VirtLib.utils import run_remote
+
+sup_types = ["KVM",]
+
+# Module state shared between main(), create_filter_list() and cleanup().
+# Both stay None until the corresponding object has really been created,
+# so cleanup() can tell what there is to remove.
+domain = None
+flist_name = None
+
+def get_filter_inst_and_inst_name(name):
+    """Return (instance, instance name) of the filter list called `name`.
+
+    The filter must be known to libvirt and to libvirt-cim under the same
+    id. Lookup failures are logged and re-raised; a missing instance or an
+    id mismatch is logged and raises a plain Exception.
+    """
+    try:
+        _filters = test.libvirt_filter_lists()
+        _id = test.id_for_filter_name(_filters, name)
+    except Exception, e:
+        # TODO: define a filter of our own
+        logger.error("'%s' filter list not found in libvirt:\n%s", name, e)
+        raise
+
+    # Retrieve instance name for the requested filter
+    try:
+        inst_name = test.FindInstanceName(name)
+    except Exception, e:
+        logger.error("'%s' filter list not found in libvirt-cim\n%s", name, e)
+        raise
+
+    # Retrieve instance for the requested filter
+    inst = test.GetInstance(inst_name)
+
+    if not inst:
+        logger.error("Unable to retrieve instance for '%s' filter list", name)
+        raise Exception()
+    elif inst["InstanceID"] != _id:
+        logger.error("'%s' ids from libvirt and libvirt-cim differ", name)
+        raise Exception()
+
+    return inst, inst_name
+# get_filter_inst_and_inst_name
+
+
+def create_filter_list(name):
+    """Create FilterList `name` nesting the "clean-traffic" filter.
+
+    An existing instance with the same name is deleted first. After the
+    FilterList and the NestedFilterList instances are created, the libvirt
+    XML of the new filter is checked for a filterref to "clean-traffic".
+
+    Returns (instance, instance name) of the new FilterList and records the
+    instance name in the module global `flist_name` for cleanup().
+    Raises on any failure.
+    """
+    # Get "clean-traffic" filter instance and instance name
+    clean, clean_name = get_filter_inst_and_inst_name("clean-traffic")
+
+    # Check if filter list already exist then delete it. Only "not found"
+    # is treated as harmless; a failing lookup or delete is a real error
+    # and is left to the caller.
+    try:
+        inst_name = test.FindInstanceName(name)
+    except IndexError:
+        inst_name = None
+        logger.info("No previous Instance with name '%s' found.", name)
+
+    if inst_name is not None:
+        logger.info("Instance with name '%s' already exists. Deleting.", name)
+        test.wbem.DeleteInstance(inst_name)
+
+    # Create a new FilterList instance based on name parameter
+    global flist_name
+    logger.info("Creating FilterList '%s'", name)
+    flist_name = test.CreateFilterListInstance(name)
+    flist = test.GetInstance(flist_name)
+
+    # A NestedFilterList instance will add the "clean-traffic" filter
+    # as an entry of the newly created FilterList
+    logger.info("Creating NestedFilterList instance")
+    nested_name = test.CreateFilterListInstance(None, "KVM_NestedFilterList",
+                                                {"Antecedent":flist_name,
+                                                 "Dependent":clean_name})
+
+    logger.info("Got NestedFilterList name '%s'", nested_name)
+    #nested = test.GetInstance(nested_name)
+    #logger.info("Got NestedFilterList '%s'", nested)
+
+    # Check if results match. The length test skips malformed entries
+    # (e.g. produced by a blank line in the virsh output) instead of
+    # failing on them with an unrelated IndexError.
+    found = [f for f in (test.libvirt_filter_lists() or [])
+             if len(f) > 1 and f[1] == name]
+    if not found:
+        raise Exception("'%s' filter list not found in libvirt" % name)
+
+    elements = test.libvirt_filter_dumpxml(found[0][0])
+    if elements is None:
+        raise Exception("Unable to dump libvirt XML of '%s' filter list" % name)
+
+    filterrefs = [e for e in elements if e.tag == "filterref"]
+    if not filterrefs or clean["Name"] != filterrefs[0].get("filter"):
+        raise Exception("NestedFilterList name and libvirt filter don't match")
+
+    logger.info("NestedFilterList created successfuly")
+    return flist, flist_name
+# create_filter_list
+
+
+def get_nwport_inst_and_inst_name(domain_name):
+    """Return (instance, instance name) of the NetworkPort of a domain.
+
+    The port is looked up by its SystemName property. Failures are logged
+    and re-raised.
+    """
+    try:
+        inst_name = test.FindInstanceName(domain_name, "SystemName",
+                                          "KVM_NetworkPort")
+        inst = test.GetInstance(inst_name)
+    except Exception, e:
+        logger.error("Unable to get NetworkPort instance name for '%s' domain", domain_name)
+        raise
+
+    return inst, inst_name
+#get_nwport_inst_and_inst_name
+
+
+def cleanup():
+    """Remove whatever the test managed to create; errors are only logged."""
+    # Destroy filter list, if it was ever created
+    if flist_name is not None:
+        try:
+            test.wbem.DeleteInstance(flist_name)
+        except Exception, e:
+            logger.error("Error deleting filter list: %s", e)
+
+    try:
+        # Destroy domain
+        if domain:
+            domain.destroy()
+            domain.undefine()
+    except Exception, e:
+        logger.error("Error destroying domain: %s", e)
+# cleanup
+
+
+@do_main(sup_types)
+def main():
+    """Create a filter list, apply it to a new domain, then clean up.
+
+    Returns FAIL when any step raises or when the filter list is still
+    visible through CIM or libvirt after cleanup().
+    """
+    # NOTE(review): result starts as XFAIL and is never set to PASS, so a
+    # run in which every step succeeds still reports XFAIL - confirm that
+    # this is intended.
+    result = XFAIL
+    options = main.options
+
+    test_flist = "cimtest-filterlist"
+
+    global test
+    test = helper.FilterListTest(options.ip, options.virt)
+
+    try:
+        # Create a new FilterList instance
+        flist, flist_name = create_filter_list(test_flist)
+
+        # Create a new domain (VM)
+        domain_name = "cimtest-filterlist-domain"
+        global domain
+        domain = helper.CIMDomain(domain_name, test.virt, test.server)
+        domain.define()
+
+        # Get NetworkPort instance and instance name for defined domain
+        nwport, nwport_name = get_nwport_inst_and_inst_name(domain_name)
+
+        # An AppliedFilterList Instance will apply the filter to the network
+        # port of the defined domain
+        test.CreateFilterListInstance(None, "KVM_AppliedFilterList",
+                                      {"Antecedent":nwport_name,
+                                       "Dependent":flist_name})
+    except Exception, e:
+        logger.error("Caught exception: %s", e)
+        result = FAIL
+
+    # Check results
+
+    # Cleanup
+    cleanup()
+
+    # Leftovers?
+    try:
+        inst = test.FindInstance(test_flist)
+        logger.error("Leftovers in CIM FilterLists: %s", inst)
+        result = FAIL
+    except IndexError:
+        # FindInstance found nothing with that name: this is what we want
+        pass
+
+    # Entries are filtered by length first so that a malformed one cannot
+    # raise IndexError and be mistaken for "no leftovers".
+    leftovers = [f for f in (test.libvirt_filter_lists() or [])
+                 if len(f) > 1 and f[1] == test_flist]
+    if leftovers:
+        logger.error("Leftovers in libvirt filters: %s", leftovers[0])
+        result = FAIL
+
+    return result
+# main
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/suites/libvirt-cim/cimtest/FilterList/helper.py b/suites/libvirt-cim/cimtest/FilterList/helper.py
new file mode 100644
--- /dev/null
+++ b/suites/libvirt-cim/cimtest/FilterList/helper.py
@@ -0,0 +1,515 @@
+#!/usr/bin/env python
+#
+# Copyright 2011 IBM Corp.
+#
+# Authors:
+# Eduardo Lima (Etrunko) <eblima@br.ibm.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#
+
+import pywbem
+import libvirt
+
+import CimTest
+from CimTest.Globals import logger
+
+import XenKvmLib
+from XenKvmLib.xm_virt_util import virt2uri
+from XenKvmLib.classes import get_typed_class
+from XenKvmLib.vxml import get_class
+
+import VirtLib
+from VirtLib.utils import run_remote
+
+try:
+    from xml.etree import cElementTree as ElementTree
+except ImportError:
+    from xml.etree import ElementTree
+
+
+class BaseTestObject(object):
+    """Base class of the tests: a pywbem connection plus lookup helpers.
+
+    Subclasses must define `cim_basename`; it is combined with the
+    virtualization type to build `typed_class`, the default class used by
+    every method that takes an optional class name.
+    """
+
+    def __init__(self, server, virt):
+        self.server = server
+        self.virt = virt
+        self.typed_class = get_typed_class(virt, self.cim_basename)
+        self.uri = virt2uri(virt)
+        self.user = CimTest.Globals.CIM_USER
+        self.passwd = CimTest.Globals.CIM_PASS
+        self.namespace = CimTest.Globals.CIM_NS
+        self.wbem = pywbem.WBEMConnection("http://%s" % server,
+                                          (self.user, self.passwd),
+                                          self.namespace)
+        # Needed by DumpWBEMDebug()
+        self.wbem.debug = True
+    # __init__
+
+    def EnumerateInstances(self, typed_class=None):
+        """Return the instances of `typed_class` (default: own class)."""
+        if not typed_class:
+            typed_class = self.typed_class
+
+        return self.wbem.EnumerateInstances(typed_class)
+    # EnumerateInstances
+
+    def EnumerateInstanceNames(self, typed_class=None):
+        """Return the instance names of `typed_class` (default: own class)."""
+        if not typed_class:
+            typed_class = self.typed_class
+
+        return self.wbem.EnumerateInstanceNames(typed_class)
+    # EnumerateInstanceNames
+
+    def __assoc_args(self, assoc_class=None, result_class=None):
+        """Build the keyword arguments of an association query.
+
+        Only the filters that were actually given are included.
+        """
+        kargs = {}
+        if assoc_class:
+            kargs["AssocClass"] = assoc_class
+
+        if result_class:
+            kargs["ResultClass"] = result_class
+
+        return kargs
+    # __assoc_args
+
+    def Associators(self, typed_class=None, assoc_class=None, result_class=None):
+        """Return the instances associated to `typed_class`.
+
+        `typed_class` is passed as-is to pywbem, so it may also be an
+        instance name. `assoc_class` and `result_class` optionally restrict
+        the association and result classes.
+        """
+        if not typed_class:
+            typed_class = self.typed_class
+
+        kargs = self.__assoc_args(assoc_class, result_class)
+        return self.wbem.Associators(typed_class, **kargs)
+    # Associators
+
+    def AssociatorNames(self, typed_class=None, assoc_class=None, result_class=None):
+        """Same as Associators(), but returns instance names only."""
+        if not typed_class:
+            typed_class = self.typed_class
+
+        kargs = self.__assoc_args(assoc_class, result_class)
+        return self.wbem.AssociatorNames(typed_class, **kargs)
+    # AssociatorNames
+
+    def GetInstance(self, inst_name):
+        """Return the instance identified by the instance name `inst_name`."""
+        return self.wbem.GetInstance(inst_name)
+    # GetInstance
+
+    def FindInstance(self, inst_name, attr="Name", class_name=None):
+        """Return the instance of `class_name` whose `attr` is `inst_name`.
+
+        When `inst_name` is not a string it is taken to be an instance name
+        already and is fetched directly.
+
+        Raises IndexError when no instance has the requested value.
+        """
+        if not class_name:
+            class_name = self.typed_class
+
+        if isinstance(inst_name, (str, unicode)):
+            _insts = self.EnumerateInstances(class_name)
+            return [i for i in _insts if i[attr] == inst_name][0]
+
+        return self.GetInstance(inst_name)
+    # FindInstance
+
+    def FindInstanceName(self, _name, attr="Name", class_name=None):
+        """Return the instance name of `class_name` whose `attr` is `_name`.
+
+        Raises IndexError when no instance name has the requested value.
+        """
+        if not class_name:
+            class_name = self.typed_class
+
+        _inst_names = self.EnumerateInstanceNames(class_name)
+        return [i for i in _inst_names if i[attr] == _name][0]
+    # FindInstanceName
+
+    def CreateFilterListInstance(self, name, class_name=None, props=None):
+        """Create an instance of `class_name` and return its instance name.
+
+        `props` holds the properties of the new instance; when `name` is
+        given it is added to them as "Name". The dictionary passed by the
+        caller is not modified.
+        """
+        if not class_name:
+            class_name = self.typed_class
+
+        # Always work on a private copy: with a shared default dictionary a
+        # "Name" set by one call would leak into the next one.
+        props = dict(props or {})
+        if name:
+            props["Name"] = name
+
+        logger.info("Creating Instance of %s", class_name)
+        inst = pywbem.CIMInstance(class_name, props)
+        return self.wbem.CreateInstance(inst)
+    # CreateFilterListInstance
+
+    def DumpWBEMDebug(self):
+        """Log the last request and reply seen by the WBEM connection."""
+        logger.info("*** Begin WBEM Debug ***")
+        logger.info(" * Last raw request\n'%s'", self.wbem.last_raw_request)
+        logger.info(" * Last raw reply\n'%s'", self.wbem.last_raw_reply)
+
+        logger.info(" * Last request\n'%s'", self.wbem.last_request)
+        logger.info(" * Last reply\n'%s'", self.wbem.last_reply)
+        logger.info("*** End WBEM Debug ***")
+    # DumpWBEMDebug
+# BaseTestObject
+
+
+class CIMDomain(object):
+    """A test domain (VM); every operation is forwarded to the XenKvmLib
+    object returned by get_class(virt)."""
+
+    def __init__(self, name, virt, server):
+        self.name = name
+        self.server = server
+        self.virt = virt
+        self._domain = get_class(virt)(name)
+    #__init__
+
+    def define(self):
+        return self._domain.cim_define(self.server)
+    # define
+
+    def start(self):
+        return self._domain.cim_start(self.server)
+    # start
+
+    def shutdown(self):
+        return self._domain.cim_shutdown(self.server)
+    # shutdown
+
+    def undefine(self):
+        return self._domain.undefine(self.server)
+    # undefine
+
+    def destroy(self):
+        return self._domain.cim_destroy(self.server)
+    #destroy
+# CIMDomain
+
+
+class FilterListTest(BaseTestObject):
+    """Helpers to compare the FilterList class with libvirt's nwfilters."""
+
+    cim_basename = "FilterList"
+
+    def __init__(self, server, virt):
+        BaseTestObject.__init__(self, server, virt)
+    # __init__
+
+    def libvirt_filter_lists(self):
+        """Return the filters listed by "virsh nwfilter-list".
+
+        Each output line is split on whitespace into a tuple, (id, name)
+        for a regular line. Blank lines are skipped. Returns None when the
+        command fails.
+        """
+        cmd = "virsh -q -c %s nwfilter-list 2>/dev/null" % self.uri
+        ret, filters = run_remote(self.server, cmd)
+        if ret:
+            logger.error("Error listing existing filters")
+            return None
+
+        l = []
+        for f in filters.split("\n"):
+            # Append a tuple of (id, name) to list; a blank line would give
+            # an empty tuple, which is not a filter.
+            t = tuple(f.split())
+            if t:
+                l.append(t)
+
+        return l
+    # libvirt_filter_lists
+
+    def cim_filter_lists(self):
+        """Return the (InstanceID, Name) pairs of all FilterList instances.
+
+        Returns None when an instance lacks one of the two properties.
+        """
+        _instances = self.EnumerateInstances()
+        l = []
+        for i in _instances:
+            try:
+                # Append a tuple of (id, name) to list
+                l.append((i["InstanceID"], i["Name"]))
+            except KeyError:
+                logger.error("'InstanceID', 'Name' properties not found in Instance %s", i)
+                return None
+
+        return l
+    # cim_filter_lists
+
+    def id_for_filter_name(self, _list, _name):
+        """Return the id of the filter called `_name`.
+
+        `_list` is either a list of (id, name) tuples or a list of
+        CIMInstanceName objects; the type is decided on its first element.
+
+        Raises IndexError when the list is empty or has no such filter and
+        AttributeError when the list holds anything else.
+        """
+        if isinstance(_list[0], tuple):
+            return [t[0] for t in _list if len(t) > 1 and t[1] == _name][0]
+        elif isinstance(_list[0], pywbem.CIMInstanceName):
+            inst_name = [n for n in _list if n["Name"] == _name][0]
+            return self.GetInstance(inst_name)["InstanceID"]
+        raise AttributeError("Expecting list of either tuple or CIMInstanceName")
+    # id_for_filter_name
+
+    def name_for_filter_id(self, _list, _id):
+        """Return the name of the filter identified by `_id`.
+
+        `_list` is either a list of (id, name) tuples or a list of
+        CIMInstance objects; the type is decided on its first element.
+
+        Raises IndexError when the list is empty or has no such filter and
+        AttributeError when the list holds anything else.
+        """
+        if isinstance(_list[0], tuple):
+            return [t[1] for t in _list if len(t) > 1 and t[0] == _id][0]
+        elif isinstance(_list[0], pywbem.CIMInstance):
+            return [i for i in _list if i["InstanceID"] == _id][0]["Name"]
+        raise AttributeError("Expecting list of either tuple or CIMInstance")
+    # name_for_filter_id
+
+    def libvirt_filter_dumpxml(self, uuid):
+        """Return the root element of "virsh nwfilter-dumpxml <uuid>".
+
+        Returns None when the command fails.
+        """
+        cmd = "virsh -q -c %s nwfilter-dumpxml %s 2>/dev/null" % (self.uri, uuid)
+        ret, out = run_remote(self.server, cmd)
+        if ret:
+            logger.error("Error executing nwfilter-dumpxml")
+            return None
+
+        # Remove all unnecessary spaces and new lines
+        _xml = "".join([a.strip() for a in out.split("\n") if a])
+        return ElementTree.fromstring(_xml)
+    # libvirt_filter_dumpxml
+
+    def libvirt_entries_in_filter_lists(self):
+        """Return a dictionary mapping each libvirt filter tuple to the root
+        element of its XML, or None when a virsh command fails."""
+        filters = self.libvirt_filter_lists()
+        if filters is None:
+            return None
+
+        d = {}
+        for f in filters:
+            root = self.libvirt_filter_dumpxml(f[0])
+            # Compare with None explicitly: an element without children is
+            # false in a boolean context, but is a perfectly valid (empty)
+            # filter.
+            if root is None:
+                return None
+
+            d[f] = root
+
+        return d
+    # libvirt_entries_in_filter_lists
+
+    def cim_entries_in_filter_lists(self):
+        """Return a dictionary mapping each FilterList instance name to the
+        list of its entries: the CIM_FilterEntryBase instances followed by
+        the instances reached through KVM_NestedFilterList."""
+        d = {}
+
+        _names = self.EnumerateInstanceNames()
+        for n in _names:
+            l = []
+            l.extend(self.Associators(n, result_class="CIM_FilterEntryBase"))
+            l.extend(self.Associators(n, assoc_class="KVM_NestedFilterList"))
+            d[n] = l
+
+        return d
+    # cim_entries_in_filter_lists
+
+    def libvirt_entries_in_filter_list(self, _name, _id=None):
+        """Return the root XML element of one libvirt filter.
+
+        The filter is selected by `_id`; when no id is given it is looked
+        up by `_name`. Returns None when the filter does not exist or the
+        virsh command fails.
+        """
+        filters = self.libvirt_filter_lists() or []
+
+        try:
+            if not _id:
+                _id = self.id_for_filter_name(filters, _name)
+            elif not _name:
+                # Only checks that the id is known to libvirt
+                self.name_for_filter_id(filters, _id)
+        except IndexError:
+            return None
+
+        return self.libvirt_filter_dumpxml(_id)
+    # libvirt_entries_in_filter_list
+
+    def cim_entries_in_filter_list(self, _name, _id=None):
+        """Return the CIM_FilterEntryBase instances of one FilterList.
+
+        The filter list is selected by `_name` when given, otherwise by its
+        InstanceID `_id`. Returns None when it cannot be found.
+        """
+        if not _name and not _id:
+            return None
+
+        try:
+            if _name:
+                _inst_name = self.FindInstanceName(_name)
+            else:
+                _inst = self.FindInstance(_id, "InstanceID")
+                _inst_name = self.FindInstanceName(_inst["Name"])
+        except IndexError:
+            return None
+
+        return self.Associators(_inst_name, result_class="CIM_FilterEntryBase")
+    # cim_entries_in_filter_list
+# FilterListTest
+
+
+class FilterRule(object):
+    """A <rule> element of a libvirt filter, comparable to a CIM instance.
+
+    The attributes of the rule element and of its children are merged in a
+    single dictionary and exposed as object attributes; an attribute that is
+    not present reads as None. The tag of the first child gives the type of
+    the rule, which selects the CIM class expected to represent it
+    (`basename`) and the translation of attribute names into CIM property
+    names (`rulemap`). Both are None for an unknown type.
+    """
+
+    # libvirt attribute values -> string form of the CIM property values
+    __directions = {"in"   : "1",
+                    "out"  : "2",
+                    "inout": "3",}
+
+    __versions = {"ip"  : "4",
+                  "ipv6": "6",}
+
+    __actions = {"accept" : "1",
+                 "deny"   : "2",
+                 "drop"   : "2",}
+
+    # libvirt attribute names -> CIM property names
+    __baserule_map = {"action"    : "Action",
+                      "direction" : "Direction",
+                      "priority"  : "Priority",}
+
+    # No libvirt attribute is mapped yet to: HdrFlowLabel, HdrDSCP,
+    # HdrProtocolID
+    __iprule_map = {"version"     : "HdrIPVersion",
+                    "srcipaddr"   : "HdrSrcAddress",
+                    "dstipaddr"   : "HdrDestAddress",
+                    "srcipmask"   : "HdrSrcMask",
+                    "dstipmask"   : "HdrDestMask",
+                    "srcipto"     : "HdrSrcAddressEndOfRange",
+                    "dstipto"     : "HdrDestAddressEndOfRange",
+                    "srcportstart": "HdrSrcPortStart",
+                    "dstportstart": "HdrDestPortStart",
+                    "srcportend"  : "HdrSrcPortEnd",
+                    "dstportend"  : "HdrDestPortEnd",}
+
+    # No libvirt attribute is mapped yet to: HdrPriorityValue8021,
+    # HdrVLANID8021
+    __hdr8021rule_map = {"srcmacaddr": "HdrSrcMACAddr8021",
+                         "dstmacaddr": "HdrDestMACAddr8021",
+                         "srcmacmask": "HdrSrcMACMask8021",
+                         "dstmacmask": "HdrDestMACMask8021",
+                         "protocolid": "HdrProtocolID8021",}
+
+    ### FIXME Add to proper rule map
+    # HealthState, StatusDescriptions, Generation, CommunicationStatus,
+    # SystemName, DetailedStatus, Caption, OperationalStatus,
+    # SystemCreationClassName, Status, Description, InstallDate,
+    # CreationClassName, PrimaryStatus, ElementName, Name, IsNegated,
+    # InstanceID, OperatingStatus
+
+    # Rule type (tag of the first child element) -> CIM class base name
+    __basenames = {None  : "FilterEntry",
+                   "ip"  : "IPHeadersFilter",
+                   "ipv6": "IPHeadersFilter",
+                   "tcp" : "IPHeadersFilter",
+                   "udp" : "IPHeadersFilter",
+                   "igmp": "IPHeadersFilter",
+                   "icmp": "IPHeadersFilter",
+                   "mac" : "Hdr8021Filter",
+                   "arp" : "Hdr8021Filter",
+                   "rarp": "Hdr8021Filter",}
+
+    __rulemaps = {"FilterEntry"    : __baserule_map,
+                  "IPHeadersFilter": dict(__baserule_map, **__iprule_map),
+                  "Hdr8021Filter"  : dict(__baserule_map, **__hdr8021rule_map),}
+
+    def __init__(self, element):
+        self.__dict = element.attrib
+        self.__type = None
+
+        for e in element:
+            self.__dict = dict(self.__dict, **e.attrib)
+            if not self.__type:
+                self.__type = e.tag
+
+        try:
+            self.basename = self.__basenames[self.__type]
+            self.rulemap = self.__rulemaps[self.basename]
+        except KeyError:
+            self.basename = None
+            self.rulemap = None
+    # __init__
+
+    def __getattr__(self, key):
+        """Return the value of a rule attribute, None when it is not set.
+
+        "direction", "action" and "version" are translated to the string
+        form of the corresponding CIM value; "type" is the rule type.
+        """
+        # Private names are never rule attributes. Without this guard a
+        # lookup made before __init__ has run (copy, pickle, ...) would
+        # recurse forever through self.__dict.
+        if key.startswith("__") or key.startswith("_FilterRule__"):
+            raise AttributeError(key)
+
+        if key == "direction":
+            return self.__directions[self.__dict[key]]
+        elif key == "version":
+            if self.__type and "ip" in self.__type:
+                return self.__versions[self.__type]
+        elif key == "action":
+            return self.__actions[self.__dict[key]]
+        elif key == "type":
+            return self.__type
+
+        try:
+            return self.__dict[key]
+        except KeyError:
+            return None
+    # __getattr__
+
+    def __repr__(self):
+        return "FilterRule(type=%s, attributes=%s)" % (self.__type, self.__dict)
+    # __repr__
+
+    def __addr_format(self, val):
+        """Return the (base, separator) of a textual address: hexadecimal
+        for ':' separated values, decimal for '.' separated ones, None
+        when the value has no separator at all."""
+        if ":" in val:
+            return (16, ":")
+        if "." in val:
+            return (10, ".")
+        return None
+    # __addr_format
+
+    def __addr_to_list(self, val, base, sep):
+        """Split an address on `sep` and return its parts as numbers."""
+        return [long(v, base) for v in val.split(sep)]
+    # __addr_to_list
+
+    def __cidr_to_list(self, val):
+        """Return the four octets of the IPv4 netmask with prefix length
+        `val` (a decimal string)."""
+        int_val = int(val, 10)
+        int_val = (0xffffffff >> (32 - int_val)) << (32 - int_val)
+        o1 = (int_val & 0xff000000) >> 24
+        o2 = (int_val & 0x00ff0000) >> 16
+        o3 = (int_val & 0x0000ff00) >> 8
+        o4 = int_val & 0x000000ff
+        return [o1, o2, o3, o4]
+    # __cidr_to_list
+
+    def matches(self, instance):
+        """Tell whether the CIM `instance` represents this rule.
+
+        The class name, the IP version (when the rule has one) and every
+        rule attribute that has an entry in `rulemap` must agree with the
+        instance. Addresses starting with '$' are assumed to match.
+
+        Returns a (matched, message) tuple.
+        """
+        # Classname
+        if not self.basename or self.basename not in instance.classname:
+            return (False, "Classname '%s' does not match instance '%s'" % (self.basename, instance.classname))
+
+        # IP Version
+        if self.version:
+            prop_name = self.rulemap["version"]
+            try:
+                inst_version = str(instance[prop_name])
+            except KeyError:
+                inst_version = None
+
+            if self.version != inst_version:
+                return (False, "IP version '%s' does not match instance '%s'" % (self.version, inst_version))
+
+        # Other properties
+        for key in self.__dict:
+            inst_key = self.rulemap.get(key)
+            if not inst_key:
+                # logger.info("No match for rule attribute '%s'", key)
+                continue
+
+            # convert the property value to string
+            prop = instance.properties[inst_key]
+            val = self.__getattr__(key)
+            if prop.value is not None and val.startswith("0x"):
+                inst_val = hex(int(prop.value))
+            else:
+                # An unset property becomes "None" and is compared as such
+                inst_val = str(prop.value)
+
+            # Handle special cases
+            if inst_val != "None":
+                # Netmask?
+                if "mask" in key:
+                    fmt = self.__addr_format(val)
+                    if fmt:
+                        # Dotted (IP) or colon separated (MAC) mask
+                        val = self.__addr_to_list(val, fmt[0], fmt[1])
+                    else:
+                        # Assume CIDR
+                        val = self.__cidr_to_list(val)
+                    inst_val = prop.value
+                # Address?
+                elif "addr" in key:
+                    if val.startswith("$"):
+                        # Can't translate address starting with '$'
+                        logger.info("Assuming matching address for '%s:%s' and '%s:%s'", key, val, inst_key,inst_val)
+                        continue
+                    elif prop.is_array and prop.value:
+                        base, sep = self.__addr_format(val) or (10, ".")
+                        val = self.__addr_to_list(val, base, sep)
+                        inst_val = prop.value
+            # if inst_val != None
+
+            if inst_val != val:
+                return (False, "Values for '%s':'%s' and '%s':'%s' don't match" % (key, val, inst_key, inst_val))
+
+        return (True, "Found matching CIM Instance: %s" % instance)
+    # matches
+# FilterRule