#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import os
import sys
import shutil
from datetime import datetime
import subprocess
import multiprocessing
import socket
import getpass
import math
import re
import threading
import queue
from contextlib import ContextDecorator

try:
    from argparse import ArgumentParser
    from argparse import RawTextHelpFormatter
except ImportError:
    sys.exit(
        'ERROR: You need argparse!\n' +
        '   install it from http://pypi.python.org/pypi/argparse\n' +
        '   or run \"pip install argparse\".'
    )

try:
    import numpy as np
except ImportError:
    sys.exit(
        'ERROR: You need numpy!\n' +
        '   install it from http://pypi.python.org/pypi/numpy\n' +
        '   or run \"python3 -m pip install numpy\".'
    )

try:
    import netCDF4
except ImportError:
    sys.exit(
        'ERROR: You need netCDF4!\n' +
        '   install it from http://pypi.python.org/pypi/netCDF4\n' +
        '   or run \"python3 -m pip install netCDF4\".'
    )

try:
    import yaml
except ImportError:
    sys.exit(
        'ERROR: You need PyYAML!\n' +
        '   install it from http://pypi.python.org/pypi/PyYAML\n' +
        '   or run \"python3 -m pip install PyYAML\".'
    )

try:
    import argcomplete
except ImportError:
    print(
        'INFO: To use Tab-completion you need argcomplete!\n' +
        '   install it from http://pypi.python.org/pypi/argcomplete\n' +
        '   or run \"python3 -m pip install argcomplete\".'
    )
    has_argcomplete = False
else:
    has_argcomplete = True

try:
    from termcolor import colored as tcolored
except ImportError:
    def tcolored(string, color):
        return string

disable_colored_output = False


def colored(string, color):
    if not disable_colored_output:
        return tcolored(string, color)
    else:
        return string


def disable_color():
    global disable_colored_output
    disable_colored_output = True


version = '1.0.1'


class Environment:
    scripts_dir = os.path.dirname(os.path.realpath(__file__))
    trunk_dir = os.path.realpath(os.path.join(scripts_dir, '..'))
    workspace_dir = os.path.realpath(os.path.join(trunk_dir, '..'))

    trunk_tests_dir = os.path.join(trunk_dir, 'TUTORIALS')
    trunk_tests_cases_dir = os.path.join(trunk_tests_dir, 'cases')
    trunk_tests_builds_dir = os.path.join(trunk_tests_dir, 'builds')

    tests_dir = os.path.join(workspace_dir, 'tutorials')


class LogFormatter:
    terminal_columns, terminal_lines = shutil.get_terminal_size()
    hline = '#' * min(terminal_columns, 300) + '\n'

    table_width_intro = 12
    table_width_builds = len(max([s for s in next(os.walk(Environment.trunk_tests_builds_dir))[1]
                                  if not s[0] == '.'], key=len)) + len('_debug')
    table_width_cases = len(max([s for s in next(os.walk(Environment.trunk_tests_cases_dir))[1]
                                 if not s[0] == '.'], key=len))
    table_width_cores = 7
    table_width_total = table_width_intro + table_width_builds + table_width_cases + table_width_cores + 3

    intro_table_line_template = \
        '{:' + str(table_width_intro) + '} '

    task_table_line_template = \
        '{:' + str(table_width_intro) + '} ' + \
        '{:' + str(table_width_cases) + '} ' + \
        '{:' + str(table_width_builds) + '} ' + \
        '{:' + str(table_width_cores) + '} '

    config_table_line_template = \
        '{:' + str(table_width_intro) + '} ' + \
        '{:' + str(max(table_width_builds, table_width_cases)) + '} ' + \
        '{:8} '

    file_table_line_template = \
        '{:' + str(table_width_intro) + '} ' + \
        '{:' + str(table_width_cases + 13) + '} '


class SignificantDigitsRounder:

    @staticmethod
    def _round(value, digits=10):
        if value == 0.0:
            return value
        negative = value < 0.0
        value = -value if negative else value
        rounded_value = round(value, -int(math.floor(math.log10(value))) + (digits - 1))
        rounded_value = -rounded_value if negative else rounded_value
        return rounded_value
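
    # Worked example (illustrative; printed values are approximate because of
    # binary floating-point representation): with digits=3 only the three
    # leading significant digits survive, independent of magnitude:
    #   _round(0.0012345, digits=3)  ->  0.00123
    #   _round(-123.45, digits=3)    ->  -123.0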
    vectorized_round = np.vectorize(_round)
    _vectorized_round = np.vectorize(round)

    @classmethod
    def around(cls, array, digits=10):
        # TODO: divide both arrays and check decimal point
        sign_mask = np.ma.masked_where(array >= 0.0, array).mask
        pos_array = np.where(sign_mask, array, -array)
        non_zero_mask = np.ma.masked_where(pos_array == 0.0, pos_array).mask
        non_zero_array = np.where(non_zero_mask, 1.0, pos_array)
        i1 = -np.floor(np.log10(non_zero_array)).astype(int) + (digits - 1)
        rounded_non_zero_array = cls._vectorized_round(non_zero_array, i1)
        rounded_pos_array = np.where(non_zero_mask, 0.0, rounded_non_zero_array)
        return np.where(sign_mask, rounded_pos_array, -rounded_pos_array)


class Logger(ContextDecorator):

    def __init__(self, logfile_dir, logfile_name='palmtest.log', logfile_mode='a', verbose=False):
        self.logfile_path = os.path.join(logfile_dir, logfile_name)
        self.logfile_mode = logfile_mode
        self.verbose = verbose

    def __enter__(self):
        self._file = open(self.logfile_path, self.logfile_mode)
        return self

    def to_file(self, message):
        self._file.write(message)
        self._file.flush()

    def to_log(self, message):
        if self.verbose:
            sys.stdout.write(message)
            sys.stdout.flush()
        self._file.write(message)
        self._file.flush()

    def to_all(self, message):
        sys.stdout.write(message)
        sys.stdout.flush()
        self._file.write(message)
        self._file.flush()

    def __exit__(self, *exc):
        self._file.close()
        return False


class Executor:

    @staticmethod
    def _enqueue_output(out, queue):
        for line in iter(out.readline, b''):
            queue.put(line)
        out.close()

    @staticmethod
    def execute(cmd, cwd='.', verbose=True, dry_run=False):
        assert isinstance(cmd, list)
        if dry_run:
            cmd = ['echo'] + cmd
        cmd_str = ' '.join(cmd)
        p = subprocess.Popen(cmd_str, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             shell=True, bufsize=1)
        q = queue.Queue()
        t = threading.Thread(target=Executor._enqueue_output, args=(p.stdout, q))
        t.daemon = True  # thread dies with the program
        t.start()

        with Logger(cwd, verbose=verbose) as logger:
            # read lines without blocking
            logger.to_log(LogFormatter.hline)
            logger.to_log('CMD: ' + cmd_str + '\n')
            logger.to_log(LogFormatter.hline)
            while t.is_alive():
                try:
                    line = q.get_nowait()  # or q.get(timeout=.1)
                except queue.Empty:
                    pass  # no output yet
                else:  # got line
                    logger.to_log(line.decode("utf-8"))
            # drain whatever is left in the queue after the reader thread ended
            line = True
            while line:
                try:
                    line = q.get_nowait()  # or q.get(timeout=.1)
                except queue.Empty:
                    line = False
                else:  # got line
                    logger.to_log(line.decode("utf-8"))
            logger.to_log(LogFormatter.hline)

        # wait() instead of poll(): poll() may still return None here, because
        # the reader thread can finish before the process itself has exited
        rc = p.wait()
        failed = rc != 0
        return failed


class NetCDFInterface:

    def __init__(self, filename):
        self.filename = filename

    def is_healthy(self):
        try:
            self.get_run_name()
        except:
            return False
        else:
            return True

    def get_run_name(self):
        with netCDF4.Dataset(self.filename, mode='r') as netcdf:
            l = getattr(netcdf, 'title').split()
            i = l.index('run:')
            return l[i + 1]

    def get_var_list(self):
        with netCDF4.Dataset(self.filename, mode='r') as netcdf:
            var_list = list(netcdf.variables.keys())
            var_list = filter(None, var_list)
            return sorted(var_list)

    def show_content(self):
        with netCDF4.Dataset(self.filename, mode='r') as netcdf:
            for name in netcdf.ncattrs():
                print("Global attr", name, "=", getattr(netcdf, name))
            print(netcdf)
            for v in netcdf.variables:
                print(v)

    def get_times_list(self):
        attributes, times = self.read_var('time')
        times = [str(time) for time in times]
        times = list(filter(None, times))
        return times

    def contains(self, variable):
        return variable in self.get_var_list()
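
    # Usage sketch (the file name below is hypothetical): read one variable
    # together with its descriptive attributes via read_var().
    #   nci = NetCDFInterface('example_ts.000.nc')
    #   attributes, values = nci.read_var('time')
    # 'attributes' holds 'long_name' and 'unit' (empty strings if absent),
    # 'values' is the complete data array of the requested variable.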
    def read_var(self, variable):
        with netCDF4.Dataset(self.filename, mode='r') as netcdf:
            values = netcdf.variables[variable][:]  # extract values
            attributes = dict()
            try:
                attributes['long_name'] = netcdf.variables[variable].name
            except:
                attributes['long_name'] = ''
            try:
                attributes['unit'] = netcdf.variables[variable].units
            except:
                attributes['unit'] = ''
            return attributes, values


class FileComparator:

    @staticmethod
    def compare_ascii(file_path1, file_path2, start_string=None):
        try:
            with open(file_path1, 'r') as file1:
                content1 = file1.readlines()
        except OSError:
            return True, colored('[reference file not found]', 'red')
        try:
            with open(file_path2, 'r') as file2:
                content2 = file2.readlines()
        except OSError:
            return True, colored('[output file not found]', 'red')
        if start_string:
            index1 = content1.index(start_string)
            index2 = content2.index(start_string)
            comparable_content1 = content1[index1:]
            comparable_content2 = content2[index2:]
            ln = index2 + 1
        else:
            comparable_content1 = content1
            comparable_content2 = content2
            ln = 1
        if len(comparable_content1) != len(comparable_content2):
            return True, colored('[mismatch in total number of lines]', 'red')
        for line1, line2 in zip(comparable_content1, comparable_content2):
            if not line1 == line2:
                return True, colored('[mismatch in content starting line ' + str(ln) + ']', 'red')
            ln += 1
        return False, colored('[file ok]', 'green')

    @staticmethod
    def compare_netcdf(file_path1, file_path2, digits=None):
        nci1 = NetCDFInterface(file_path1)
        nci2 = NetCDFInterface(file_path2)
        if not nci1.is_healthy():
            return True, colored('[reference file not found]', 'red')
        if not nci2.is_healthy():
            return True, colored('[output file not found]', 'red')
        times_list1 = nci1.get_times_list()
        times_list2 = nci2.get_times_list()
        if not times_list1 == times_list2:
            return True, colored('[wrong time dimension]', 'red')
        else:
            time_list = times_list1
        var_list1 = nci1.get_var_list()
        var_list2 = nci2.get_var_list()
        if not var_list1 == var_list2:
            return True, colored('[wrong set of variables]', 'red')
        else:
            var_list = var_list1
        content1 = dict()
        content2 = dict()
        for var in var_list:
            attributes1, values1 = nci1.read_var(var)
            attributes2, values2 = nci2.read_var(var)
            if sorted(attributes1.keys()) != sorted(attributes2.keys()):
                return True, colored('[wrong set of attributes in variable \"' + var + '\"]', 'red')
            if isinstance(digits, int):
                values1 = SignificantDigitsRounder.around(values1, digits=digits)
                values2 = SignificantDigitsRounder.around(values2, digits=digits)
            content1[var] = values1
            content2[var] = values2
        for i, time in enumerate(time_list):
            for var in var_list:
                t_content1 = content1[var][i]
                t_content2 = content2[var][i]
                if not (t_content1 == t_content2).all():
                    if isinstance(digits, int):
                        return True, colored('[1st mismatch within ' + str(digits) +
                                             ' digits at time index ' + str(i) +
                                             ' in variable \"' + var + '\"]', 'red')
                    else:
                        return True, colored('[1st mismatch at time index ' + str(i) +
                                             ' in variable \"' + var + '\"]', 'red')
        return False, colored('[file ok]', 'green')
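
# Both comparators return a tuple (failed, message), where 'failed' is a bool
# and 'message' a colored status string. A minimal sketch of a NetCDF
# comparison agreeing to 6 significant digits (file paths are hypothetical):
#
#   failed, message = FileComparator.compare_netcdf(
#       'reference/case_ts.nc', 'results/case_ts.nc', digits=6)
#   print(message)  # '[file ok]' on success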
class OutputChecker:

    def __init__(self, test_dir, setup_name, build_name, cores,
                 significant_digits=None, verbose=True, dry_run=False):
        self.test_dir = test_dir
        self.setup_name = setup_name
        self.build_name = build_name
        self.cores = cores
        self.significant_digits = significant_digits
        self.verbose = verbose
        self.dry_run = dry_run
        self.job_name = self.setup_name + '__' + build_name + '__' + str(self.cores)
        self.job_dir = os.path.join(self.test_dir, 'JOBS', self.job_name)
        self.ref_monitoring_dir = os.path.join(Environment.trunk_tests_cases_dir, self.setup_name, 'MONITORING')
        self.ref_output_dir = os.path.join(Environment.trunk_tests_cases_dir, self.setup_name, 'OUTPUT')
        self.res_monitoring_dir = os.path.join(self.job_dir, 'MONITORING')
        self.res_output_dir = os.path.join(self.job_dir, 'OUTPUT')
        self.failed = None

    def get_checkable_file_dicts(self):
        if os.path.isdir(self.ref_monitoring_dir):
            file_names_monitoring = [s for s in next(os.walk(self.ref_monitoring_dir))[2]]
        else:
            file_names_monitoring = []
        file_paths_monitoring = []
        for file_name in file_names_monitoring:
            file_specific_ending = file_name[len(self.setup_name):]
            file_specific_ending_split = file_specific_ending.split('.')
            postfix = file_specific_ending_split[0]
            if len(file_specific_ending_split) > 1:
                extension = file_specific_ending_split[-1]
            else:
                extension = ''
            if len(file_specific_ending_split) > 2:
                cycle_info = file_specific_ending_split[1:-1]
            else:
                cycle_info = []
            file_paths_monitoring.append(
                dict(
                    postfix=postfix,
                    cycle_info=cycle_info,
                    extension=extension,
                    ref_path=self.ref_monitoring_dir,
                    res_path=self.res_monitoring_dir,
                )
            )
        if os.path.isdir(self.ref_output_dir):
            file_names_output = [s for s in next(os.walk(self.ref_output_dir))[2]]
        else:
            file_names_output = []
        file_paths_output = []
        for file_name in file_names_output:
            file_specific_ending = file_name[len(self.setup_name):]
            file_specific_ending_split = file_specific_ending.split('.')
            postfix = file_specific_ending_split[0]
            if len(file_specific_ending_split) > 1:
                extension = file_specific_ending_split[-1]
            else:
                extension = ''
            if len(file_specific_ending_split) > 2:
                cycle_info = file_specific_ending_split[1:-1]
            else:
                cycle_info = []
            file_paths_output.append(
                dict(
                    postfix=postfix,
                    cycle_info=cycle_info,
                    extension=extension,
                    ref_path=self.ref_output_dir,
                    res_path=self.res_output_dir,
                )
            )
        return file_paths_monitoring + file_paths_output

    def check(self):
        with Logger(self.test_dir, verbose=self.verbose) as logger:
            logger.to_log('Checking output files:')
            logger.to_all('\n')
            failed = False
            for file_dict in self.get_checkable_file_dicts():
                file_failed = False
                ext_list = [file_dict['extension']] if file_dict['extension'] else []
                file_specific_ending = '.'.join([file_dict['postfix']] + file_dict['cycle_info'] + ext_list)
                logger.to_all(LogFormatter.file_table_line_template.format(
                    'Checking:', self.setup_name + file_specific_ending))
                ref_file_path = os.path.join(file_dict['ref_path'], self.setup_name + file_specific_ending)
                res_file_path = os.path.join(file_dict['res_path'], self.job_name + file_specific_ending)
                if re.match('_rc', file_dict['postfix']) and not file_dict['extension']:
                    file_failed, message = FileComparator.compare_ascii(
                        ref_file_path, res_file_path, start_string='Run-control output:\n')
                elif re.match('nc', file_dict['extension']):
                    if self.significant_digits is not None:
                        if re.match('_ts', file_dict['postfix']) and 'timeseries' in self.significant_digits:
                            file_failed, message = FileComparator.compare_netcdf(
                                ref_file_path, res_file_path,
                                digits=self.significant_digits['timeseries'])
                        elif re.match('_pr', file_dict['postfix']) and 'profiles' in self.significant_digits:
                            file_failed, message = FileComparator.compare_netcdf(
                                ref_file_path, res_file_path,
                                digits=self.significant_digits['profiles'])
                        else:
                            file_failed, message = FileComparator.compare_netcdf(
                                ref_file_path, res_file_path,
                                digits=self.significant_digits['other'])
                    else:
                        file_failed, message = FileComparator.compare_netcdf(ref_file_path, res_file_path)
                else:
                    message = colored('[ignored]', 'blue')
                if file_failed:
                    failed = True
                logger.to_all(message + '\n')
            if self.dry_run:
                failed = False
            return failed
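
# OutputChecker compares a job's MONITORING/OUTPUT files against the reference
# data shipped with the test case: '_rc' files are compared as ASCII starting
# at the 'Run-control output:' marker, '*.nc' files via compare_netcdf, and
# all other files are ignored. Each dict from get_checkable_file_dicts()
# describes one reference file, e.g. (hypothetical file '<setup>_ts.nc'):
#   dict(postfix='_ts', cycle_info=[], extension='nc',
#        ref_path='.../OUTPUT', res_path='<job_dir>/OUTPUT')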
class PALMJob:
    """The PALM job class deals with the execution of a single PALM job"""

    @staticmethod
    def get_job_name(setup_name, build_name, cores):
        return setup_name + '__' + build_name + '__' + str(cores)

    def __init__(self, test_dir, test_case, build_name, cores, verbose=False, dry_run=False):
        self.test_dir = test_dir
        self.test_case = test_case
        self.build_name = build_name
        self.cores = cores
        self.verbose = verbose
        self.dry_run = dry_run
        self.attempted_debug = False
        self.failed_debug = None
        self.attempted_non_debug = False
        self.failed_non_debug = None

    def _link_restart_files(self, build_name):
        if self.dry_run:
            return True, colored('[restart data dry]', 'blue')
        name = self.get_job_name(self.test_case.name, build_name, self.cores)
        source_name = self.get_job_name(self.test_case.use_binary_files_from, build_name, self.cores)
        source_restart_dir = os.path.join(self.test_dir, 'JOBS', source_name, 'RESTART')
        try:
            source_data_dirs_grp = [d for r, d, f in os.walk(source_restart_dir)]
        except:
            source_data_dirs_grp = []
        if len(source_data_dirs_grp) == 0:
            source_data_dirs = []
        else:
            source_data_dirs = source_data_dirs_grp[0]
        # if the debug build has no restart data of its own, fall back to the
        # restart data of the corresponding non-debug build
        if len(source_data_dirs) == 0 and re.match('.+_debug', build_name):
            source_build_name = build_name[:-len('_debug')]
            source_name = self.get_job_name(self.test_case.use_binary_files_from, source_build_name, self.cores)
            source_restart_dir = os.path.join(self.test_dir, 'JOBS', source_name, 'RESTART')
            try:
                source_data_dirs_grp = [d for r, d, f in os.walk(source_restart_dir)]
            except:
                source_data_dirs_grp = []
            if len(source_data_dirs_grp) == 0:
                source_data_dirs = []
            else:
                source_data_dirs = source_data_dirs_grp[0]
        if len(source_data_dirs) == 0:
            source_data_dir = 'no_restart_data'
        else:
            source_data_dir = sorted(source_data_dirs)[-1]
        source_data_dir_path = os.path.join(source_restart_dir, source_data_dir)
        if os.path.isdir(source_data_dir_path) and re.match('.+_d3d.*', source_data_dir):
            job_restart_dir = os.path.join(self.test_dir, 'JOBS', name, 'RESTART')
            os.makedirs(job_restart_dir, exist_ok=False)
            job_data_dir_path = os.path.join(job_restart_dir, name + '_d3d')
            os.symlink(source_data_dir_path, job_data_dir_path, target_is_directory=True)
            return False, colored('[linked restart data from: ' + source_data_dir_path + ']', 'green')
        else:
            return True, colored('[no restart data found]', 'red')

    def _execute(self, name, build_name):
        execution_failed = Executor.execute(
            [
                os.path.join(self.test_dir, 'trunk', 'SCRIPTS', 'palmrun'),
                '-c', '\"' + build_name + '\"',
                '-r', name,
                '-a', '\"' + ' '.join(self.test_case.activation_strings) + '\"',
                '-X', str(self.cores),
                '-T', str(self.cores),
                '-B',
                '-v',
                '-z',
            ],
            cwd=self.test_dir,
            verbose=self.verbose,
            dry_run=self.dry_run,
        )
        if self.dry_run:
            return False, colored('[execution dry]', 'blue')
        elif execution_failed:
            return True, colored('[execution failed]', 'red')
        else:
            return False, colored('[execution ok]', 'green')

    def _check(self, build_name):
        checker = OutputChecker(
            self.test_dir,
            self.test_case.name,
            build_name,
            self.cores,
            significant_digits=self.test_case.significant_digits,
            verbose=self.verbose,
            dry_run=self.dry_run,
        )
        check_failed = checker.check()
        if self.dry_run:
            return False, colored('[checks dry]', 'blue')
        if check_failed:
            return True, colored('[checks failed]', 'red')
        else:
            return False, colored('[checks ok]', 'green')
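
    # Job directories are derived from get_job_name(), e.g. (illustrative
    # setup and build names):
    #   get_job_name('example_case', 'default', 4) -> 'example_case__default__4'
    # execute() below stages INPUT and USER_CODE under JOBS/<job_name>/,
    # runs palmrun, and finally lets OutputChecker verify the results.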
    def execute(self, debug=False):
        if debug:
            attempted = self.attempted_debug
            build_name = self.build_name + '_debug'
            failed = self.failed_debug
        else:
            attempted = self.attempted_non_debug
            build_name = self.build_name
            failed = self.failed_non_debug
        if not attempted:
            with Logger(self.test_dir, verbose=self.verbose) as logger:
                status_prefix = LogFormatter.task_table_line_template.format(
                    'Testing:', self.test_case.name, build_name, self.cores)
                logger.to_all(status_prefix)
                logger.to_log('[started]' + '\n')
                attempted = True
                name = self.get_job_name(self.test_case.name, build_name, self.cores)
                input_dir = os.path.join(self.test_dir, 'JOBS', name, 'INPUT')
                os.makedirs(input_dir, exist_ok=False)
                # copying needs to be done per file, because input files need to be renamed
                for input_file in self.test_case.input_file_names:
                    postfix = input_file[len(self.test_case.name):]
                    src = os.path.join(self.test_case.input_dir, input_file)
                    dst = os.path.join(input_dir, name + postfix)
                    shutil.copy(src, dst)
                # copying the entire directory is ok, because source files do not need to be renamed
                user_code_dir = os.path.join(self.test_dir, 'JOBS', name, 'USER_CODE')
                if os.path.isdir(self.test_case.user_code_dir):
                    shutil.copytree(self.test_case.user_code_dir, user_code_dir, copy_function=shutil.copy)
                if self.test_case.requires_binary_files:
                    link_restart_files_failed, message = self._link_restart_files(build_name)
                    logger.to_log(status_prefix)
                    logger.to_log(message + ' ')
                    logger.to_log('\n')
                failed, message = self._execute(name, build_name)
                logger.to_log(status_prefix)
                logger.to_all(message + ' ')
                logger.to_log('\n')
                failed, message = self._check(build_name)
                logger.to_log(status_prefix)
                logger.to_log(message + ' ')
                logger.to_all('\n')
        if debug:
            self.attempted_debug = attempted
            self.failed_debug = failed
        else:
            self.attempted_non_debug = attempted
            self.failed_non_debug = failed
        return failed

    def status(self):
        return dict(
            attempted=self.attempted_non_debug or self.attempted_debug,
            failed=self.failed_non_debug and self.failed_debug,
            debugged=self.attempted_debug,
            non_debug_failed=self.failed_non_debug,
        )


class PALMBuild:
    """The PALM build class deals with configuration and execution of all required PALM builds"""

    def __init__(self, test_dir, build_name, verbose=False, dry_run=False):
        self.test_dir = test_dir
        self.build_name = build_name
        self.verbose = verbose
        self.dry_run = dry_run
        self.configured = False
        self.executed = False
        self.available = False
        self.requires_mpi = False
        self.requires_netcdf = False
        self.requires_fftw = False
        self.requires_rrtmg = False
        self.attempted_non_debug = False
        self.attempted_debug = False
        self.failed_non_debug = None
        self.failed_debug = None

    def configure(self):
        try:
            with open(os.path.join(Environment.trunk_tests_builds_dir, self.build_name, 'build_config.yml'), 'r') as f:
                # safe_load avoids executing arbitrary YAML tags and works with PyYAML >= 5
                build_config = yaml.safe_load(f)
        except:
            return True, colored('[build not found]', 'red')

        if 'compiler' in build_config:
            self.compiler = build_config['compiler']
        else:
            return True, colored('[missing \"compiler\" keyword]', 'red')
        if not isinstance(self.compiler, dict):
            return True, colored('[\"compiler\" keyword must be dict]', 'red')

        if 'linker' in build_config:
            self.linker = build_config['linker']
        else:
            return True, colored('[missing \"linker\" keyword]', 'red')
        if not isinstance(self.linker, dict):
            return True, colored('[\"linker\" keyword must be dict]', 'red')

        if 'mpi_wrapper' in self.compiler:
            if 'mpi_wrapper}}' in self.compiler['mpi_wrapper']:
                self.requires_mpi = True
        else:
            return True, colored('[missing \"mpi_wrapper\" keyword]', 'red')
        if 'includes' in self.compiler:
            for include in self.compiler['includes']:
                if 'include.netcdf}}' in include:
                    self.requires_netcdf = True
                if 'include.fftw}}' in include:
                    self.requires_fftw = True
                if 'include.rrtmg}}' in include:
                    self.requires_rrtmg = True
        else:
            return True, colored('[missing \"includes\" keyword in compiler]', 'red')

        if 'options' in self.linker:
            for lib in self.linker['options']:
                if 'lib.netcdf}}' in lib:
                    self.requires_netcdf = True
                if 'lib.fftw}}' in lib:
                    self.requires_fftw = True
                if 'lib.rrtmg}}' in lib:
                    self.requires_rrtmg = True
        else:
            return True, colored('[missing \"options\" keyword in linker]', 'red')

        library_names = []
        if self.requires_netcdf:
            library_names.append('netcdf')
        if self.requires_fftw:
            library_names.append('fftw')
        if self.requires_rrtmg:
            library_names.append('rrtmg')

        if not 'executable' in self.compiler:
            return True, colored('[missing \"executable\" keyword in compiler]', 'red')
        if not 'definitions' in self.compiler:
            return True, colored('[missing \"definitions\" keyword in compiler]', 'red')
        if not 'options' in self.compiler:
            return True, colored('[missing \"options\" keyword in compiler]', 'red')
        if not 'default' in self.compiler['options']:
            return True, colored('[missing \"default\" keyword in compiler.options]', 'red')
        if not 'debug' in self.compiler['options']:
            return True, colored('[missing \"debug\" keyword in compiler.options]', 'red')

        try:
            with open(os.path.join(Environment.workspace_dir, 'palmtest.yml'), 'r') as f:
                palmtest_config = yaml.safe_load(f)
        except:
            return True, colored('[palmtest.yml not found]', 'red')

        if 'palm_config_template' in palmtest_config:
            if isinstance(palmtest_config['palm_config_template'], str):
                custom_template = palmtest_config['palm_config_template']
        try:
            with open(os.path.join(custom_template), 'r') as palm_config_template_file:
                template = palm_config_template_file.read()
        except:
            # fall back to the default template whenever no (readable) custom
            # template is configured in palmtest.yml
            try:
                with open(os.path.join(Environment.scripts_dir, '.palm.config.default.in'), 'r') as palm_config_template_file:
                    template = palm_config_template_file.read()
            except:
                return True, colored('[trunk/SCRIPTS/.palm.config.default.in not found]', 'red')

        template = template.replace('@CMAKE_INSTALL_PREFIX@', self.test_dir)
        template = template.replace('@PALM_HOSTNAME@', socket.gethostname())
        template = template.replace('@CMAKE_USERNAME@', getpass.getuser())
        template = template.replace('@MPI_Fortran_COMPILER@', self.compiler['mpi_wrapper'])
        template = template.replace('@CMAKE_Fortran_COMPILER@', self.compiler['executable'])
        cpp_options_str = ['-D' + s for s in self.compiler['definitions']]
        template = template.replace('@PALM_CPP_OPTIONS_STR@', ' '.join(cpp_options_str))
        template = template.replace('@PALM_CORES@', str(multiprocessing.cpu_count()))
        template = template.replace('@PALM_COMPILER_OPTIONS@',
                                    '{{palmtest.compiler.options}} ' + ' '.join(self.compiler['includes']))
        template = template.replace('@PALM_LINKER_OPTIONS@', ' '.join(self.linker['options']))

        if 'environments' in palmtest_config:
            available_environments = palmtest_config['environments']
        else:
            return True, colored('[missing \"environments\" keyword in palmtest.yml]', 'red')

        if 'id' in self.compiler:
            c_id = self.compiler['id']
        else:
            return True, colored('[missing \"id\" keyword in compiler]', 'red')

        if c_id in available_environments:
            self.available = True
            environment = available_environments[c_id]
            if 'mpi_execution_command' in environment:
                template = template.replace('@PALM_EXECUTE_COMMAND@', environment['mpi_execution_command'])
            else:
                template = template.replace('@PALM_EXECUTE_COMMAND@', 'mpirun -n {{mpi_tasks}}')
            if 'executable' not in environment:
                return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"executable\"]', 'red')
            value = environment['executable']
            if isinstance(value, str):
                template = template.replace('{{' + '.'.join([c_id, 'executable']) + '}}', value)
            if self.requires_mpi:
                if 'mpi_wrapper' not in environment:
                    return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"mpi_wrapper\"]', 'red')
                value = environment['mpi_wrapper']
                if isinstance(value, str):
                    template = template.replace('{{' + '.'.join([c_id, 'mpi_wrapper']) + '}}', value)
            if 'include' not in environment:
                return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"include\"]', 'red')
            if 'lib' not in environment:
                return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"lib\"]', 'red')
            for lib in library_names:
                if lib not in environment['include']:
                    return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"include.' + lib + '\"]', 'red')
                value = environment['include'][lib]
                if isinstance(value, str):
                    template = template.replace('{{' + '.'.join([c_id, 'include', lib]) + '}}', value)
                if lib not in environment['lib']:
                    return True, colored('[palmtest.yml environment \"' + c_id + '\" has no \"lib.' + lib + '\"]', 'red')
                value = environment['lib'][lib]
                if isinstance(value, str):
                    template = template.replace('{{' + '.'.join([c_id, 'lib', lib]) + '}}', value)
            with open(os.path.join(self.test_dir, '.palm.config.' + self.build_name), 'w') as palm_config_file:
                palm_config_file.write(
                    template.replace(
                        '{{palmtest.compiler.options}}',
                        ' '.join(self.compiler['options']['default']),
                    )
                )
            with open(os.path.join(self.test_dir, '.palm.config.' + self.build_name + '_debug'), 'w') as palm_config_file:
                palm_config_file.write(
                    template.replace(
                        '{{palmtest.compiler.options}}',
                        ' '.join(self.compiler['options']['debug']),
                    )
                )
            self.configured = True
            return False, colored('[configuration ok]', 'green')
        else:
            return True, colored('[palmtest.yml environment \"' + c_id + '\" not found]', 'red')

    def _execute(self, build_name):
        self.attempted = True
        build_failed = Executor.execute(
            [
                os.path.join(self.test_dir, 'trunk', 'SCRIPTS', 'palmbuild'),
                '-c', '\"' + build_name + '\"',
                '-v',
            ],
            cwd=self.test_dir,
            verbose=self.verbose,
            dry_run=self.dry_run,
        )
        if self.dry_run:
            return False, colored('[build dry]', 'blue')
        if build_failed:
            return True, colored('[build failed]', 'red')
        else:
            return False, colored('[build ok]', 'green')

    def build(self, debug=False):
        if debug:
            attempted = self.attempted_debug
            build_name = self.build_name + '_debug'
            failed = self.failed_debug
        else:
            attempted = self.attempted_non_debug
            build_name = self.build_name
            failed = self.failed_non_debug
        if not attempted:
            with Logger(self.test_dir, verbose=self.verbose) as logger:
                status_prefix = LogFormatter.task_table_line_template.format('Building:', '', build_name, '')
                logger.to_all(status_prefix)
                logger.to_log('[started]' + '\n')
                attempted = True
                failed, message = self._execute(build_name)
                logger.to_log(status_prefix)
                logger.to_all(message + ' ')
                logger.to_all('\n')
        if debug:
            self.attempted_debug = attempted
            self.failed_debug = failed
        else:
            self.attempted_non_debug = attempted
            self.failed_non_debug = failed
        return failed

    def report(self):
        return dict(
            failed_debug=self.failed_debug,
            failed_non_debug=self.failed_non_debug,
        )
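
# A build_config.yml must provide at least the keys verified in
# PALMBuild.configure() above. A minimal sketch (all values are placeholders,
# not a tested configuration; '{{...}}' markers are later substituted from the
# matching 'environments' entry in palmtest.yml):
#
#   compiler:
#     id: gnu
#     executable: '{{gnu.executable}}'
#     mpi_wrapper: '{{gnu.mpi_wrapper}}'
#     definitions: [ __parallel ]
#     includes: [ '-I {{gnu.include.netcdf}}' ]
#     options:
#       default: [ -O2 ]
#       debug: [ -O0, -g ]
#   linker:
#     options: [ '{{gnu.lib.netcdf}}' ]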
class PALMTestCase:
    """The PALM test case class deals with the configuration and execution of all PALM test cases"""

    def __init__(self, test_dir, name, verbose=False, dry_run=False):
        self.test_dir = test_dir
        self.name = name
        self.verbose = verbose
        self.dry_run = dry_run
        self.user_code_dir = os.path.join(Environment.trunk_tests_cases_dir, self.name, 'USER_CODE')
        self.input_dir = os.path.join(Environment.trunk_tests_cases_dir, self.name, 'INPUT')
        self.number_of_cores = []
        self.build_names = []
        self.input_file_names = []
        self.configured = False

    def configure(self, requested_build_names, requested_cores):
        f_name = os.path.join(Environment.trunk_tests_cases_dir, self.name, 'case_config.yml')
        try:
            with open(f_name, 'r') as f:
                config = yaml.safe_load(f)
        except:
            return True, colored('[Case \"' + self.name + '\" could not be found.]', 'red')
        try:
            self.use_binary_files_from = config['use_binary_files_from']
        except:
            self.use_binary_files_from = None
        self.requires_binary_files = bool(self.use_binary_files_from)
        if 'allowed_builds' not in config:
            return True, colored('[missing \"allowed_builds\" keyword]', 'red')
        self.allowed_build_names = config['allowed_builds']
        if 'allowed_number_of_cores' not in config:
            return True, colored('[missing \"allowed_number_of_cores\" keyword]', 'red')
        self.allowed_number_of_cores = config['allowed_number_of_cores']
        if 'activation_strings' not in config:
            return True, colored('[missing \"activation_strings\" keyword]', 'red')
        self.activation_strings = config['activation_strings']
        if 'significant_digits_for_netcdf_checks' not in config:
            return True, colored('[missing \"significant_digits_for_netcdf_checks\" keyword]', 'red')
        self.significant_digits = config['significant_digits_for_netcdf_checks']
        if 'timeseries' not in config['significant_digits_for_netcdf_checks']:
            return True, colored('[missing \"timeseries\" keyword in significant_digits_for_netcdf_checks]', 'red')
        if 'profiles' not in config['significant_digits_for_netcdf_checks']:
            return True, colored('[missing \"profiles\" keyword in significant_digits_for_netcdf_checks]', 'red')
        if 'other' not in config['significant_digits_for_netcdf_checks']:
            return True, colored('[missing \"other\" keyword in significant_digits_for_netcdf_checks]', 'red')
        self.number_of_cores = sorted(set(requested_cores).intersection(self.allowed_number_of_cores))
        self.build_names = sorted(set(requested_build_names).intersection(self.allowed_build_names))
        self.input_file_names = [s for s in next(os.walk(self.input_dir))[2]]
        self.configured = True
        if len(self.number_of_cores) == 0:
            return True, colored('[no allowed cores requested]', 'blue')
        if len(self.build_names) == 0:
            return True, colored('[no allowed builds requested]', 'blue')
        if len(self.input_file_names) == 0:
            return True, colored('[no input files found]', 'red')
        return False, colored('[configuration ok]', 'green')
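
# A case_config.yml must provide every keyword checked in
# PALMTestCase.configure() above. A minimal sketch (values are placeholders):
#
#   allowed_builds: [ default ]
#   allowed_number_of_cores: [ 1, 2, 4 ]
#   activation_strings: [ d3# ]
#   significant_digits_for_netcdf_checks:
#     timeseries: 6
#     profiles: 6
#     other: 10
#   use_binary_files_from: some_initial_case   # optional restart dependency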
class PALMTest:

    def __init__(self, args):
        self.verbose = args.verbose
        self.no_auto_debug = args.no_auto_debug
        self.force_debug = args.force_debug
        self.fail_on_debug = args.fail_on_debug
        self.dry_run = args.dry_run
        self.no_color = args.no_color
        self.test_id = args.test_id
        self.test_case_names = args.cases
        self.requested_build_names = args.builds
        self.requested_cores = args.cores
        self.test_case_queue = []
        self.build_database = dict()

    def prepare(self):
        if self.no_color:
            disable_color()
        self.test_dir = os.path.join(Environment.tests_dir, self.test_id)
        try:
            os.makedirs(self.test_dir, exist_ok=False)
        except:
            print('ERROR: Found existing test directory: ' + self.test_dir)
            exit(1)
        with Logger(self.test_dir, verbose=self.verbose) as logger:
            logger.to_all(LogFormatter.hline)
            logger.to_all('This is the PALM tester (version: ' + version + ')' + '\n')
            logger.to_all(LogFormatter.hline)
            try:
                with open(os.path.join(Environment.workspace_dir, 'palmtest.yml'), 'r') as f:
                    pass
            except:
                logger.to_all('ERROR: No palmtest.yml file was found in your working directory!\n')
                logger.to_all('INFO:  A template for this file can be found at: trunk/TESTS/palmtest.yml\n')
                logger.to_all('       Please copy the template to your working directory and adjust it to your system!\n')
                exit(1)
            self.execution_trunk_dir = os.path.join(self.test_dir, 'trunk')
            os.symlink(Environment.trunk_dir, self.execution_trunk_dir)
            self.execution_jobs_dir = os.path.join(self.test_dir, 'JOBS')
            os.makedirs(self.execution_jobs_dir, exist_ok=False)
            try:
                with open(os.path.join(Environment.scripts_dir, '.palm.iofiles'), 'r') as iofiles_template_file:
                    iofiles_template = iofiles_template_file.read()
                with open(os.path.join(self.test_dir, '.palm.iofiles'), 'w') as iofiles_file:
                    iofiles_file.write(
                        iofiles_template.replace('$fast_io_catalog', '$base_data')
                                        .replace('$restart_data_path', '$base_data')
                                        .replace('$output_data_path', '$base_data'))
            except:
                logger.to_all('ERROR: No .palm.iofiles file was found in trunk/SCRIPTS/\n')
                exit(1)
            available_cores = multiprocessing.cpu_count()
            final_cores_list = list(filter(lambda x: x <= available_cores, self.requested_cores))
            logger.to_all(LogFormatter.config_table_line_template.format('Object:', 'Name:', 'Action:') + 'Status:\n')
            logger.to_all(LogFormatter.hline)
            if 'all' in self.requested_build_names:
                self.requested_build_names = [name for name in next(os.walk(Environment.trunk_tests_builds_dir))[1]
                                              if not name[0] == '.']
            found_build_names = []
            for build_name in self.requested_build_names:
                build = PALMBuild(self.test_dir, build_name, verbose=self.verbose, dry_run=self.dry_run)
                configuration_failed, message = build.configure()
                if not configuration_failed:
                    self.build_database[build_name] = build
                    found_build_names.append(build_name)
                    logger.to_all(LogFormatter.config_table_line_template.format('Build', build_name, 'approved'))
                    logger.to_all(message + '\n')
                else:
                    logger.to_all(LogFormatter.config_table_line_template.format('Build', build_name, 'rejected'))
                    logger.to_all(message + '\n')
            final_build_list = found_build_names
            if 'all' in self.test_case_names:
                self.test_case_names = sorted([name for name in next(os.walk(Environment.trunk_tests_cases_dir))[1]
                                               if not name[0] == '.'])
            additional_initial_runs_2 = [self.test_case_names]
            while len(additional_initial_runs_2[-1]) > 0:
                additional_initial_runs_1 = []
                for test_case_name in additional_initial_runs_2[-1]:
                    test_case = PALMTestCase(self.test_dir, test_case_name, verbose=self.verbose)
                    test_case_configuration_failed, message = test_case.configure(final_build_list, final_cores_list)
                    if not test_case_configuration_failed:
                        if test_case.requires_binary_files:
                            additional_initial_runs_1.append(test_case.use_binary_files_from)
                additional_initial_runs_2.append(sorted(set(additional_initial_runs_1)))
            test_case_order = []
            for i in range(len(additional_initial_runs_2) - 1):
                # low and high refer to priority
                low = additional_initial_runs_2[i]
                high = additional_initial_runs_2[i + 1]
                for item in high:
                    while item in low:
                        low.remove(item)
                test_case_order.append(low)
            test_case_order_no_duplicates = []
            for test_cases in test_case_order:
                seen = set()
                seen_add = seen.add
                test_case_order_no_duplicates.append(
                    [x for x in test_cases if not (x in seen or seen_add(x))]
                )
            approved_test_case_order = [[]] + list(reversed(test_case_order_no_duplicates))
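
            # Dependency resolution sketch: if case 'B' declares
            # use_binary_files_from: 'A' (hypothetical names), the loop above
            # collects 'A' as an additional initial run, and the reversed
            # order schedules 'A' before 'B'. The approval pass below then
            # disables any case whose dependency was itself rejected.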
            for i, test_cases in enumerate(list(approved_test_case_order)):
                info = 'Case (dep)' if i < len(approved_test_case_order) - 1 else 'Case'
                for test_case_name in list(test_cases):
                    sys.stdout.flush()
                    test_case = PALMTestCase(self.test_dir, test_case_name, verbose=self.verbose)
                    test_case_configuration_failed, message = test_case.configure(final_build_list, final_cores_list)
                    if test_case_configuration_failed:
                        # removing as configuration failed should only apply to added dependencies
                        approved_test_case_order[i].remove(test_case_name)
                        logger.to_all(LogFormatter.config_table_line_template.format(info, test_case_name, 'rejected'))
                        logger.to_all(message + '\n')
                    elif test_case.requires_binary_files:
                        if test_case.use_binary_files_from not in approved_test_case_order[i - 1]:
                            # removing as dependency is already removed
                            approved_test_case_order[i].remove(test_case_name)
                            logger.to_all(LogFormatter.config_table_line_template.format(info, test_case_name, 'disabled'))
                            logger.to_all(colored('[requires dependency \"' +
                                                  test_case.use_binary_files_from + '\"]', 'red') + '\n')
                        else:
                            logger.to_all(LogFormatter.config_table_line_template.format(info, test_case_name, 'approved'))
                            logger.to_all(message + '\n')
                    else:
                        logger.to_all(LogFormatter.config_table_line_template.format(info, test_case_name, 'approved'))
                        logger.to_all(message + '\n')
            final_case_list = []
            for cases in approved_test_case_order:
                for case in cases:
                    if case not in final_case_list:
                        final_case_list.append(case)
            for build_name in final_build_list:
                build = PALMBuild(
                    self.test_dir,
                    build_name,
                    verbose=self.verbose,
                    dry_run=self.dry_run,
                )
                configuration_failed, message = build.configure()
                if not configuration_failed:
                    self.build_database[build_name] = build
                else:
                    logger.to_all(message + '\n')
            for case_name in final_case_list:
                test_case = PALMTestCase(
                    self.test_dir,
                    case_name,
                    verbose=self.verbose,
                    dry_run=self.dry_run,
                )
                test_case_configuration_failed, message = test_case.configure(final_build_list, final_cores_list)
                if not test_case_configuration_failed:
                    self.test_case_queue.append(test_case)
            logger.to_all(LogFormatter.hline)
            logger.to_all(LogFormatter.intro_table_line_template.format('Test ID:') + self.test_id + '\n')
            logger.to_all(LogFormatter.intro_table_line_template.format('Builds:') +
                          str('\n' + LogFormatter.intro_table_line_template.format('')).join(
                              sorted(self.build_database.keys())) + '\n')
            logger.to_all(LogFormatter.intro_table_line_template.format('Cases:') +
                          str('\n' + LogFormatter.intro_table_line_template.format('')).join(
                              [c.name for c in self.test_case_queue]) + '\n')
            logger.to_all(LogFormatter.intro_table_line_template.format('Cores:') +
                          ' '.join([str(i) for i in final_cores_list]) + '\n')

    def _execute(self, test_case, build_name, cores):
        job = PALMJob(
            self.test_dir,
            test_case,
            build_name,
            cores,
            verbose=self.verbose,
            dry_run=self.dry_run,
        )
        if self.force_debug:
            # --force-debug: only ever run the debug variant
            build_failed_non_debug = True
            job_failed_non_debug = True
            build_failed_debug = self.build_database[build_name].build(debug=True)
            if build_failed_debug:
                job_failed_debug = True
            else:
                job_failed_debug = job.execute(debug=True)
        elif self.no_auto_debug:
            # --no-auto-debug: never fall back to the debug variant
            build_failed_non_debug = self.build_database[build_name].build(debug=False)
            if build_failed_non_debug:
                job_failed_non_debug = True
            else:
                job_failed_non_debug = job.execute(debug=False)
            build_failed_debug = None
            job_failed_debug = None
        else:
            # default: run non-debug first and retry with the debug build on failure
            build_failed_non_debug = self.build_database[build_name].build(debug=False)
            if build_failed_non_debug:
                job_failed_non_debug = True
                build_failed_debug = self.build_database[build_name].build(debug=True)
                if build_failed_debug:
                    job_failed_debug = True
                else:
                    job_failed_debug = job.execute(debug=True)
            else:
                job_failed_non_debug = job.execute(debug=False)
                if job_failed_non_debug:
                    build_failed_debug = self.build_database[build_name].build(debug=True)
                    if build_failed_debug:
                        job_failed_debug = True
                    else:
                        job_failed_debug = job.execute(debug=True)
                else:
                    build_failed_debug = None
                    job_failed_debug = None
        return dict(
            build_failed_non_debug=build_failed_non_debug,
            job_failed_non_debug=job_failed_non_debug,
            build_failed_debug=build_failed_debug,
            job_failed_debug=job_failed_debug,
        )
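
    # The dict returned by _execute() feeds self.test_report and report(). For
    # a test that passed without needing the debug build it looks like:
    #   dict(build_failed_non_debug=False, job_failed_non_debug=False,
    #        build_failed_debug=None, job_failed_debug=None)
    # where None means the debug variant was never attempted.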
    def execute(self):
        with Logger(self.test_dir, verbose=self.verbose) as logger:
            logger.to_all(LogFormatter.hline)
            logger.to_all(LogFormatter.task_table_line_template.format('Task:', 'Case:', 'Build:', 'Cores:') + 'Status:\n')
            logger.to_all(LogFormatter.hline)
            self.test_report = dict()
            for test_case in self.test_case_queue:
                logger.to_log(LogFormatter.hline)
                logger.to_file(LogFormatter.hline)
                logger.to_file(LogFormatter.hline)
                status_dict = dict()
                for build_name in test_case.build_names:
                    status_dict[build_name] = dict()
                    for cores in test_case.number_of_cores:
                        status_dict[build_name][cores] = self._execute(test_case, build_name, cores)
                self.test_report[test_case.name] = status_dict
                logger.to_log(LogFormatter.hline)
                logger.to_file('\n' * 10)

    def report(self):
        with Logger(self.test_dir, verbose=self.verbose) as logger:
            logger.to_all(LogFormatter.hline)
            r = '{:10}' + ' total: ' + '{:<3d}' + \
                ' ok: ' + colored('{:<3d}', 'green') + \
                ' debugged: ' + colored('{:<3d}', 'yellow') + \
                ' failed: ' + colored('{:<3d}', 'red')
            n_all = 0
            n_ok = 0
            n_debugged = 0
            n_failed = 0
            for build_name, build in self.build_database.items():
                status = build.report()
                b = status['failed_non_debug']
                bd = status['failed_debug']
                n_all += 1
                if not b and b is not None:
                    n_ok += 1
                if bd is not None:
                    n_debugged += 1
                if b and (bd or bd is None):
                    n_failed += 1
            logger.to_all(r.format('Builds:', n_all, n_ok, n_debugged, n_failed) + '\n')
            total_failed = n_failed
            total_debugged = n_debugged
            n_all = 0
            n_ok = 0
            n_debugged = 0
            n_failed = 0
            # self.test_report structure, e.g.:
            # {'case_name': {'build_name': {4: {'build_failed_debug': None,
            #                                   'build_failed_non_debug': False,
            #                                   'job_failed_debug': None,
            #                                   'job_failed_non_debug': False}}}}
            for case_name, case in self.test_report.items():
                for build_name, build in case.items():
                    for cores, results in build.items():
                        n_all += 1
                        b = results['build_failed_non_debug']
                        bd = results['build_failed_debug']
                        j = results['job_failed_non_debug']
                        jd = results['job_failed_debug']
                        if not j:
                            n_ok += 1
                        if jd is not None:
                            n_debugged += 1
                        if j and (jd or jd is None):
                            n_failed += 1
            logger.to_all(r.format('Tests:', n_all, n_ok, n_debugged, n_failed) + '\n')
            total_failed += n_failed
            total_debugged += n_debugged
        if self.fail_on_debug:
            return (total_failed + total_debugged) > 0
        else:
            return total_failed > 0


class CustomCompleter:

    def __init__(self):
        pass

    def __call__(self, prefix, parsed_args, **kwargs):
        return (i for i in self.get_items() if i.startswith(prefix))

    def get_items(self):
        return []


class CaseCompleter(CustomCompleter):

    def get_items(self):
        case_names = [name for name in next(os.walk(Environment.trunk_tests_cases_dir))[1] if not name[0] == '.']
        return case_names + ['all']


class BuildCompleter(CustomCompleter):

    def get_items(self):
        build_names = [name for name in next(os.walk(Environment.trunk_tests_builds_dir))[1] if not name[0] == '.']
        return build_names + ['all']
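
# Typical invocations (illustrative; available case and build names depend on
# the checkout):
#   palmtest --cases example_case --builds default --cores 2 4
#   palmtest --dry-run --verbose
# The exit status is 1 if any test failed (with --fail-on-debug also if any
# test required debugging), otherwise 0.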
class PALMTestArgumentParser(ArgumentParser):

    def __init__(self):
        super().__init__(
            description='This is the PALM tester\n' +
                        'Developer Support: knoop@muk.uni-hannover.de',
            formatter_class=RawTextHelpFormatter,
            add_help=True,
        )
        self.add_argument(
            '--version',
            action='version',
            version=version,
        )
        self.add_argument(
            '--verbose',
            action='store_true',
            dest='verbose',
            help='Increase verbosity of terminal output.',
            required=False,
        )
        self.add_argument(
            '--no-auto-debug',
            action='store_true',
            dest='no_auto_debug',
            help='Disable automatic debugging in case of test failure.',
            required=False,
        )
        self.add_argument(
            '--force-debug',
            action='store_true',
            dest='force_debug',
            help='Force debugging regardless of test failure (ignores --no-auto-debug).',
            required=False,
        )
        self.add_argument(
            '--fail-on-debug',
            action='store_true',
            dest='fail_on_debug',
            help='Return a non-zero exit status in case debugging was required.',
            required=False,
        )
        self.add_argument(
            '--dry-run',
            action='store_true',
            dest='dry_run',
            help='Prepare and process all requested tests without actually building or executing PALM.',
            required=False,
        )
        self.add_argument(
            '--no-color',
            action='store_true',
            dest='no_color',
            help='Disable colored terminal output.',
            required=False,
        )
        self.add_argument(
            '--cases',
            action='store',
            dest='cases',
            default=['all'],
            help='A list of test cases to be executed. (default: %(default)s)',
            nargs='+',
            required=False,
            type=str,
            metavar='STR',
        ).completer = CaseCompleter()
        self.add_argument(
            '--builds',
            action='store',
            dest='builds',
            default=['all'],
            help='A list of builds to be executed. (default: %(default)s)',
            nargs='+',
            required=False,
            type=str,
            metavar='STR',
        ).completer = BuildCompleter()
        self.add_argument(
            '--cores',
            action='store',
            dest='cores',
            default=[i for i in range(1, multiprocessing.cpu_count() + 1)],
            choices=[i for i in range(1, multiprocessing.cpu_count() + 1)],
            help='The number of cores tests are supposed to be executed on. (default: %(default)s)',
            nargs='+',
            required=False,
            type=int,
            metavar='INT',
        )
        self.add_argument(
            '--test-id',
            action='store',
            dest='test_id',
            default=datetime.now().strftime('%Y-%m-%d_%H:%M:%S.%f'),
            help='An individual test id. (default: current timestamp)',
            required=False,
            type=str,
            metavar='STR',
        )


if __name__ == '__main__':
    parser = PALMTestArgumentParser()
    if has_argcomplete:
        argcomplete.autocomplete(parser)
    args = parser.parse_args()
    palm_test = PALMTest(args)
    palm_test.prepare()
    palm_test.execute()
    failed = palm_test.report()
    exit(1 if failed else 0)