On 08/06/2011 12:09 AM, Guannan Ren wrote:
add new testcases repos/remoteAccess/unix_perm_sasl.py
---
repos/remoteAccess/unix_perm_sasl.py | 234 ++++++++++++++++++++++++++++++++++
1 files changed, 234 insertions(+), 0 deletions(-)
create mode 100644 repos/remoteAccess/unix_perm_sasl.py
diff --git a/repos/remoteAccess/unix_perm_sasl.py b/repos/remoteAccess/unix_perm_sasl.py
new file mode 100644
index 0000000..9bb2600
--- /dev/null
+++ b/repos/remoteAccess/unix_perm_sasl.py
@@ -0,0 +1,234 @@
+#!/usr/bin/env python
+""" testing for permission and authentication of unix domain socket
+ remoteAccess:unix_perm_sasl
+ auth_unix_ro
+ none|sasl
+ auth_unix_rw
+ none|sasl
+ unix_sock_group(optional)
+ libvirt
+"""
+
+__author__ = 'Guannan Ren: gren(a)redhat.com'
+__date__ = 'Fri Aug 5, 2011'
+__version__ = '0.1.0'
+__credits__ = 'Copyright (C) 2011 Red Hat, Inc.'
+__all__ = ['unix_perm_sasl', 'group_sasl_set',
+ 'libvirt_configure', 'hypervisor_connecting_test']
+
+import os
+import re
+import sys
+import commands
+
+from pwd import getpwnam
+
+def append_path(path):
+ """Append root path of package"""
+ if path in sys.path:
+ pass
+ else:
+ sys.path.append(path)
+
+pwd = os.getcwd()
+result = re.search('(.*)libvirt-test-API', pwd)
+append_path(result.group(0))
+
+from lib import connectAPI
+from exception import LibvirtAPI
+
+TESTING_USER = 'testapi'  # system account used by the test; also the sasl user name and password
+LIBVIRTD_CONF = "/etc/libvirt/libvirtd.conf"  # file that libvirt_configure() appends settings to
+SASLPASSWD2 = "/usr/sbin/saslpasswd2"  # tool used to add and delete the sasl user
+
+def check_params(params):
+ """check out the arguments requried for the
testcase"""
+ logger = params['logger']
+ keys = ['auth_unix_ro', 'auth_unix_rw']
+ for key in keys:
+ if key not in params:
+ logger.error("Argument %s is required" % key)
+ return 1
+ return 0
+
+def get_output(command, flag, logger):
+ """execute shell command
+ """
+ status, ret = commands.getstatusoutput(command)
+ if not flag and status:
+ logger.error("executing "+ "\"" + command +
"\"" + " failed")
+ logger.error(ret)
+ return status, ret
It would be better to move get_output() into the utils library as a public
function, since it is called from many of the test cases in our repos.
- Nan
+
+def libvirt_configure(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
+ """configure libvirt.conf """
+ logger.info("configuring libvirt.conf")
+
+ # uncomment unix_sock_group
+ unix_group_add = "echo 'unix_sock_group = \"%s\"'>>
%s" % \
+ (unix_sock_group, LIBVIRTD_CONF)
+ status, output = get_output(unix_group_add, 0, logger)
+ if status:
+ logger.error("setting unix_sock_group to %s failed" %
unix_sock_group)
+ return 1
+
+ auth_unix_ro_add = "echo 'auth_unix_ro = \"%s\"'>>
%s" % \
+ (auth_unix_ro, LIBVIRTD_CONF)
+ status, output = get_output(auth_unix_ro_add, 0, logger)
+ if status:
+ logger.error("setting auth_unix_ro to %s failed" % auth_unix_ro)
+ return 1
+
+ auth_unix_rw_add = "echo 'auth_unix_rw = \"%s\"'>>
%s" % \
+ (auth_unix_rw, LIBVIRTD_CONF)
+ status, output = get_output(auth_unix_rw_add, 0, logger)
+ if status:
+ logger.error("setting auth_unix_rw to %s failed" % auth_unix_rw)
+ return 1
+
+ return 0
+
+def group_sasl_set(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
+ """add libvirt group and set sasl authentication if
needed"""
+ logger.info("add unix socket group and sasl authentication if we need")
+
+ # add unix socket group
+ libvirt_group_add = "groupadd %s" % unix_sock_group
+ status, output = get_output(libvirt_group_add, 0, logger)
+ if status:
+ logger.error("failed to add %s group" % unix_sock_group)
+ return 1
+
+ # add "testapi" as the testing user
+ libvirt_user_add = "useradd -g %s %s" % (unix_sock_group, TESTING_USER)
+ status, output = get_output(libvirt_user_add, 0, logger)
+ if status:
+ logger.error("failed to add %s user into group %s" % \
+ (TESTING_USER, unix_sock_group))
+ return 1
+
+ # add sasl user
+ if auth_unix_ro == 'sasl' or auth_unix_rw == 'sasl':
+ saslpasswd2_add = "echo %s | %s -a libvirt %s" % \
+ (TESTING_USER, SASLPASSWD2, TESTING_USER)
+ status, output = get_output(saslpasswd2_add, 0, logger)
+ if status:
+ logger.error("failed to set sasl user %s" % TESTING_USER)
+ return 1
+
+ return 0
+
+def request_credentials(credentials, user_data):
+ for credential in credentials:
+ if credential[0] == connectAPI.VIR_CRED_AUTHNAME:
+ credential[4] = user_data[0]
+
+ if len(credential[4]) == 0:
+ credential[4] = credential[3]
+ elif credential[0] == connectAPI.VIR_CRED_PASSPHRASE:
+ credential[4] = user_data[1]
+ else:
+ return -1
+
+ return 0
+
+def hypervisor_connecting_test(uri, auth_unix_ro, auth_unix_rw, logger):
+ """connect to hypervisor"""
+ logger.info("connect to hypervisor")
+ orginal_user = os.geteuid()
+ testing_user_id = getpwnam(TESTING_USER)[2]
+ logger.info("the testing_user id is %d" % testing_user_id)
+
+ logger.info("set euid to %d" % testing_user_id)
+ os.seteuid(testing_user_id)
+
+ try:
+ conn = connectAPI.ConnectAPI()
+ if auth_unix_ro == 'none':
+ virconn = conn.open_read_only(uri)
+ elif auth_unix_ro == 'sasl':
+ user_data = [TESTING_USER, TESTING_USER]
+ auth = [[connectAPI.VIR_CRED_AUTHNAME, \
+ connectAPI.VIR_CRED_PASSPHRASE],
+ request_credentials, user_data]
+ virconn = conn.openAuth(uri, auth, 0)
+
+ if auth_unix_rw == 'none':
+ virconn = conn.open(uri)
+ elif auth_unix_rw == 'sasl':
+ user_data = [TESTING_USER, TESTING_USER]
+ auth = [[connectAPI.VIR_CRED_AUTHNAME, \
+ connectAPI.VIR_CRED_PASSPHRASE],
+ request_credentials, user_data]
+ virconn = conn.openAuth(uri, auth, 0)
+ conn.close()
+ except LibvirtAPI, e:
+ logger.error("API error message: %s, error code is %s" % \
+ (e.response()['message'], e.response()['code']))
+ logger.info("set euid back to %d" % orginal_user)
+ os.seteuid(orginal_user)
+ conn.close()
+ return 1
+
+ logger.info("set euid back to %d" % orginal_user)
+ os.seteuid(orginal_user)
+ return 0
+
+def unix_perm_sasl(params):
+ """ test unix socket group function and sasl
authentication"""
+ logger = params['logger']
+ params_check_result = check_params(params)
+ if params_check_result:
+ return 1
+
+ auth_unix_ro = params['auth_unix_ro']
+ auth_unix_rw = params['auth_unix_rw']
+
+ unix_sock_group = 'libvirt'
+ if params.has_key('unix_sock_group'):
+ unix_sock_group = params['unix_sock_group']
+
+ uri = "qemu:///system"
+
+
+ if group_sasl_set(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
+ return 1
+
+ if libvirt_configure(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
+ return 1
+
+ if hypervisor_connecting_test(uri, auth_unix_ro, auth_unix_rw, logger):
+ return 1
+
+ return 0
+
+def unix_perm_sasl_clean(params):
+ """clean testing environment"""
+ logger = params['logger']
+
+ auth_unix_ro = params['auth_unix_ro']
+ auth_unix_rw = params['auth_unix_rw']
+
+ unix_sock_group = 'libvirt'
+ if params.has_key('unix_sock_group'):
+ unix_sock_group = params['unix_sock_group']
+
+ # delete "testapi" user
+ libvirt_user_del = "userdel %s" % TESTING_USER
+ status, output = get_output(libvirt_user_del, 0, logger)
+ if status:
+ logger.error("failed to del %s user into group %s" % TESTING_USER)
+
+ # delete unix socket group
+ libvirt_group_del = "groupdel %s" % unix_sock_group
+ status, output = get_output(libvirt_group_del, 0, logger)
+ if status:
+ logger.error("failed to del %s group" % unix_sock_group)
+
+ # delete sasl user
+ if auth_unix_ro == 'sasl' or auth_unix_rw == 'sasl':
+ saslpasswd2_delete = "%s -a libvirt -d %s" % (SASLPASSWD2,
TESTING_USER)
+ status, output = get_output(saslpasswd2_delete, 0, logger)
+ if status:
+ logger.error("failed to delete sasl user %s" % TESTING_USER)
+