The network_dhcp_leases.py uses DHCPLeases() to validate new API
virNetworkGetDHCPLeases of libvirt.
---
repos/network/network_dhcp_leases.py | 277 +++++++++++++++++++++++++++++++++++
1 file changed, 277 insertions(+)
create mode 100644 repos/network/network_dhcp_leases.py
diff --git a/repos/network/network_dhcp_leases.py b/repos/network/network_dhcp_leases.py
new file mode 100644
index 0000000..29ee529
--- /dev/null
+++ b/repos/network/network_dhcp_leases.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python
+#test DHCPLeases() API for libvirt
+
+import os
+import time
+import libvirt
+from libvirt import libvirtError
+from utils import utils
+from src import sharedmod
+
+required_params = ('networkname',)
+optional_params = {'macaddr': ''}
+
+LEASE_FILE = "/var/lib/libvirt/dnsmasq/"
+
+def check_ip(ipaddr, logger):
+ """
+ return a string according to ip address type, return 'ipv4' for ipv4,
+ return 'ipv6' for ipv6, return False for others
+ """
+ addr4 = ipaddr.strip().split('.')
+ addr6 = ipaddr.strip().split(':')
+ if len(addr4) == 4:
+ iptype = "ipv4"
+ elif len(addr6) == 6:
+ iptype = "ipv6"
+ else:
+ return False
+ return iptype
+
+def get_network_type(ipaddr,logger):
+ """
+ return 0 or 1 for ipv4/ipv6, this function will be used in
+ check_ipv4_values()/check_ipv6_values()
+ """
+ if check_ip(ipaddr, logger) == "ipv4":
+ return 0
+ elif check_ip(ipaddr, logger) == "ipv6":
+ return 1
+
+def get_bridge_name(network,logger):
+ """
+ get bridge name under specified network from specified network conf
+ """
+ CONF_NETWORK = LEASE_FILE + network + ".conf"
+ GREP_BRIDGE = "grep \"^interface=\" %s | awk -F\"=\"
'{print $2}'"
+ status, output = utils.exec_cmd(GREP_BRIDGE % CONF_NETWORK, shell=True)
+ if not status:
+ pass
+ else:
+ logger.error("\"" + GREP_BRIDGE + "\"" +
"error")
+ logger.error(output)
+ return False
+ return output[0]
+
+def get_ip_prefix(network, iptype, logger):
+ """
+ get ip prefix according to IP type
+ """
+ br = get_bridge_name(network, logger)
+ PREFIX = "ip -4 -o ad show %s | awk '{print $4}'|awk -F\"/\"
'{print $2}'"
+ PREFIX_6 = "ip -6 -o ad show %s|awk '{print $4}'|awk -F\"/\"
'{print $2}'"
+ if iptype == "ipv4":
+ status, output = utils.exec_cmd(PREFIX % br, shell=True)
+ elif iptype == "ipv6":
+ status, output = utils.exec_cmd(PREFIX_6 % br, shell=True)
+ if not status:
+ pass
+ else:
+ logger.error("\"" + GREP_BRIDGE + "\"" +
"error")
+ logger.error(output)
+ return False
+ return output[0]
+
+def get_info_from_dnsmasq(network,macaddr,logger):
+ """
+ generate dict for lease info from virtual network's lease file
+ """
+ title =
['expirytime','mac','ipaddr','hostname','clientid']
+ output_list = []
+ lease_dnsmasq = []
+ temp = []
+ remove_list = []
+ GREP_MAC = "grep -w %s" + " " + LEASE_FILE_DNSMASQ
+ CAT_FILE = "cat" + " " + LEASE_FILE_DNSMASQ
+
+ status, output = utils.exec_cmd(CAT_FILE, shell=True)
+ if not status:
+ for i in range(0, len(output)):
+ output_list = []
+ output_str = output[i]
+ for item in output_str.split(" "):
+ output_list.append(item)
+ lease_dnsmasq.append(dict(zip(title,output_list)))
+
+ #due to no mac field in IPv6 line, so do nothing here temporarily.
+ if macaddr != None:
+ pass
+
+ #remove bridge duid line
+ for i in range(0, len(lease_dnsmasq)):
+ if lease_dnsmasq[i]['expirytime'] == 'duid':
+ remove_list.append(lease_dnsmasq[i])
+
+ for i in range(0, len(remove_list)):
+ lease_dnsmasq.remove(remove_list[i])
+
+ #remove expiry leases
+ for i in range(0, len(lease_dnsmasq)):
+ temp = int(lease_dnsmasq[i]['expirytime'])
+ lease_dnsmasq[i]['expirytime'] = temp
+
+ remove_list = []
+ for i in range(0, len(lease_dnsmasq)):
+ if time.time() >= int(lease_dnsmasq[i]['expirytime']):
+ remove_list.append(lease_dnsmasq[i])
+
+ for i in range(0, len(remove_list)):
+ lease_dnsmasq.remove(remove_list[i])
+
+ #replace * to None
+ for i in range(0, len(lease_dnsmasq)):
+ if lease_dnsmasq[i]['hostname'] == "*":
+ lease_dnsmasq[i]['hostname'] = None
+ if lease_dnsmasq[i]['clientid'] == "*":
+ lease_dnsmasq[i]['clientid'] = None
+
+ return lease_dnsmasq
+ else:
+ logger.error("\"" + CAT_FILE + "\"" +
"error")
+ logger.error(output)
+ return False
+
+def compare_values(op1, op2, network, iptype, logger):
+ """
+ check all printed values from API
+ """
+ dnsmasq = op1
+ api = op2
+ temp = int(api['expirytime'])
+ api['expirytime'] = temp
+
+ for j in range(0,len(dnsmasq)):
+ if dnsmasq[j]['hostname'] == api['hostname'] and \
+ dnsmasq[j]['expirytime'] == api['expirytime']:
+ if dnsmasq[j]['ipaddr'] == api['ipaddr'] and \
+ dnsmasq[j]['clientid'] == api['clientid']:
+
+ if iptype == "ipv4":
+ logger.debug("PASS: hostname: %s expirytime: %s ipaddr: %s"
\
+ %
(api['hostname'],api['expirytime'],api['ipaddr']))
+ logger.debug("Unsupported: clientid: %s in IPv4" \
+ % (api['clientid']))
+ elif iptype == "ipv6":
+ logger.debug("PASS: hostname: %s expirytime: %s ipaddr: %s \
+clientid: %s" %
(api['hostname'],api['expirytime'],api['ipaddr'],\
+api['clientid']))
+
+ if iptype == "ipv4" and api['mac'] ==
dnsmasq[j]['mac']:
+ logger.debug("PASS: mac: %s" % api['mac'])
+ elif iptype == "ipv6" and api['iaid'] ==
dnsmasq[j]['mac']:
+ logger.debug("PASS: iaid: %s" % api['iaid'])
+ else:
+ logger.error("Fail: mac/iaid: %s/%s" % (api['mac'],
\
+ api['iaid']))
+ return False
+
+ break
+ else:
+ if j == len(dnsmasq) - 1:
+ logger.debug("Last loop %d, FAIL: %s" % (j,api))
+ logger.debug("failed on ipaddr or clientid")
+ return False
+ else:
+ logger.debug("Skipped loop %d,Warning: ipaddr: %s \
+clientid: %s" % (j,api['ipaddr'],api['clientid']))
+ continue
+ else:
+ if j == len(dnsmasq) - 1:
+ logger.error("Fail: hostname: %s expirytime: %s ipaddr: %s \
+clientid: %s" %
(api['hostname'],api['expirytime'],api['ipaddr'], \
+api['clientid']))
+ logger.error("Last loop %d, FAIL: %s" % (j,api))
+ return False
+ else:
+ logger.debug("Skipped loop %d,Warning: hostname: \
+%s expirytime: %s" % (j,api['hostname'],api['expirytime']))
+ continue
+ if not api['iface'] == get_bridge_name(network,logger):
+ logger.error("FAIL: iface: %s" % api['iface'])
+ return False
+ else:
+ logger.debug("PASS: iface: %s" % api['iface'])
+ if not api['type'] == get_network_type(api['ipaddr'],logger):
+ logger.error("FAIL: type: %s" % api['type'])
+ return False
+ else:
+ logger.debug("PASS: type: %s" % api['type'])
+
+ if not api['prefix'] == int(get_ip_prefix(network, iptype ,logger)):
+ logger.error("FAIL: prefix: %s" % api['prefix'])
+ logger.error("FAIL: %s" % api)
+ return False
+ else:
+ logger.debug("PASS: prefix: %s" % api['prefix'])
+ if iptype == "ipv4":
+ if not api['iaid'] == None:
+ logger.error("FAIL: iaid: %s" % api['iaid'])
+ return False
+ else:
+ logger.debug("Unsupported: iaid: %s in IPv4" %
api['iaid'])
+ logger.debug("PASS: %s" % api)
+ elif iptype == "ipv6":
+ logger.debug("Ignoring mac checking on IPv6 line %s" %
api['mac'])
+ logger.debug("PASS: %s" % api)
+
+ return True
+
+def check_values(op1, op2, network, logger):
+ """
+ check each line accorting to ip type, if ipv4 go to check_ipv4_values
+ if ipv6, go to check_ipv6_values.
+ """
+ networkname = network
+ dnsmasq = op1
+ api = op2
+
+ for i in range(0, len(api)):
+ if check_ip(api[i]['ipaddr'],logger) == "ipv4":
+ if not compare_values(dnsmasq,api[i],networkname,"ipv4",logger):
+ return False
+ elif check_ip(api[i]['ipaddr'],logger) == "ipv6":
+ if not compare_values(dnsmasq,api[i],networkname,"ipv6",logger):
+ return False
+ else:
+ logger.error("invalid list element for ipv4 and ipv6")
+ return False
+ return True
+
+def network_dhcp_leases(params):
+ """
+ test API for DHCPLeases in class virNetwork
+ """
+ global LEASE_FILE_DNSMASQ
+ logger = params['logger']
+ networkname = params['networkname']
+ LEASE_FILE_DNSMASQ = "/var/lib/libvirt/dnsmasq/" + networkname +
".leases"
+ mac_value = params.get('macaddr', None)
+ conn = sharedmod.libvirtobj['conn']
+ logger.info("The given mac is %s" % (mac_value))
+
+ if not os.path.exists(LEASE_FILE_DNSMASQ):
+ logger.error("leases file for %s is not exist" % networkname)
+ logger.error("%s" % LEASE_FILE_DNSMASQ)
+ return 1
+ dhcp_lease_dns = get_info_from_dnsmasq(networkname, mac_value, logger)
+ logger.info("From dnsmasq: %s" % (dhcp_lease_dns))
+ if not dhcp_lease_dns:
+ return 1
+
+ netobj = conn.networkLookupByName(networkname)
+
+ try:
+ dhcp_lease_api = netobj.DHCPLeases(mac_value,0)
+ if not dhcp_lease_api and dhcp_lease_dns:
+ logger.info("From API: %s" % (dhcp_lease_api))
+ return 1
+ logger.info("From API: %s" % (dhcp_lease_api))
+ if not check_values(dhcp_lease_dns,dhcp_lease_api,networkname,logger):
+ return 1
+
+ except libvirtError, e:
+ logger.error("API error message: %s, error code is %s" \
+ % (e.message, e.get_error_code()))
+ return 1
+
+ return 0
--
1.8.3.1