diff --git a/script_umdp3_checker/umdp3_checker_rules.py b/script_umdp3_checker/umdp3_checker_rules.py index c18270d..b02db53 100644 --- a/script_umdp3_checker/umdp3_checker_rules.py +++ b/script_umdp3_checker/umdp3_checker_rules.py @@ -344,11 +344,11 @@ def lowercase_variable_names(self, lines: List[str]) -> TestResult: "", clean_line, ) - if re.search(r"[A-Z]{2,}", clean_line): - self.add_extra_error("UPPERCASE variable name") + if match := re.search(r"([A-Z]{2,})", clean_line): + self.add_extra_error(f"UPPERCASE variable name : {match[1]}") failures += 1 error_log = self.add_error_log( - error_log, "UPPERCASE variable name", count + 1 + error_log, f"UPPERCASE variable name {match[1]}", count + 1 ) output = f"Checked {count + 1} lines, found {failures} failures." diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index 0e91783..d1ec788 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -111,8 +111,11 @@ def get_branch_name(self) -> str: return self.bdiff_obj.branch -class StyleChecker(ABC): - """Abstract base class for style checkers.""" +class StyleChecker: + """A Class intended to coordinate the running of a set of style checks on + a set of files. + The checks are a dictionary of named callable routines. + The files are a list of file paths to check.""" """ TODO: This is where it might be good to set up a threadsafe @@ -127,87 +130,18 @@ class instance to hold the 'expanded' check outputs. a TestResult object directly, which includes the extra error info, so that each thread can work independently.""" name: str - file_extensions: Set[str] check_functions: Dict[str, Callable] files_to_check: List[Path] def __init__( self, name: str, - file_extensions: Set[str], check_functions: Dict[str, Callable], - changed_files: List[Path] = [], - ): - self.name = name - self.file_extensions = file_extensions or set() - self.check_functions = check_functions or {} - self.files_to_check = ( - self.filter_files(changed_files, self.file_extensions) - if changed_files - else [] - ) - - @abstractmethod - def get_name(self) -> str: - """Return the name of this checker.""" - pass - - @abstractmethod - def check(self, file_path: Path) -> CheckResult: - """Run the style checker on a file.""" - pass - - @classmethod - def from_full_list( - cls, - name: str, - file_extensions: Set[str], - check_functions: Dict[str, Callable], - all_files: List[Path], - ) -> "StyleChecker": - """Create a StyleChecker instance filtering files from a full list.""" - filtered_files = cls.filter_files(all_files, file_extensions) - return cls(name, file_extensions, check_functions, filtered_files) - - @staticmethod - def filter_files( - files: List[Path], file_extensions: Set[str] = set() - ) -> List[Path]: - """Filter files based on the checker's file extensions.""" - if not file_extensions: - return files - return [f for f in files if f.suffix in file_extensions] - - -class UMDP3_checker(StyleChecker): - """UMDP3 built-in style checker.""" - - files_to_check: List[Path] - - def __init__( - self, - name: str, - file_extensions: Set[str], - check_functions: Dict[str, Callable], - changed_files: List[Path] = [], - print_volume: int = 3, + changed_files: List[Path], ): self.name = name - self.file_extensions = file_extensions or set() self.check_functions = check_functions or {} - self.files_to_check = ( - super().filter_files(changed_files, self.file_extensions) - if changed_files - else [] - ) - if print_volume >= 5: - print( - f"UMDP3_checker initialized :\n" - f" Name : {self.name}\n" - f" Has {len(self.check_functions)} check functions\n" - f" Using {len(self.file_extensions)} file extensions\n" - f" Gives {len(self.files_to_check)} files to check." - ) + self.files_to_check = changed_files or [] def get_name(self) -> str: return self.name @@ -227,89 +161,116 @@ def check(self, file_path: Path) -> CheckResult: test_results=file_results, ) - -class ExternalChecker(StyleChecker): - """Wrapper for external style checking tools.""" - - """ - TODO : This is overriding the 'syle type hint from the base class. - As we're currently passing in a list of strings to pass to 'subcommand'. - Ideally we should be making callable functions for each check, but that - would require more refactoring of the code. - Is that a 'factory' method?""" - check_commands: Dict[str, List[str]] - - def __init__( - self, + @classmethod + def from_full_list( + cls, name: str, - file_extensions: Set[str], - check_functions: Dict[str, List[str]], - changed_files: List[Path], + file_extensions: Set[str] = set(), + check_functions: Dict[str, Callable] = {}, + changed_files: List[Path] = [], print_volume: int = 3, ): - self.name = name - self.file_extensions = file_extensions or set() - self.check_commands = check_functions or {} - self.files_to_check = ( - super().filter_files(changed_files, self.file_extensions) - if changed_files - else [] + files_to_check = ( + cls.filter_files(changed_files, file_extensions) if changed_files else [] ) if print_volume >= 5: print( f"ExternalChecker initialized :\n" - f" Name : {self.name}\n" - f" Has {len(self.check_commands)} check commands\n" - f" Using {len(self.file_extensions)} file extensions\n" - f" Gives {len(self.files_to_check)} files to check." + f" Name : {name}\n" + f" Has {len(check_functions)} check commands\n" + f" Using {len(file_extensions)} file extensions\n" + f" Gives {len(files_to_check)} files to check." ) + return cls(name, check_functions, files_to_check) - def get_name(self) -> str: - return self.name + @staticmethod + def filter_files( + files: List[Path], file_extensions: Set[str] = set() + ) -> List[Path]: + """Filter files based on the checker's file extensions.""" + if not file_extensions: + return files + return [f for f in files if f.suffix in file_extensions] - def check(self, file_path: Path) -> CheckResult: - """Run external checker commands on file.""" - file_results = [] - tests_failed = 0 - for test_name, command in self.check_commands.items(): + @classmethod + def create_external_runners( + cls, + name: str, + commands: List[List[str]], + all_files: List[Path], + file_extensions: Set[str] = set(), + ) -> "StyleChecker": + """Create a StyleChecker instance filtering files from a full list.""" + filtered_files = cls.filter_files(all_files, file_extensions) + print( + f"Creating external runners for {name} with {len(commands)} " + f"commands and {len(filtered_files)} files to check from a " + f"total of {len(all_files)} files." + ) + check_functions = {} + for command in commands: + external_opname = f"External_operation_{command[0]}" + free_runner = cls.create_free_runner(command, external_opname) + check_functions[external_opname] = free_runner + return cls(name, check_functions, filtered_files) + + @staticmethod + def create_free_runner( + command: List[str], external_opname: str + ) -> Callable[[Path], TestResult]: + """Method to create a free runner function for a given external + command with it's checker name for output.""" + + def new_free_runner(file_name: Path) -> TestResult: + cmd = command + [str(file_name)] + tests_failed = 0 try: - cmd = command + [str(file_path)] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) except subprocess.TimeoutExpired: - file_results.append( - TestResult( - checker_name=test_name, - failure_count=1, - passed=False, - output=f"Checker {test_name} timed out", - errors={test_name: "TimeoutExpired"}, - ) - ) + failure_count = 1 + passed = False + output = f"Checker {external_opname} timed out" + errors = {external_opname: "TimeoutExpired"} tests_failed += 1 except Exception as e: - file_results.append( - TestResult( - checker_name=test_name, - failure_count=1, - passed=False, - output=str(e), - errors={test_name: str(e)}, - ) - ) + failure_count = 1 + passed = False + output = str(e) + errors = {external_opname: str(e)} tests_failed += 1 else: error_text = result.stderr if result.stderr else "" - file_results.append( - TestResult( - checker_name=test_name, - failure_count=0 if result.returncode == 0 else 1, - passed=result.returncode == 0, - output=result.stdout, - errors={test_name: error_text} if error_text else {}, - ) - ) + failure_count = 0 if result.returncode == 0 else 1 + passed = result.returncode == 0 + output = result.stdout + if error_text: + errors = {external_opname: error_text} + else: + errors = {} if result.returncode != 0: tests_failed += 1 + return TestResult( + checker_name=external_opname, + failure_count=failure_count, + passed=passed, + output=output, + errors=errors, + ) + + return new_free_runner + + +class Check_Runner(StyleChecker): + """A subclass of StyleChecker that is used to run external commands on a + file. As such the check method needs to be overridden to to run on the file + rather than load the file and pass the lines to the check function.""" + + def check(self, file_path: Path) -> CheckResult: + """Run external command on file.""" + file_results = [] # list of TestResult objects + for check_name, check_function in self.check_functions.items(): + file_results.append(check_function(file_path)) + tests_failed = sum([0 if result.passed else 1 for result in file_results]) return CheckResult( file_path=str(file_path), tests_failed=tests_failed, @@ -347,17 +308,22 @@ def check_files(self) -> None: """ """ TODO : Poor terminology makes discerning what is actually happening - here hard work. A 'checker' is an instance of a StyleChecker - (sub)class. Each of which has a list of checks to perform and a list of - files to perform them on. e.g. A UMDP3_checker for Fortran files. or - the ExternalChecker for Python files. + here hard work. A 'checker' is an instance of the StyleChecker + class. Each of which has a list of checks to perform and a list of + files to perform them on. However, when the 'results' are collected, each result is for a single file+check pair and holds no information about which 'checker' it was part of. Thus some files can be checked by multiple checkers, and the filename will appear multiple times in the output. Not Good! """ results = [] - # print(f"About to use {len(self.checkers)} checkers") + + """TODO : The current implementation creates a thread for each + (checker, file), however a chat with Ollie suggests a better approach + would be to switch to multiprocessing and create a pool of workers, and + then have each worker run all the checks for a given file. This would + reduce the overhead of creating threads and allow for better use of + resources.""" with concurrent.futures.ThreadPoolExecutor( max_workers=self.max_workers ) as executor: @@ -369,7 +335,6 @@ def check_files(self) -> None: for future in concurrent.futures.as_completed(future_to_task): result = future.result() - # print(f"Completed check for file: {result}") results.append(result) self.results = results return @@ -571,30 +536,34 @@ def create_style_checkers( if print_volume >= 3: print("Configuring Fortran checkers:") combined_checkers = fortran_diff_table | fortran_file_table | generic_file_table - fortran_file_checker = UMDP3_checker.from_full_list( + fortran_file_checker = StyleChecker.from_full_list( "Fortran Checker", file_extensions, combined_checkers, changed_files ) checkers.append(fortran_file_checker) if "Python" in file_types: - if print_volume >= 3: - print("Configuring External Python checkers:") + print("Configuring External Linters for Python files.") file_extensions = {".py"} - python_checkers = { - # "flake 8": ["flake8", "-q"], - # "black": ["black", "--check"], - # "pylint": ["pylint", "-E"], - "ruff": ["ruff", "check"], - } - python_file_checker = ExternalChecker( - "External Python Checkers", file_extensions, python_checkers, changed_files + external_commands = [ + ["ruff", "check"], + """TODO : The following need 'tweaking' to replicate what's run as + part of the CI on GitHub.""", + # ["flake8", "-q"], + # ["black", "--check"], + # ["pylint", "-E"], + ] + python_file_checker = Check_Runner.create_external_runners( + "Python External Checkers", + external_commands, + changed_files, + file_extensions, ) checkers.append(python_file_checker) if "Generic" in file_types or file_types == []: if print_volume >= 3: print("Configuring Generic File Checkers:") all_file_dispatch_table = dispatch_tables.get_file_dispatch_table_all() - generic_checker = UMDP3_checker( - "Generic File Checker", set(), all_file_dispatch_table, changed_files + generic_checker = StyleChecker( + "Generic File Checker", all_file_dispatch_table, changed_files ) checkers.append(generic_checker)