diff options
Diffstat (limited to '')
-rw-r--r-- | testenv/test/base_test.py | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/testenv/test/base_test.py b/testenv/test/base_test.py new file mode 100644 index 0000000..7bd028d --- /dev/null +++ b/testenv/test/base_test.py @@ -0,0 +1,279 @@ +import os +import shutil +import shlex +import traceback +import re +import time +import sys +from subprocess import call +from misc.colour_terminal import print_red, print_blue +from exc.test_failed import TestFailed +import conf + +HTTP = "HTTP" +HTTPS = "HTTPS" + +SKIP_TEST = 77 + +class BaseTest: + + """ + Class that defines methods common to both HTTP and FTP Tests. + Note that this is an abstract class, subclasses must implement + * stop_server() + * instantiate_server_by(protocol) + """ + + def __init__(self, pre_hook, test_params, post_hook, protocols, req_protocols): + """ + Define the class-wide variables (or attributes). + Attributes should not be defined outside __init__. + """ + self.name = os.path.basename(os.path.realpath(sys.argv[0])) + # if pre_hook == None, then {} (an empty dict object) is passed to + # self.pre_configs + self.pre_configs = pre_hook or {} + + self.test_params = test_params or {} + self.post_configs = post_hook or {} + self.protocols = protocols + + if req_protocols is None: + self.req_protocols = map(lambda p: p.lower(), self.protocols) + else: + self.req_protocols = req_protocols + + self.servers = [] + self.domains = [] + self.ports = [] + + self.addr = None + self.port = -1 + + self.wget_options = '' + self.urls = [] + self.envs = dict() + + self.tests_passed = True + self.ready = False + self.init_test_env() + + self.ret_code = 0 + + def get_test_dir(self): + return self.name + '-test' + + def init_test_env(self): + test_dir = self.get_test_dir() + try: + os.mkdir(test_dir) + except FileExistsError: + shutil.rmtree(test_dir) + os.mkdir(test_dir) + os.chdir(test_dir) + + def get_domain_addr(self, addr): + # TODO if there's a multiple number of ports, wouldn't it be + # overridden to the port of the last invocation? + # Set the instance variables 'addr' and 'port' so that + # they can be queried by test cases. + self.addr = str(addr[0]) + self.port = str(addr[1]) + + return [self.addr, self.port] + + def server_setup(self): + print_blue("Running Test %s" % self.name) + for protocol in self.protocols: + instance = self.instantiate_server_by(protocol) + self.servers.append(instance) + + # servers instantiated by different protocols may differ in + # ports and etc. + # so we should record different domains respect to servers. + domain = self.get_domain_addr(instance.server_address) + self.domains.append('localhost') + self.ports.append(domain[1]) + + def exec_wget(self): + cmd_line = self.gen_cmd_line() + params = shlex.split(cmd_line) + print(params) + envs = {"HOME": os.getcwd()} + envs.update(**self.envs) + print(envs) + + if os.getenv("SERVER_WAIT"): + time.sleep(float(os.getenv("SERVER_WAIT"))) + + try: + ret_code = call(params, env=envs) + except FileNotFoundError: + raise TestFailed("The Wget Executable does not exist at the " + "expected path.") + + return ret_code + + def gen_cmd_line(self): + test_path = os.path.abspath(".") + if os.getenv("WGET_PATH"): + wget_path = os.path.abspath(os.getenv("WGET_PATH")) + else: + wget_path = os.path.abspath(os.path.join(test_path, + "..", '..', 'src', + "wget")) + wget_options = '--debug --no-config %s' % self.wget_options + + valgrind = os.getenv("VALGRIND_TESTS", "") + gdb = os.getenv("GDB_TESTS", "") + + # GDB has precedence over Valgrind + # If both VALGRIND_TESTS and GDB_TESTS are defined, + # GDB will be executed. + if gdb == "1": + cmd_line = 'gdb --args %s %s ' % (wget_path, wget_options) + elif valgrind == "1": + cmd_line = 'valgrind --error-exitcode=301 ' \ + '--leak-check=full ' \ + '--track-origins=yes ' \ + '--show-leak-kinds=all ' \ + '--gen-suppressions=all ' \ + '--suppressions=../valgrind-suppressions-ssl ' \ + '%s %s ' % (wget_path, wget_options) + elif valgrind not in ("", "0"): + cmd_line = '%s %s %s ' % (os.getenv("VALGRIND_TESTS", ""), + wget_path, + wget_options) + else: + cmd_line = '%s %s ' % (wget_path, wget_options) + + for req_protocol, urls, domain, port in zip(self.req_protocols, + self.urls, + self.domains, + self.ports): + # zip is function for iterating multiple lists at the same time. + # e.g. for item1, item2 in zip([1, 5, 3], + # ['a', 'e', 'c']): + # print(item1, item2) + # generates the following output: + # 1 a + # 5 e + # 3 c + for url in urls: + cmd_line += '%s://%s:%s/%s ' % (req_protocol, domain, port, url) + + + print(cmd_line) + + return cmd_line + + def __test_cleanup(self): + os.chdir('..') + try: + if not os.getenv("NO_CLEANUP"): + shutil.rmtree(self.get_test_dir()) + except: + print("Unknown Exception while trying to remove Test Environment.") + self.tests_passed = False + + def _exit_test(self): + self.__test_cleanup() + + def begin(self): + return 0 if self.tests_passed else 100 + + def call_test(self): + self.hook_call(self.test_params, 'Test Option') + + try: + self.ret_code = self.exec_wget() + except TestFailed as e: + raise e + finally: + self.stop_server() + + def do_test(self): + self.pre_hook_call() + self.call_test() + self.post_hook_call() + + def hook_call(self, configs, name): + for conf_name, conf_arg in configs.items(): + try: + # conf.find_conf(conf_name) returns the required conf class, + # then the class is instantiated with conf_arg, then the + # conf instance is called with this test instance itself to + # invoke the desired hook + conf.find_conf(conf_name)(conf_arg)(self) + except AttributeError: + self.stop_server() + raise TestFailed("%s %s not defined." % + (name, conf_name)) + + def pre_hook_call(self): + self.hook_call(self.pre_configs, 'Pre Test Function') + + def post_hook_call(self): + self.hook_call(self.post_configs, 'Post Test Function') + + def _replace_substring(self, string): + """ + Replace first occurrence of "{{name}}" in @string with + "getattr(self, name)". + """ + pattern = re.compile(r'\{\{\w+\}\}') + match_obj = pattern.search(string) + if match_obj is not None: + rep = match_obj.group() + temp = getattr(self, rep.strip('{}')) + string = string.replace(rep, temp) + return string + + def instantiate_server_by(self, protocol): + """ + Subclasses must override this method to actually instantiate servers + for test cases. + """ + raise NotImplementedError + + def stop_server(self): + """ + Subclasses must implement this method in order to stop certain + servers of different types. + """ + raise NotImplementedError + + @staticmethod + def get_server_rules(file_obj): + """ + The handling of expect header could be made much better when the + options are parsed in a true and better fashion. For an example, + see the commented portion in Test-basic-auth.py. + """ + server_rules = {} + for rule_name, rule in file_obj.rules.items(): + server_rules[rule_name] = conf.find_conf(rule_name)(rule) + return server_rules + + def __enter__(self): + """ + Initialization for with statement. + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + If the with statement got executed with no exception raised, then + exc_type, exc_val, exc_tb are all None. + """ + if exc_val: + self.tests_passed = False + if exc_type is TestFailed: + print_red('Error: %s.' % exc_val.error) + else: + print_red('Unhandled exception caught.') + print(exc_val) + traceback.print_tb(exc_tb) + self.__test_cleanup() + + return self.tests_passed |