Skip to content
6 changes: 3 additions & 3 deletions script_umdp3_checker/umdp3_checker_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,11 @@ def lowercase_variable_names(self, lines: List[str]) -> TestResult:
"",
clean_line,
)
if re.search(r"[A-Z]{2,}", clean_line):
self.add_extra_error("UPPERCASE variable name")
if match := re.search(r"([A-Z]{2,})", clean_line):
self.add_extra_error(f"UPPERCASE variable name : {match[1]}")
failures += 1
error_log = self.add_error_log(
error_log, "UPPERCASE variable name", count + 1
error_log, f"UPPERCASE variable name {match[1]}", count + 1
)

output = f"Checked {count + 1} lines, found {failures} failures."
Expand Down
279 changes: 124 additions & 155 deletions script_umdp3_checker/umdp3_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ def get_branch_name(self) -> str:
return self.bdiff_obj.branch


class StyleChecker(ABC):
"""Abstract base class for style checkers."""
class StyleChecker:
"""A Class intended to coordinate the running of a set of style checks on
a set of files.
The checks are a dictionary of named callable routines.
The files are a list of file paths to check."""

"""
TODO: This is where it might be good to set up a threadsafe
Expand All @@ -127,87 +130,18 @@ class instance to hold the 'expanded' check outputs.
a TestResult object directly, which includes the extra error
info, so that each thread can work independently."""
name: str
file_extensions: Set[str]
check_functions: Dict[str, Callable]
files_to_check: List[Path]

def __init__(
self,
name: str,
file_extensions: Set[str],
check_functions: Dict[str, Callable],
changed_files: List[Path] = [],
):
self.name = name
self.file_extensions = file_extensions or set()
self.check_functions = check_functions or {}
self.files_to_check = (
self.filter_files(changed_files, self.file_extensions)
if changed_files
else []
)

@abstractmethod
def get_name(self) -> str:
"""Return the name of this checker."""
pass

@abstractmethod
def check(self, file_path: Path) -> CheckResult:
"""Run the style checker on a file."""
pass

@classmethod
def from_full_list(
cls,
name: str,
file_extensions: Set[str],
check_functions: Dict[str, Callable],
all_files: List[Path],
) -> "StyleChecker":
"""Create a StyleChecker instance filtering files from a full list."""
filtered_files = cls.filter_files(all_files, file_extensions)
return cls(name, file_extensions, check_functions, filtered_files)

@staticmethod
def filter_files(
files: List[Path], file_extensions: Set[str] = set()
) -> List[Path]:
"""Filter files based on the checker's file extensions."""
if not file_extensions:
return files
return [f for f in files if f.suffix in file_extensions]


class UMDP3_checker(StyleChecker):
"""UMDP3 built-in style checker."""

files_to_check: List[Path]

def __init__(
self,
name: str,
file_extensions: Set[str],
check_functions: Dict[str, Callable],
changed_files: List[Path] = [],
print_volume: int = 3,
changed_files: List[Path],
):
self.name = name
self.file_extensions = file_extensions or set()
self.check_functions = check_functions or {}
self.files_to_check = (
super().filter_files(changed_files, self.file_extensions)
if changed_files
else []
)
if print_volume >= 5:
print(
f"UMDP3_checker initialized :\n"
f" Name : {self.name}\n"
f" Has {len(self.check_functions)} check functions\n"
f" Using {len(self.file_extensions)} file extensions\n"
f" Gives {len(self.files_to_check)} files to check."
)
self.files_to_check = changed_files or []

def get_name(self) -> str:
return self.name
Expand All @@ -227,89 +161,116 @@ def check(self, file_path: Path) -> CheckResult:
test_results=file_results,
)


class ExternalChecker(StyleChecker):
"""Wrapper for external style checking tools."""

"""
TODO : This is overriding the 'syle type hint from the base class.
As we're currently passing in a list of strings to pass to 'subcommand'.
Ideally we should be making callable functions for each check, but that
would require more refactoring of the code.
Is that a 'factory' method?"""
check_commands: Dict[str, List[str]]

