From fa59ff506c9aac9ac444fbe4bccc1db63e15f6ca Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 13:00:39 +0000 Subject: [PATCH 1/7] Yet another weird round of having to repeatedly accept the same changes which git / VS Code are marking as in conflict. Annoyingly, this seems to have introduced a bug as well. --- script_umdp3_checker/umdp3_conformance.py | 320 +++++++++++----------- 1 file changed, 159 insertions(+), 161 deletions(-) diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index ce768ebc..c716ce91 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -103,8 +103,11 @@ def get_branch_name(self) -> str: return self.bdiff_obj.branch -class StyleChecker(ABC): - """Abstract base class for style checkers.""" +class StyleChecker(): + """A Class intended to coordinate the running of a set of style checks on + a set of files. + The checks are a dictionary of named callable routines. + The files are a list of file paths to check.""" """ TODO: This is where it might be good to set up a threadsafe @@ -119,87 +122,21 @@ class instance to hold the 'expanded' check outputs. a TestResult object directly, which includes the extra error info, so that each thread can work independently.""" name: str - file_extensions: Set[str] + # file_extensions: Set[str] check_functions: Dict[str, Callable] files_to_check: List[Path] def __init__( self, name: str, - file_extensions: Set[str], + # file_extensions: Set[str], check_functions: Dict[str, Callable], changed_files: List[Path] = [], ): self.name = name - self.file_extensions = file_extensions or set() + # self.file_extensions = file_extensions or set() self.check_functions = check_functions or {} - self.files_to_check = ( - self.filter_files(changed_files, self.file_extensions) - if changed_files - else [] - ) - - @abstractmethod - def get_name(self) -> str: - """Return the name of this checker.""" - pass - - @abstractmethod - def check(self, file_path: Path) -> CheckResult: - """Run the style checker on a file.""" - pass - - @classmethod - def from_full_list( - cls, - name: str, - file_extensions: Set[str], - check_functions: Dict[str, Callable], - all_files: List[Path], - ) -> "StyleChecker": - """Create a StyleChecker instance filtering files from a full list.""" - filtered_files = cls.filter_files(all_files, file_extensions) - return cls(name, file_extensions, check_functions, filtered_files) - - @staticmethod - def filter_files( - files: List[Path], file_extensions: Set[str] = set() - ) -> List[Path]: - """Filter files based on the checker's file extensions.""" - if not file_extensions: - return files - return [f for f in files if f.suffix in file_extensions] - - -class UMDP3_checker(StyleChecker): - """UMDP3 built-in style checker.""" - - files_to_check: List[Path] - - def __init__( - self, - name: str, - file_extensions: Set[str], - check_functions: Dict[str, Callable], - changed_files: List[Path] = [], - print_volume: int = 3, - ): - self.name = name - self.file_extensions = file_extensions or set() - self.check_functions = check_functions or {} - self.files_to_check = ( - super().filter_files(changed_files, self.file_extensions) - if changed_files - else [] - ) - if print_volume >= 5: - print( - f"UMDP3_checker initialized :\n" - f" Name : {self.name}\n" - f" Has {len(self.check_functions)} check functions\n" - f" Using {len(self.file_extensions)} file extensions\n" - f" Gives {len(self.files_to_check)} files to check." - ) + self.files_to_check = changed_files or [] def get_name(self) -> str: return self.name @@ -211,7 +148,8 @@ def check(self, file_path: Path) -> CheckResult: for check_name, check_function in self.check_functions.items(): file_results.append(check_function(lines)) - tests_failed = sum([0 if result.passed else 1 for result in file_results]) + tests_failed = sum([0 if result.passed else 1 for + result in file_results]) return CheckResult( file_path=str(file_path), tests_failed=tests_failed, @@ -219,89 +157,119 @@ def check(self, file_path: Path) -> CheckResult: test_results=file_results, ) - -class ExternalChecker(StyleChecker): - """Wrapper for external style checking tools.""" - - """ - TODO : This is overriding the 'syle type hint from the base class. - As we're currently passing in a list of strings to pass to 'subcommand'. - Ideally we should be making callable functions for each check, but that - would require more refactoring of the code. - Is that a 'factory' method?""" - check_commands: Dict[str, List[str]] - - def __init__( - self, + @classmethod + def from_full_list( + cls, name: str, file_extensions: Set[str], check_functions: Dict[str, List[str]], changed_files: List[Path], print_volume: int = 3, ): - self.name = name - self.file_extensions = file_extensions or set() - self.check_commands = check_functions or {} - self.files_to_check = ( - super().filter_files(changed_files, self.file_extensions) + cls.name = name + cls.file_extensions = file_extensions or set() + cls.check_commands = check_functions or {} + cls.files_to_check = ( + cls.filter_files(changed_files, cls.file_extensions) if changed_files else [] ) if print_volume >= 5: print( f"ExternalChecker initialized :\n" - f" Name : {self.name}\n" - f" Has {len(self.check_commands)} check commands\n" - f" Using {len(self.file_extensions)} file extensions\n" - f" Gives {len(self.files_to_check)} files to check." + f" Name : {cls.name}\n" + f" Has {len(cls.check_commands)} check commands\n" + f" Using {len(cls.file_extensions)} file extensions\n" + f" Gives {len(cls.files_to_check)} files to check." ) - def get_name(self) -> str: - return self.name + @staticmethod + def filter_files( + files: List[Path], file_extensions: Set[str] = set() + ) -> List[Path]: + """Filter files based on the checker's file extensions.""" + if not file_extensions: + return files + return [f for f in files if f.suffix in file_extensions] - def check(self, file_path: Path) -> CheckResult: - """Run external checker commands on file.""" - file_results = [] - tests_failed = 0 - for test_name, command in self.check_commands.items(): - try: - cmd = command + [str(file_path)] - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - except subprocess.TimeoutExpired: - file_results.append( - TestResult( - checker_name=test_name, - failure_count=1, - passed=False, - output=f"Checker {test_name} timed out", - errors={test_name: "TimeoutExpired"}, - ) - ) - tests_failed += 1 - except Exception as e: - file_results.append( - TestResult( - checker_name=test_name, - failure_count=1, - passed=False, - output=str(e), - errors={test_name: str(e)}, - ) - ) - tests_failed += 1 - else: - error_text = result.stderr if result.stderr else "" - file_results.append( - TestResult( - checker_name=test_name, - failure_count=0 if result.returncode == 0 else 1, - passed=result.returncode == 0, - output=result.stdout, - errors={test_name: error_text} if error_text else {}, - ) - ) - if result.returncode != 0: + @classmethod + def create_external_runners( + cls, + name: str, + commands: List[List[str]], + all_files: List[Path], + file_extensions: Set[str] = set(), + ) -> "StyleChecker": + """Create a StyleChecker instance filtering files from a full list.""" + filtered_files = cls.filter_files(all_files, file_extensions) + print(f"Creating external runners for {name} with {len(commands)} " + f"commands and {len(filtered_files)} files to check from a " + f"total of {len(all_files)} files.") + check_functions = {} + # file_results = [] + for command in commands: + print("Configuring external runner for command: " + f"{' '.join(command)}") + external_opname = f"External_operation_{command[0]}" + print(f"Checker name for this command is : {external_opname}") + + def new_mangle(file_name: Path) -> TestResult: + cmd = command + [str(file_name)] + tests_failed = 0 + try: + result = subprocess.run(cmd, capture_output=True, + text=True, timeout=60) + except subprocess.TimeoutExpired: + failure_count = 1 + passed = False + output = f"Checker {external_opname} timed out" + errors = {external_opname: "TimeoutExpired"} tests_failed += 1 + except Exception as e: + failure_count = 1 + passed = False + output = str(e) + errors = {external_opname: str(e)} + tests_failed += 1 + else: + error_text = result.stderr if result.stderr else "" + failure_count = 0 if result.returncode == 0 else 1 + passed = result.returncode == 0 + output = result.stdout + if error_text: + errors = {external_opname: error_text} + else: + errors = {} + if result.returncode != 0: + tests_failed += 1 + # --- + return (TestResult( + checker_name=external_opname, + failure_count=failure_count, + passed=passed, + output=output, + errors=errors, + )) + + check_functions[external_opname] = new_mangle + return cls(name, check_functions, filtered_files) + + +class Check_Runner(StyleChecker): + """A subclass of StyleChecker that is used to run external commands on a + file. As such the check method needs to be overridden to to run on the file + rather than load the file and pass the lines to the check function.""" + + def check(self, file_path: Path) -> CheckResult: + """Run external command on file.""" + file_results = [] # list of TestResult objects + for check_name, check_function in self.check_functions.items(): + file_results.append(check_function(file_path)) + print(f"Finished running {check_name} on {file_path}. ") + print(f"Self described as: {file_results[-1].checker_name}") + + tests_failed = sum([0 if result.passed else 1 for result in + file_results]) return CheckResult( file_path=str(file_path), tests_failed=tests_failed, @@ -339,17 +307,22 @@ def check_files(self) -> None: """ """ TODO : Poor terminology makes discerning what is actually happening - here hard work. A 'checker' is an instance of a StyleChecker - (sub)class. Each of which has a list of checks to perform and a list of - files to perform them on. e.g. A UMDP3_checker for Fortran files. or - the ExternalChecker for Python files. + here hard work. A 'checker' is an instance of the StyleChecker + class. Each of which has a list of checks to perform and a list of + files to perform them on. However, when the 'results' are collected, each result is for a single file+check pair and holds no information about which 'checker' it was part of. Thus some files can be checked by multiple checkers, and the filename will appear multiple times in the output. Not Good! """ results = [] - # print(f"About to use {len(self.checkers)} checkers") + + """TODO : The current implementation creates a thread for each + (checker, file), however a chat with Ollie suggests a better approach + would be to switch to multiprocessing and create a pool of workers, and + then have each worker run all the checks for a given file. This would + reduce the overhead of creating threads and allow for better use of + resources.""" with concurrent.futures.ThreadPoolExecutor( max_workers=self.max_workers ) as executor: @@ -361,12 +334,12 @@ def check_files(self) -> None: for future in concurrent.futures.as_completed(future_to_task): result = future.result() - # print(f"Completed check for file: {result}") results.append(result) self.results = results return - def print_results(self, print_volume: int = 3, quiet_pass: bool = True) -> bool: + def print_results(self, print_volume: int = 3, + quiet_pass: bool = True) -> bool: """Print results and return True if all checks passed. ========================================================""" """ @@ -400,7 +373,8 @@ def print_results(self, print_volume: int = 3, quiet_pass: bool = True) -> bool: for count, (title, info) in enumerate( test_result.errors.items() ): - print(" " * 8 + f"{count + 1:2} : {title} : {info}") + print(" " * 8 + + f"{count + 1:2} : {title} : {info}") print(" " * 8 + line_2(82)) elif print_volume >= 3: print(f" {test_result.checker_name:60s} : ✓ PASS") @@ -433,7 +407,8 @@ def process_arguments(): "-p", "--path", type=str, default="./", help="path to repository" ) parser.add_argument( - "--max-workers", type=int, default=8, help="Maximum number of parallel workers" + "--max-workers", type=int, default=8, + help="Maximum number of parallel workers" ) parser.add_argument( "--fullcheck", @@ -449,10 +424,12 @@ def process_arguments(): ) group = parser.add_mutually_exclusive_group() group.add_argument( - "-v", "--verbose", action="count", default=0, help="Increase output verbosity" + "-v", "--verbose", action="count", default=0, + help="Increase output verbosity" ) group.add_argument( - "-q", "--quiet", action="count", default=0, help="Decrease output verbosity" + "-q", "--quiet", action="count", default=0, + help="Decrease output verbosity" ) # The following are not yet implemented, but may become useful # branch and base branch could be used to configure the CMS diff @@ -536,18 +513,21 @@ def create_style_checkers( dispatch_tables = CheckerDispatchTables() checkers = [] if "Fortran" in file_types: - file_extensions = {".f", ".for", ".f90", ".f95", ".f03", ".f08", ".F90"} + file_extensions = {".f", ".for", ".f90", ".f95", + ".f03", ".f08", ".F90"} fortran_diff_table = dispatch_tables.get_diff_dispatch_table_fortran() fortran_file_table = dispatch_tables.get_file_dispatch_table_fortran() generic_file_table = dispatch_tables.get_file_dispatch_table_all() if print_volume >= 3: print("Configuring Fortran checkers:") - combined_checkers = fortran_diff_table | fortran_file_table | generic_file_table - fortran_file_checker = UMDP3_checker.from_full_list( - "Fortran Checker", file_extensions, combined_checkers, changed_files + combined_checkers = fortran_diff_table | fortran_file_table | \ + generic_file_table + fortran_file_checker = StyleChecker.from_full_list( + "Fortran Checker", file_extensions, + combined_checkers, changed_files ) checkers.append(fortran_file_checker) - if "Python" in file_types: + """if "Python" in file_types: if print_volume >= 3: print("Configuring External Python checkers:") file_extensions = {".py"} @@ -558,15 +538,31 @@ def create_style_checkers( "ruff": ["ruff", "check"], } python_file_checker = ExternalChecker( - "External Python Checkers", file_extensions, python_checkers, changed_files + "External Python Checkers", file_extensions, python_checkers, + changed_files + ) + checkers.append(python_file_checker) """ + if "Python" in file_types: + print("Setting up External Runners for Python files.") + file_extensions = {".py"} + external_commands = [ + ["ruff", "check"], + ["flake8", "-q"], + ["black", "--check"], + ["pylint", "-E"], + ] + python_file_checker = Check_Runner.create_external_runners( + "Python External Checkers", external_commands, changed_files, + file_extensions ) checkers.append(python_file_checker) if "Generic" in file_types or file_types == []: if print_volume >= 3: print("Configuring Generic File Checkers:") all_file_dispatch_table = dispatch_tables.get_file_dispatch_table_all() - generic_checker = UMDP3_checker( - "Generic File Checker", set(), all_file_dispatch_table, changed_files + generic_checker = StyleChecker( + "Generic File Checker", all_file_dispatch_table, + changed_files ) checkers.append(generic_checker) @@ -646,12 +642,14 @@ def get_files_to_check( print(line_1(81) + "\n") else: print("Results :") - all_passed = checker.print_results(print_volume=log_volume, quiet_pass=quiet_pass) + all_passed = checker.print_results(print_volume=log_volume, + quiet_pass=quiet_pass) if log_volume >= 4: print("\n" + line_1(81)) print("## Summary :" + " " * 67 + "##") print(line_1(81)) print(f"Total files checked: {len(checker.results)}") - print(f"Total files failed: {sum(1 for r in checker.results if not r.all_passed)}") + print(f"Total files failed: {sum(1 for r in checker.results if + not r.all_passed)}") exit(0 if all_passed else 1) From dba0aee5b62ac4b02d3496a099630e50520afbad Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 16:44:36 +0000 Subject: [PATCH 2/7] Fixing issue with external runners not establishing properly. --- script_umdp3_checker/umdp3_conformance.py | 122 ++++++++++------------ 1 file changed, 56 insertions(+), 66 deletions(-) diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index c716ce91..9b29ae86 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -208,52 +208,56 @@ def create_external_runners( check_functions = {} # file_results = [] for command in commands: - print("Configuring external runner for command: " - f"{' '.join(command)}") external_opname = f"External_operation_{command[0]}" - print(f"Checker name for this command is : {external_opname}") - - def new_mangle(file_name: Path) -> TestResult: - cmd = command + [str(file_name)] - tests_failed = 0 - try: - result = subprocess.run(cmd, capture_output=True, - text=True, timeout=60) - except subprocess.TimeoutExpired: - failure_count = 1 - passed = False - output = f"Checker {external_opname} timed out" - errors = {external_opname: "TimeoutExpired"} - tests_failed += 1 - except Exception as e: - failure_count = 1 - passed = False - output = str(e) - errors = {external_opname: str(e)} - tests_failed += 1 - else: - error_text = result.stderr if result.stderr else "" - failure_count = 0 if result.returncode == 0 else 1 - passed = result.returncode == 0 - output = result.stdout - if error_text: - errors = {external_opname: error_text} - else: - errors = {} - if result.returncode != 0: - tests_failed += 1 - # --- - return (TestResult( - checker_name=external_opname, - failure_count=failure_count, - passed=passed, - output=output, - errors=errors, - )) - - check_functions[external_opname] = new_mangle + free_runner = cls.create_free_runner(command, external_opname) + check_functions[external_opname] = free_runner return cls(name, check_functions, filtered_files) + @staticmethod + def create_free_runner(command: List[str], + external_opname: str) -> Callable[[Path], + TestResult]: + """Method to create a free runner function for a given external + command with it's checker name for output.""" + def new_free_runner(file_name: Path) -> TestResult: + cmd = command + [str(file_name)] + tests_failed = 0 + try: + result = subprocess.run(cmd, capture_output=True, + text=True, timeout=60) + except subprocess.TimeoutExpired: + failure_count = 1 + passed = False + output = f"Checker {external_opname} timed out" + errors = {external_opname: "TimeoutExpired"} + tests_failed += 1 + except Exception as e: + failure_count = 1 + passed = False + output = str(e) + errors = {external_opname: str(e)} + tests_failed += 1 + else: + error_text = result.stderr if result.stderr else "" + failure_count = 0 if result.returncode == 0 else 1 + passed = result.returncode == 0 + output = result.stdout + if error_text: + errors = {external_opname: error_text} + else: + errors = {} + if result.returncode != 0: + tests_failed += 1 + # --- + return (TestResult( + checker_name=external_opname, + failure_count=failure_count, + passed=passed, + output=output, + errors=errors, + )) + return new_free_runner + class Check_Runner(StyleChecker): """A subclass of StyleChecker that is used to run external commands on a @@ -265,9 +269,6 @@ def check(self, file_path: Path) -> CheckResult: file_results = [] # list of TestResult objects for check_name, check_function in self.check_functions.items(): file_results.append(check_function(file_path)) - print(f"Finished running {check_name} on {file_path}. ") - print(f"Self described as: {file_results[-1].checker_name}") - tests_failed = sum([0 if result.passed else 1 for result in file_results]) return CheckResult( @@ -524,32 +525,21 @@ def create_style_checkers( generic_file_table fortran_file_checker = StyleChecker.from_full_list( "Fortran Checker", file_extensions, - combined_checkers, changed_files + combined_checkers, changed_files # type: ignore ) + # TODO : The type:ignore above disables something that pylance is + # complaining about in VS Code, (the combined_checkers argument) + # but I can't translate the 'explanation' of what the issue is into + # something meaningful. checkers.append(fortran_file_checker) - """if "Python" in file_types: - if print_volume >= 3: - print("Configuring External Python checkers:") - file_extensions = {".py"} - python_checkers = { - # "flake 8": ["flake8", "-q"], - # "black": ["black", "--check"], - # "pylint": ["pylint", "-E"], - "ruff": ["ruff", "check"], - } - python_file_checker = ExternalChecker( - "External Python Checkers", file_extensions, python_checkers, - changed_files - ) - checkers.append(python_file_checker) """ if "Python" in file_types: - print("Setting up External Runners for Python files.") + print("Configuring External Linters for Python files.") file_extensions = {".py"} external_commands = [ ["ruff", "check"], - ["flake8", "-q"], - ["black", "--check"], - ["pylint", "-E"], + # ["flake8", "-q"], + # ["black", "--check"], + # ["pylint", "-E"], ] python_file_checker = Check_Runner.create_external_runners( "Python External Checkers", external_commands, changed_files, From f257a5141fbc7cf2555d3684219e4987be883749 Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 18:05:26 +0000 Subject: [PATCH 3/7] Fixing an error that seems to have crept in during merging main and resolving clashes. But looking at it, I really can't fathom out 'how' --- script_umdp3_checker/umdp3_conformance.py | 30 +++++++++-------------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index 9b29ae86..3b71efe0 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -131,7 +131,7 @@ def __init__( name: str, # file_extensions: Set[str], check_functions: Dict[str, Callable], - changed_files: List[Path] = [], + changed_files: List[Path], ): self.name = name # self.file_extensions = file_extensions or set() @@ -161,27 +161,25 @@ def check(self, file_path: Path) -> CheckResult: def from_full_list( cls, name: str, - file_extensions: Set[str], - check_functions: Dict[str, List[str]], - changed_files: List[Path], + file_extensions: Set[str] = set(), + check_functions: Dict[str, Callable] = {}, + changed_files: List[Path] = [], print_volume: int = 3, ): - cls.name = name - cls.file_extensions = file_extensions or set() - cls.check_commands = check_functions or {} - cls.files_to_check = ( - cls.filter_files(changed_files, cls.file_extensions) + files_to_check = ( + cls.filter_files(changed_files, file_extensions) if changed_files else [] ) if print_volume >= 5: print( f"ExternalChecker initialized :\n" - f" Name : {cls.name}\n" - f" Has {len(cls.check_commands)} check commands\n" - f" Using {len(cls.file_extensions)} file extensions\n" - f" Gives {len(cls.files_to_check)} files to check." + f" Name : {name}\n" + f" Has {len(check_functions)} check commands\n" + f" Using {len(file_extensions)} file extensions\n" + f" Gives {len(files_to_check)} files to check." ) + return cls(name, check_functions, files_to_check) @staticmethod def filter_files( @@ -525,12 +523,8 @@ def create_style_checkers( generic_file_table fortran_file_checker = StyleChecker.from_full_list( "Fortran Checker", file_extensions, - combined_checkers, changed_files # type: ignore + combined_checkers, changed_files ) - # TODO : The type:ignore above disables something that pylance is - # complaining about in VS Code, (the combined_checkers argument) - # but I can't translate the 'explanation' of what the issue is into - # something meaningful. checkers.append(fortran_file_checker) if "Python" in file_types: print("Configuring External Linters for Python files.") From 77fcbf156eeccfbf1c29deb58021b5b856bcd7ad Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 18:06:37 +0000 Subject: [PATCH 4/7] Where do these line length errors in the linter(s) come from - I haven't edited this file... --- script_umdp3_checker/umdp3_checker_rules.py | 74 ++++++++++++++------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/script_umdp3_checker/umdp3_checker_rules.py b/script_umdp3_checker/umdp3_checker_rules.py index c18270da..2f09f45a 100644 --- a/script_umdp3_checker/umdp3_checker_rules.py +++ b/script_umdp3_checker/umdp3_checker_rules.py @@ -254,7 +254,8 @@ def unseparated_keywords(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) for pattern in [f"\\b{kw}\\b" for kw in unseparated_keywords_list]: if re.search(pattern, clean_line, re.IGNORECASE): - self.add_extra_error(f"unseparated keyword in line: {line.strip()}") + self.add_extra_error(f"unseparated keyword in line: " + f"{line.strip()}") failures += 1 error_log = self.add_error_log( error_log, @@ -279,7 +280,8 @@ def go_to_other_than_9999(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if match := re.search(r"\bGO\s*TO\s+(\d+)", clean_line, re.IGNORECASE): + if match := re.search(r"\bGO\s*TO\s+(\d+)", clean_line, + re.IGNORECASE): label = match.group(1) if label != "9999": self.add_extra_error(f"GO TO {label}") @@ -305,10 +307,12 @@ def write_using_default_format(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\bWRITE\s*\(\s*\*\s*,\s*\*\s*\)", clean_line, re.IGNORECASE): + if re.search(r"\bWRITE\s*\(\s*\*\s*,\s*\*\s*\)", clean_line, + re.IGNORECASE): self.add_extra_error("WRITE(*,*) found") failures += 1 - error_log = self.add_error_log(error_log, "WRITE(*,*) found", count + 1) + error_log = self.add_error_log(error_log, "WRITE(*,*) found", + count + 1) output = f"Checked {count + 1} lines, found {failures} failures." return TestResult( checker_name="WRITE using default format", @@ -344,8 +348,9 @@ def lowercase_variable_names(self, lines: List[str]) -> TestResult: "", clean_line, ) - if re.search(r"[A-Z]{2,}", clean_line): - self.add_extra_error("UPPERCASE variable name") + if match := re.search(r"([A-Z]{2,})", clean_line): + self.add_extra_error("UPPERCASE variable name : " + f"{match[1]}") failures += 1 error_log = self.add_error_log( error_log, "UPPERCASE variable name", count + 1 @@ -418,7 +423,8 @@ def forbidden_keywords(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\b(EQUIVALENCE|PAUSE)\b", clean_line, re.IGNORECASE): + if re.search(r"\b(EQUIVALENCE|PAUSE)\b", clean_line, + re.IGNORECASE): self.add_extra_error("forbidden keyword") failures += 1 error_log = self.add_error_log( @@ -453,7 +459,8 @@ def forbidden_operators(self, lines: List[str]) -> TestResult: ) return TestResult( - checker_name="Use of older form of relational operator " + "(.GT. etc.)", + checker_name="Use of older form of relational operator " + + "(.GT. etc.)", failure_count=failures, passed=(failures == 0), output=f"Checked {count + 1} lines, found {failures} failures.", @@ -469,7 +476,8 @@ def line_over_80chars(self, lines: List[str]) -> TestResult: if len(line.rstrip()) > 80: self.add_extra_error("line too long") failures += 1 - error_log = self.add_error_log(error_log, "line too long", count + 1) + error_log = self.add_error_log(error_log, "line too long", + count + 1) return TestResult( checker_name="Line longer than 80 characters", @@ -533,7 +541,8 @@ def printstar(self, lines: List[str]) -> TestResult: if re.search(r"\bPRINT\s*\*", clean_line, re.IGNORECASE): self.add_extra_error("PRINT * used") failures += 1 - error_log = self.add_error_log(error_log, "PRINT * used", count + 1) + error_log = self.add_error_log(error_log, "PRINT * used", + count + 1) return TestResult( checker_name="Use of PRINT rather than umMessage and umPrint", @@ -555,7 +564,8 @@ def write6(self, lines: List[str]) -> TestResult: if re.search(r"\bWRITE\s*\(\s*6\s*,", clean_line, re.IGNORECASE): self.add_extra_error("WRITE(6) used") failures += 1 - error_log = self.add_error_log(error_log, "WRITE(6) used", count + 1) + error_log = self.add_error_log(error_log, "WRITE(6) used", + count + 1) return TestResult( checker_name="Use of WRITE(6) rather than umMessage and umPrint", @@ -611,10 +621,12 @@ def omp_missing_dollar(self, lines: List[str]) -> TestResult: error_log = {} count = -1 for count, line in enumerate(lines): - if re.search(r"!\s*OMP\b", line) and not re.search(r"!\$OMP", line): + if (re.search(r"!\s*OMP\b", line) and + not re.search(r"!\$OMP", line)): self.add_extra_error("!OMP without $") failures += 1 - error_log = self.add_error_log(error_log, "!OMP without $", count + 1) + error_log = self.add_error_log(error_log, "!OMP without $", + count + 1) return TestResult( checker_name="!OMP without $", @@ -662,7 +674,8 @@ def cpp_comment(self, lines: List[str]) -> TestResult: self.add_extra_error("Fortran comment in CPP directive") failures += 1 error_log = self.add_error_log( - error_log, "Fortran comment in CPP directive", count + 1 + error_log, "Fortran comment in CPP directive", + count + 1 ) return TestResult( @@ -687,7 +700,8 @@ def obsolescent_fortran_intrinsic(self, lines: List[str]) -> TestResult: self.add_extra_error(f"obsolescent intrinsic: {intrinsic}") failures += 1 error_log = self.add_error_log( - error_log, f"obsolescent intrinsic: {intrinsic}", count + 1 + error_log, f"obsolescent intrinsic: {intrinsic}", + count + 1 ) return TestResult( @@ -735,8 +749,10 @@ def intrinsic_modules(self, lines: List[str]) -> TestResult: for module in intrinsic_modules: if re.search( rf"\bUSE\s+(::)*\s*{module}\b", clean_line, re.IGNORECASE - ) and not re.search(r"\bINTRINSIC\b", clean_line, re.IGNORECASE): - self.add_extra_error(f"intrinsic module {module} without INTRINSIC") + ) and not re.search(r"\bINTRINSIC\b", clean_line, + re.IGNORECASE): + self.add_extra_error(f"intrinsic module {module} " + "without INTRINSIC") failures += 1 error_log = self.add_error_log( error_log, @@ -761,7 +777,8 @@ def read_unit_args(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if match := re.search(r"\bREAD\s*\(\s*([^,)]+)", clean_line, re.IGNORECASE): + if match := re.search(r"\bREAD\s*\(\s*([^,)]+)", clean_line, + re.IGNORECASE): first_arg = match.group(1).strip() if not first_arg.upper().startswith("UNIT="): self.add_extra_error("READ without explicit UNIT=") @@ -803,7 +820,8 @@ def retire_if_def(self, lines: List[str]) -> TestResult: self.add_extra_error(f"retired if-def: {match.group(1)}") failures += 1 error_log = self.add_error_log( - error_log, f"retired if-def: {match.group(1)}", count + 1 + error_log, f"retired if-def: {match.group(1)}", + count + 1 ) return TestResult( checker_name="retired if-def", @@ -845,7 +863,8 @@ def forbidden_stop(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\b(STOP|CALL\s+abort)\b", clean_line, re.IGNORECASE): + if re.search(r"\b(STOP|CALL\s+abort)\b", clean_line, + re.IGNORECASE): self.add_extra_error("STOP or CALL abort used") failures += 1 error_log = self.add_error_log( @@ -910,7 +929,8 @@ def check_crown_copyright(self, lines: List[str]) -> TestResult: found_copyright = True if not found_copyright: - self.add_extra_error("missing copyright or crown copyright statement") + self.add_extra_error( + "missing copyright or crown copyright statement") error_log = self.add_error_log( error_log, "missing copyright or crown copyright statement", 0 ) @@ -935,14 +955,16 @@ def check_code_owner(self, lines: List[str]) -> TestResult: file_content = "\n".join(lines) found_code_owner = False error_log = {} - if "Code Owner:" in file_content or "code owner" in file_content.lower(): + if ("Code Owner:" in file_content or + "code owner" in file_content.lower()): # print(f"Debug: Found {file_content.lower()}") found_code_owner = True # This is often a warning rather than an error if not found_code_owner: self.add_extra_error("missing code owner comment") - error_log = self.add_error_log(error_log, "missing code owner comment", 0) + error_log = self.add_error_log(error_log, + "missing code owner comment", 0) return TestResult( checker_name="Code Owner Comment", failure_count=0 if found_code_owner else 1, @@ -1014,7 +1036,8 @@ def c_deprecated(self, lines: List[str]) -> int: for line in lines: for identifier in deprecated_c_identifiers: if re.search(rf"\b{identifier}\b", line): - self.add_extra_error(f"deprecated C identifier: {identifier}") + self.add_extra_error( + f"deprecated C identifier: {identifier}") failures += 1 return failures @@ -1041,7 +1064,8 @@ def c_openmp_define_no_combine(self, lines: List[str]) -> int: ) or re.search( r"&&.*_OPENMP.*&&.*SHUM_USE_C_OPENMP_VIA_THREAD_UTILS", line ): - self.add_extra_error("OpenMP defines combined with third macro") + self.add_extra_error( + "OpenMP defines combined with third macro") failures += 1 return failures From d51bc165ea8cdf8ffc16b0dff754b7510ed72edd Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 18:21:09 +0000 Subject: [PATCH 5/7] quick "improvement" to the error reporting in one of the tests --- script_umdp3_checker/umdp3_checker_rules.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/script_umdp3_checker/umdp3_checker_rules.py b/script_umdp3_checker/umdp3_checker_rules.py index 2f09f45a..210bf27b 100644 --- a/script_umdp3_checker/umdp3_checker_rules.py +++ b/script_umdp3_checker/umdp3_checker_rules.py @@ -353,7 +353,8 @@ def lowercase_variable_names(self, lines: List[str]) -> TestResult: f"{match[1]}") failures += 1 error_log = self.add_error_log( - error_log, "UPPERCASE variable name", count + 1 + error_log, f"UPPERCASE variable name {match[1]}", + count + 1 ) output = f"Checked {count + 1} lines, found {failures} failures." From 5b9c4a85434b6f3d759875bbeaac4176621e83e6 Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 18:36:07 +0000 Subject: [PATCH 6/7] Why am I constantly having to re-impliment fixes/tidying I'm sure I've done before. --- script_umdp3_checker/umdp3_conformance.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index 3b71efe0..d8da66b5 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -11,7 +11,6 @@ # Add custom modules to Python path if needed # Add the repository root to access fcm_bdiff and git_bdiff packages import sys - sys.path.insert(0, str(Path(__file__).parent.parent)) """ @@ -122,19 +121,16 @@ class instance to hold the 'expanded' check outputs. a TestResult object directly, which includes the extra error info, so that each thread can work independently.""" name: str - # file_extensions: Set[str] check_functions: Dict[str, Callable] files_to_check: List[Path] def __init__( self, name: str, - # file_extensions: Set[str], check_functions: Dict[str, Callable], changed_files: List[Path], ): self.name = name - # self.file_extensions = file_extensions or set() self.check_functions = check_functions or {} self.files_to_check = changed_files or [] @@ -204,7 +200,6 @@ def create_external_runners( f"commands and {len(filtered_files)} files to check from a " f"total of {len(all_files)} files.") check_functions = {} - # file_results = [] for command in commands: external_opname = f"External_operation_{command[0]}" free_runner = cls.create_free_runner(command, external_opname) @@ -246,7 +241,6 @@ def new_free_runner(file_name: Path) -> TestResult: errors = {} if result.returncode != 0: tests_failed += 1 - # --- return (TestResult( checker_name=external_opname, failure_count=failure_count, @@ -531,6 +525,8 @@ def create_style_checkers( file_extensions = {".py"} external_commands = [ ["ruff", "check"], + """TODO : The following need 'tweaking' to replicate what's run as + part of the CI on GitHub.""" # ["flake8", "-q"], # ["black", "--check"], # ["pylint", "-E"], From ad4d9a36a4fec1ce1470825f2fb346504babdc22 Mon Sep 17 00:00:00 2001 From: r-sharp Date: Fri, 20 Feb 2026 19:20:50 +0000 Subject: [PATCH 7/7] And now ruff_format wants to change them back again... --- script_umdp3_checker/umdp3_checker_rules.py | 75 +++++++------------ script_umdp3_checker/umdp3_conformance.py | 83 +++++++++------------ 2 files changed, 62 insertions(+), 96 deletions(-) diff --git a/script_umdp3_checker/umdp3_checker_rules.py b/script_umdp3_checker/umdp3_checker_rules.py index 210bf27b..b02db535 100644 --- a/script_umdp3_checker/umdp3_checker_rules.py +++ b/script_umdp3_checker/umdp3_checker_rules.py @@ -254,8 +254,7 @@ def unseparated_keywords(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) for pattern in [f"\\b{kw}\\b" for kw in unseparated_keywords_list]: if re.search(pattern, clean_line, re.IGNORECASE): - self.add_extra_error(f"unseparated keyword in line: " - f"{line.strip()}") + self.add_extra_error(f"unseparated keyword in line: {line.strip()}") failures += 1 error_log = self.add_error_log( error_log, @@ -280,8 +279,7 @@ def go_to_other_than_9999(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if match := re.search(r"\bGO\s*TO\s+(\d+)", clean_line, - re.IGNORECASE): + if match := re.search(r"\bGO\s*TO\s+(\d+)", clean_line, re.IGNORECASE): label = match.group(1) if label != "9999": self.add_extra_error(f"GO TO {label}") @@ -307,12 +305,10 @@ def write_using_default_format(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\bWRITE\s*\(\s*\*\s*,\s*\*\s*\)", clean_line, - re.IGNORECASE): + if re.search(r"\bWRITE\s*\(\s*\*\s*,\s*\*\s*\)", clean_line, re.IGNORECASE): self.add_extra_error("WRITE(*,*) found") failures += 1 - error_log = self.add_error_log(error_log, "WRITE(*,*) found", - count + 1) + error_log = self.add_error_log(error_log, "WRITE(*,*) found", count + 1) output = f"Checked {count + 1} lines, found {failures} failures." return TestResult( checker_name="WRITE using default format", @@ -349,12 +345,10 @@ def lowercase_variable_names(self, lines: List[str]) -> TestResult: clean_line, ) if match := re.search(r"([A-Z]{2,})", clean_line): - self.add_extra_error("UPPERCASE variable name : " - f"{match[1]}") + self.add_extra_error(f"UPPERCASE variable name : {match[1]}") failures += 1 error_log = self.add_error_log( - error_log, f"UPPERCASE variable name {match[1]}", - count + 1 + error_log, f"UPPERCASE variable name {match[1]}", count + 1 ) output = f"Checked {count + 1} lines, found {failures} failures." @@ -424,8 +418,7 @@ def forbidden_keywords(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\b(EQUIVALENCE|PAUSE)\b", clean_line, - re.IGNORECASE): + if re.search(r"\b(EQUIVALENCE|PAUSE)\b", clean_line, re.IGNORECASE): self.add_extra_error("forbidden keyword") failures += 1 error_log = self.add_error_log( @@ -460,8 +453,7 @@ def forbidden_operators(self, lines: List[str]) -> TestResult: ) return TestResult( - checker_name="Use of older form of relational operator " + - "(.GT. etc.)", + checker_name="Use of older form of relational operator " + "(.GT. etc.)", failure_count=failures, passed=(failures == 0), output=f"Checked {count + 1} lines, found {failures} failures.", @@ -477,8 +469,7 @@ def line_over_80chars(self, lines: List[str]) -> TestResult: if len(line.rstrip()) > 80: self.add_extra_error("line too long") failures += 1 - error_log = self.add_error_log(error_log, "line too long", - count + 1) + error_log = self.add_error_log(error_log, "line too long", count + 1) return TestResult( checker_name="Line longer than 80 characters", @@ -542,8 +533,7 @@ def printstar(self, lines: List[str]) -> TestResult: if re.search(r"\bPRINT\s*\*", clean_line, re.IGNORECASE): self.add_extra_error("PRINT * used") failures += 1 - error_log = self.add_error_log(error_log, "PRINT * used", - count + 1) + error_log = self.add_error_log(error_log, "PRINT * used", count + 1) return TestResult( checker_name="Use of PRINT rather than umMessage and umPrint", @@ -565,8 +555,7 @@ def write6(self, lines: List[str]) -> TestResult: if re.search(r"\bWRITE\s*\(\s*6\s*,", clean_line, re.IGNORECASE): self.add_extra_error("WRITE(6) used") failures += 1 - error_log = self.add_error_log(error_log, "WRITE(6) used", - count + 1) + error_log = self.add_error_log(error_log, "WRITE(6) used", count + 1) return TestResult( checker_name="Use of WRITE(6) rather than umMessage and umPrint", @@ -622,12 +611,10 @@ def omp_missing_dollar(self, lines: List[str]) -> TestResult: error_log = {} count = -1 for count, line in enumerate(lines): - if (re.search(r"!\s*OMP\b", line) and - not re.search(r"!\$OMP", line)): + if re.search(r"!\s*OMP\b", line) and not re.search(r"!\$OMP", line): self.add_extra_error("!OMP without $") failures += 1 - error_log = self.add_error_log(error_log, "!OMP without $", - count + 1) + error_log = self.add_error_log(error_log, "!OMP without $", count + 1) return TestResult( checker_name="!OMP without $", @@ -675,8 +662,7 @@ def cpp_comment(self, lines: List[str]) -> TestResult: self.add_extra_error("Fortran comment in CPP directive") failures += 1 error_log = self.add_error_log( - error_log, "Fortran comment in CPP directive", - count + 1 + error_log, "Fortran comment in CPP directive", count + 1 ) return TestResult( @@ -701,8 +687,7 @@ def obsolescent_fortran_intrinsic(self, lines: List[str]) -> TestResult: self.add_extra_error(f"obsolescent intrinsic: {intrinsic}") failures += 1 error_log = self.add_error_log( - error_log, f"obsolescent intrinsic: {intrinsic}", - count + 1 + error_log, f"obsolescent intrinsic: {intrinsic}", count + 1 ) return TestResult( @@ -750,10 +735,8 @@ def intrinsic_modules(self, lines: List[str]) -> TestResult: for module in intrinsic_modules: if re.search( rf"\bUSE\s+(::)*\s*{module}\b", clean_line, re.IGNORECASE - ) and not re.search(r"\bINTRINSIC\b", clean_line, - re.IGNORECASE): - self.add_extra_error(f"intrinsic module {module} " - "without INTRINSIC") + ) and not re.search(r"\bINTRINSIC\b", clean_line, re.IGNORECASE): + self.add_extra_error(f"intrinsic module {module} without INTRINSIC") failures += 1 error_log = self.add_error_log( error_log, @@ -778,8 +761,7 @@ def read_unit_args(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if match := re.search(r"\bREAD\s*\(\s*([^,)]+)", clean_line, - re.IGNORECASE): + if match := re.search(r"\bREAD\s*\(\s*([^,)]+)", clean_line, re.IGNORECASE): first_arg = match.group(1).strip() if not first_arg.upper().startswith("UNIT="): self.add_extra_error("READ without explicit UNIT=") @@ -821,8 +803,7 @@ def retire_if_def(self, lines: List[str]) -> TestResult: self.add_extra_error(f"retired if-def: {match.group(1)}") failures += 1 error_log = self.add_error_log( - error_log, f"retired if-def: {match.group(1)}", - count + 1 + error_log, f"retired if-def: {match.group(1)}", count + 1 ) return TestResult( checker_name="retired if-def", @@ -864,8 +845,7 @@ def forbidden_stop(self, lines: List[str]) -> TestResult: clean_line = self.remove_quoted(line) clean_line = re.sub(r"!.*$", "", clean_line) - if re.search(r"\b(STOP|CALL\s+abort)\b", clean_line, - re.IGNORECASE): + if re.search(r"\b(STOP|CALL\s+abort)\b", clean_line, re.IGNORECASE): self.add_extra_error("STOP or CALL abort used") failures += 1 error_log = self.add_error_log( @@ -930,8 +910,7 @@ def check_crown_copyright(self, lines: List[str]) -> TestResult: found_copyright = True if not found_copyright: - self.add_extra_error( - "missing copyright or crown copyright statement") + self.add_extra_error("missing copyright or crown copyright statement") error_log = self.add_error_log( error_log, "missing copyright or crown copyright statement", 0 ) @@ -956,16 +935,14 @@ def check_code_owner(self, lines: List[str]) -> TestResult: file_content = "\n".join(lines) found_code_owner = False error_log = {} - if ("Code Owner:" in file_content or - "code owner" in file_content.lower()): + if "Code Owner:" in file_content or "code owner" in file_content.lower(): # print(f"Debug: Found {file_content.lower()}") found_code_owner = True # This is often a warning rather than an error if not found_code_owner: self.add_extra_error("missing code owner comment") - error_log = self.add_error_log(error_log, - "missing code owner comment", 0) + error_log = self.add_error_log(error_log, "missing code owner comment", 0) return TestResult( checker_name="Code Owner Comment", failure_count=0 if found_code_owner else 1, @@ -1037,8 +1014,7 @@ def c_deprecated(self, lines: List[str]) -> int: for line in lines: for identifier in deprecated_c_identifiers: if re.search(rf"\b{identifier}\b", line): - self.add_extra_error( - f"deprecated C identifier: {identifier}") + self.add_extra_error(f"deprecated C identifier: {identifier}") failures += 1 return failures @@ -1065,8 +1041,7 @@ def c_openmp_define_no_combine(self, lines: List[str]) -> int: ) or re.search( r"&&.*_OPENMP.*&&.*SHUM_USE_C_OPENMP_VIA_THREAD_UTILS", line ): - self.add_extra_error( - "OpenMP defines combined with third macro") + self.add_extra_error("OpenMP defines combined with third macro") failures += 1 return failures diff --git a/script_umdp3_checker/umdp3_conformance.py b/script_umdp3_checker/umdp3_conformance.py index d8da66b5..b8f16576 100644 --- a/script_umdp3_checker/umdp3_conformance.py +++ b/script_umdp3_checker/umdp3_conformance.py @@ -11,6 +11,7 @@ # Add custom modules to Python path if needed # Add the repository root to access fcm_bdiff and git_bdiff packages import sys + sys.path.insert(0, str(Path(__file__).parent.parent)) """ @@ -102,11 +103,11 @@ def get_branch_name(self) -> str: return self.bdiff_obj.branch -class StyleChecker(): +class StyleChecker: """A Class intended to coordinate the running of a set of style checks on - a set of files. - The checks are a dictionary of named callable routines. - The files are a list of file paths to check.""" + a set of files. + The checks are a dictionary of named callable routines. + The files are a list of file paths to check.""" """ TODO: This is where it might be good to set up a threadsafe @@ -144,8 +145,7 @@ def check(self, file_path: Path) -> CheckResult: for check_name, check_function in self.check_functions.items(): file_results.append(check_function(lines)) - tests_failed = sum([0 if result.passed else 1 for - result in file_results]) + tests_failed = sum([0 if result.passed else 1 for result in file_results]) return CheckResult( file_path=str(file_path), tests_failed=tests_failed, @@ -163,9 +163,7 @@ def from_full_list( print_volume: int = 3, ): files_to_check = ( - cls.filter_files(changed_files, file_extensions) - if changed_files - else [] + cls.filter_files(changed_files, file_extensions) if changed_files else [] ) if print_volume >= 5: print( @@ -196,9 +194,11 @@ def create_external_runners( ) -> "StyleChecker": """Create a StyleChecker instance filtering files from a full list.""" filtered_files = cls.filter_files(all_files, file_extensions) - print(f"Creating external runners for {name} with {len(commands)} " - f"commands and {len(filtered_files)} files to check from a " - f"total of {len(all_files)} files.") + print( + f"Creating external runners for {name} with {len(commands)} " + f"commands and {len(filtered_files)} files to check from a " + f"total of {len(all_files)} files." + ) check_functions = {} for command in commands: external_opname = f"External_operation_{command[0]}" @@ -207,17 +207,17 @@ def create_external_runners( return cls(name, check_functions, filtered_files) @staticmethod - def create_free_runner(command: List[str], - external_opname: str) -> Callable[[Path], - TestResult]: + def create_free_runner( + command: List[str], external_opname: str + ) -> Callable[[Path], TestResult]: """Method to create a free runner function for a given external command with it's checker name for output.""" + def new_free_runner(file_name: Path) -> TestResult: cmd = command + [str(file_name)] tests_failed = 0 try: - result = subprocess.run(cmd, capture_output=True, - text=True, timeout=60) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) except subprocess.TimeoutExpired: failure_count = 1 passed = False @@ -241,13 +241,14 @@ def new_free_runner(file_name: Path) -> TestResult: errors = {} if result.returncode != 0: tests_failed += 1 - return (TestResult( + return TestResult( checker_name=external_opname, failure_count=failure_count, passed=passed, output=output, errors=errors, - )) + ) + return new_free_runner @@ -261,8 +262,7 @@ def check(self, file_path: Path) -> CheckResult: file_results = [] # list of TestResult objects for check_name, check_function in self.check_functions.items(): file_results.append(check_function(file_path)) - tests_failed = sum([0 if result.passed else 1 for result in - file_results]) + tests_failed = sum([0 if result.passed else 1 for result in file_results]) return CheckResult( file_path=str(file_path), tests_failed=tests_failed, @@ -331,8 +331,7 @@ def check_files(self) -> None: self.results = results return - def print_results(self, print_volume: int = 3, - quiet_pass: bool = True) -> bool: + def print_results(self, print_volume: int = 3, quiet_pass: bool = True) -> bool: """Print results and return True if all checks passed. ========================================================""" """ @@ -366,8 +365,7 @@ def print_results(self, print_volume: int = 3, for count, (title, info) in enumerate( test_result.errors.items() ): - print(" " * 8 + - f"{count + 1:2} : {title} : {info}") + print(" " * 8 + f"{count + 1:2} : {title} : {info}") print(" " * 8 + line_2(82)) elif print_volume >= 3: print(f" {test_result.checker_name:60s} : ✓ PASS") @@ -400,8 +398,7 @@ def process_arguments(): "-p", "--path", type=str, default="./", help="path to repository" ) parser.add_argument( - "--max-workers", type=int, default=8, - help="Maximum number of parallel workers" + "--max-workers", type=int, default=8, help="Maximum number of parallel workers" ) parser.add_argument( "--fullcheck", @@ -417,12 +414,10 @@ def process_arguments(): ) group = parser.add_mutually_exclusive_group() group.add_argument( - "-v", "--verbose", action="count", default=0, - help="Increase output verbosity" + "-v", "--verbose", action="count", default=0, help="Increase output verbosity" ) group.add_argument( - "-q", "--quiet", action="count", default=0, - help="Decrease output verbosity" + "-q", "--quiet", action="count", default=0, help="Decrease output verbosity" ) # The following are not yet implemented, but may become useful # branch and base branch could be used to configure the CMS diff @@ -506,18 +501,15 @@ def create_style_checkers( dispatch_tables = CheckerDispatchTables() checkers = [] if "Fortran" in file_types: - file_extensions = {".f", ".for", ".f90", ".f95", - ".f03", ".f08", ".F90"} + file_extensions = {".f", ".for", ".f90", ".f95", ".f03", ".f08", ".F90"} fortran_diff_table = dispatch_tables.get_diff_dispatch_table_fortran() fortran_file_table = dispatch_tables.get_file_dispatch_table_fortran() generic_file_table = dispatch_tables.get_file_dispatch_table_all() if print_volume >= 3: print("Configuring Fortran checkers:") - combined_checkers = fortran_diff_table | fortran_file_table | \ - generic_file_table + combined_checkers = fortran_diff_table | fortran_file_table | generic_file_table fortran_file_checker = StyleChecker.from_full_list( - "Fortran Checker", file_extensions, - combined_checkers, changed_files + "Fortran Checker", file_extensions, combined_checkers, changed_files ) checkers.append(fortran_file_checker) if "Python" in file_types: @@ -526,14 +518,16 @@ def create_style_checkers( external_commands = [ ["ruff", "check"], """TODO : The following need 'tweaking' to replicate what's run as - part of the CI on GitHub.""" + part of the CI on GitHub.""", # ["flake8", "-q"], # ["black", "--check"], # ["pylint", "-E"], ] python_file_checker = Check_Runner.create_external_runners( - "Python External Checkers", external_commands, changed_files, - file_extensions + "Python External Checkers", + external_commands, + changed_files, + file_extensions, ) checkers.append(python_file_checker) if "Generic" in file_types or file_types == []: @@ -541,8 +535,7 @@ def create_style_checkers( print("Configuring Generic File Checkers:") all_file_dispatch_table = dispatch_tables.get_file_dispatch_table_all() generic_checker = StyleChecker( - "Generic File Checker", all_file_dispatch_table, - changed_files + "Generic File Checker", all_file_dispatch_table, changed_files ) checkers.append(generic_checker) @@ -622,14 +615,12 @@ def get_files_to_check( print(line_1(81) + "\n") else: print("Results :") - all_passed = checker.print_results(print_volume=log_volume, - quiet_pass=quiet_pass) + all_passed = checker.print_results(print_volume=log_volume, quiet_pass=quiet_pass) if log_volume >= 4: print("\n" + line_1(81)) print("## Summary :" + " " * 67 + "##") print(line_1(81)) print(f"Total files checked: {len(checker.results)}") - print(f"Total files failed: {sum(1 for r in checker.results if - not r.all_passed)}") + print(f"Total files failed: {sum(1 for r in checker.results if not r.all_passed)}") exit(0 if all_passed else 1)