def __init__(
self,
@classmethod
def from_full_list(
cls,
name: str,
file_extensions: Set[str],
check_functions: Dict[str, List[str]],
changed_files: List[Path],
file_extensions: Set[str] = set(),
check_functions: Dict[str, Callable] = {},
changed_files: List[Path] = [],
print_volume: int = 3,
):
self.name = name
self.file_extensions = file_extensions or set()
self.check_commands = check_functions or {}
self.files_to_check = (
super().filter_files(changed_files, self.file_extensions)
if changed_files
else []
files_to_check = (
cls.filter_files(changed_files, file_extensions) if changed_files else []
)
if print_volume >= 5:
print(
f"ExternalChecker initialized :\n"
f" Name : {self.name}\n"
f" Has {len(self.check_commands)} check commands\n"
f" Using {len(self.file_extensions)} file extensions\n"
f" Gives {len(self.files_to_check)} files to check."
f" Name : {name}\n"
f" Has {len(check_functions)} check commands\n"
f" Using {len(file_extensions)} file extensions\n"
f" Gives {len(files_to_check)} files to check."
)
return cls(name, check_functions, files_to_check)

def get_name(self) -> str:
return self.name
@staticmethod
def filter_files(
files: List[Path], file_extensions: Set[str] = set()
) -> List[Path]:
"""Filter files based on the checker's file extensions."""
if not file_extensions:
return files
return [f for f in files if f.suffix in file_extensions]

def check(self, file_path: Path) -> CheckResult:
"""Run external checker commands on file."""
file_results = []
tests_failed = 0
for test_name, command in self.check_commands.items():
@classmethod
def create_external_runners(
cls,
name: str,
commands: List[List[str]],
all_files: List[Path],
file_extensions: Set[str] = set(),
) -> "StyleChecker":
"""Create a StyleChecker instance filtering files from a full list."""
filtered_files = cls.filter_files(all_files, file_extensions)
print(
f"Creating external runners for {name} with {len(commands)} "
f"commands and {len(filtered_files)} files to check from a "
f"total of {len(all_files)} files."
)
check_functions = {}
for command in commands:
external_opname = f"External_operation_{command[0]}"
free_runner = cls.create_free_runner(command, external_opname)
check_functions[external_opname] = free_runner
return cls(name, check_functions, filtered_files)

@staticmethod
def create_free_runner(
command: List[str], external_opname: str
) -> Callable[[Path], TestResult]:
"""Method to create a free runner function for a given external
command with it's checker name for output."""

def new_free_runner(file_name: Path) -> TestResult:
cmd = command + [str(file_name)]
tests_failed = 0
try:
cmd = command + [str(file_path)]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except subprocess.TimeoutExpired:
file_results.append(
TestResult(
checker_name=test_name,
failure_count=1,
passed=False,
output=f"Checker {test_name} timed out",
errors={test_name: "TimeoutExpired"},
)
)
failure_count = 1
passed = False
output = f"Checker {external_opname} timed out"
errors = {external_opname: "TimeoutExpired"}
tests_failed += 1
except Exception as e:
file_results.append(
TestResult(
checker_name=test_name,
failure_count=1,
passed=False,
output=str(e),
errors={test_name: str(e)},
)
)
failure_count = 1
passed = False
output = str(e)
errors = {external_opname: str(e)}
tests_failed += 1
else:
error_text = result.stderr if result.stderr else ""
file_results.append(
TestResult(
checker_name=test_name,
failure_count=0 if result.returncode == 0 else 1,
passed=result.returncode == 0,
output=result.stdout,
errors={test_name: error_text} if error_text else {},
)
)
failure_count = 0 if result.returncode == 0 else 1
passed = result.returncode == 0
output = result.stdout
if error_text:
errors = {external_opname: error_text}
else:
errors = {}
if result.returncode != 0:
tests_failed += 1
return TestResult(
checker_name=external_opname,
failure_count=failure_count,
passed=passed,
output=output,
errors=errors,
)

return new_free_runner


class Check_Runner(StyleChecker):
"""A subclass of StyleChecker that is used to run external commands on a
file. As such the check method needs to be overridden to to run on the file
rather than load the file and pass the lines to the check function."""

def check(self, file_path: Path) -> CheckResult:
"""Run external command on file."""
file_results = [] # list of TestResult objects
for check_name, check_function in self.check_functions.items():
file_results.append(check_function(file_path))
tests_failed = sum([0 if result.passed else 1 for result in file_results])
return CheckResult(
file_path=str(file_path),
tests_failed=tests_failed,
Expand Down Expand Up @@ -347,17 +308,22 @@ def check_files(self) -> None:
"""
"""
TODO : Poor terminology makes discerning what is actually happening
here hard work. A 'checker' is an instance of a StyleChecker
(sub)class. Each of which has a list of checks to perform and a list of
files to perform them on. e.g. A UMDP3_checker for Fortran files. or
the ExternalChecker for Python files.
here hard work. A 'checker' is an instance of the StyleChecker
class. Each of which has a list of checks to perform and a list of
files to perform them on.
However, when the 'results' are collected, each result is for a single
file+check pair and holds no information about which 'checker' it was
part of. Thus some files can be checked by multiple checkers, and the
filename will appear multiple times in the output. Not Good!
"""
results = []
# print(f"About to use {len(self.checkers)} checkers")

"""TODO : The current implementation creates a thread for each
(checker, file), however a chat with Ollie suggests a better approach
would be to switch to multiprocessing and create a pool of workers, and
then have each worker run all the checks for a given file. This would
reduce the overhead of creating threads and allow for better use of
resources."""
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
Expand All @@ -369,7 +335,6 @@ def check_files(self) -> None:

for future in concurrent.futures.as_completed(future_to_task):
result = future.result()
# print(f"Completed check for file: {result}")
results.append(result)
self.results = results
return
Expand Down Expand Up @@ -571,30 +536,34 @@ def create_style_checkers(
if print_volume >= 3:
print("Configuring Fortran checkers:")
combined_checkers = fortran_diff_table | fortran_file_table | generic_file_table
fortran_file_checker = UMDP3_checker.from_full_list(
fortran_file_checker = StyleChecker.from_full_list(
"Fortran Checker", file_extensions, combined_checkers, changed_files
)
checkers.append(fortran_file_checker)
if "Python" in file_types:
if print_volume >= 3:
print("Configuring External Python checkers:")
print("Configuring External Linters for Python files.")
file_extensions = {".py"}
python_checkers = {
# "flake 8": ["flake8", "-q"],
# "black": ["black", "--check"],
# "pylint": ["pylint", "-E"],
"ruff": ["ruff", "check"],
}
python_file_checker = ExternalChecker(
"External Python Checkers", file_extensions, python_checkers, changed_files
external_commands = [
["ruff", "check"],
"""TODO : The following need 'tweaking' to replicate what's run as
part of the CI on GitHub.""",
# ["flake8", "-q"],
# ["black", "--check"],
# ["pylint", "-E"],
]
python_file_checker = Check_Runner.create_external_runners(
"Python External Checkers",
external_commands,
changed_files,
file_extensions,
)
checkers.append(python_file_checker)
if "Generic" in file_types or file_types == []:
if print_volume >= 3:
print("Configuring Generic File Checkers:")
all_file_dispatch_table = dispatch_tables.get_file_dispatch_table_all()
generic_checker = UMDP3_checker(
"Generic File Checker", set(), all_file_dispatch_table, changed_files
generic_checker = StyleChecker(
"Generic File Checker", all_file_dispatch_table, changed_files
)
checkers.append(generic_checker)

Expand Down