Files
crab/handlers.py
Karma Riuk 211f9dde99 running the aggregate version of jacoco with
maven, hoping to get all the subprojects in one
file instead of having to grab an xml for each
single subproject
2025-03-17 15:49:16 +01:00

401 lines
14 KiB
Python

from abc import ABC, abstractmethod
import os, re, docker, signal, sys
from bs4 import BeautifulSoup
from typing import Optional
USER_ID = os.getuid() # for container user
GROUP_ID = os.getgid()
class BuildHandler(ABC):
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
super().__init__()
self.path: str = repo_path
# self.container: Optional[Container] = None
self.build_file: str = build_file
self.updates = updates
def set_client(self, client: docker.DockerClient):
self.client = client
def __enter__(self):
self.container = self.client.containers.run(
image=self.container_name(),
command="tail -f /dev/null", # to keep the container alive
volumes={os.path.abspath(self.path): {"bind": "/repo", "mode": "rw"}},
user=f"{USER_ID}:{GROUP_ID}",
detach=True,
tty=True
)
def __exit__(self, *args):
self.container.kill()
self.container.remove()
def check_for_tests(self) -> None:
with open(os.path.join(self.path, self.build_file), "r") as f:
content = f.read()
for library in ["junit", "testng", "mockito"]:
if library in content:
self.updates["detected_source_of_tests"] = library + " library in build file"
return
for keyword in ["testImplementation", "functionalTests", "bwc_tests_enabled"]:
if keyword in content:
self.updates["detected_source_of_tests"] = keyword + " keyword in build file"
return
test_dirs = [
"src/test/java",
"src/test/kotlin",
"src/test/groovy",
"test",
]
for td in test_dirs:
if os.path.exists(os.path.join(self.path, td)):
self.updates["detected_source_of_tests"] = td + " dir exists in repo"
raise NoTestsFoundError("No tests found")
def compile_repo(self) -> None:
def timeout_handler(signum, frame):
raise TimeoutError("Tests exceeded time limit")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(3600) # Set timeout to 1 hour (3600 seconds)
try:
exec_result = self.container.exec_run(self.compile_cmd())
output = clean_output(exec_result.output)
if exec_result.exit_code != 0:
raise FailedToCompileError(output)
except TimeoutError:
self.updates["compiled_successfully"] = False
self.updates["error_msg"] = "Compile process killed due to exceeding the 1-hour time limit"
finally:
signal.alarm(0) # Cancel the alarm
def test_repo(self) -> None:
def timeout_handler(signum, frame):
raise TimeoutError("Tests exceeded time limit")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(3600) # Set timeout to 1 hour (3600 seconds)
try:
exec_result = self.container.exec_run(self.test_cmd())
output = clean_output(exec_result.output)
if exec_result.exit_code != 0:
raise FailedToTestError(output)
self.extract_test_numbers(output)
except TimeoutError:
self.updates["tested_successfully"] = False
self.updates["error_msg"] = "Test process killed due to exceeding the 1-hour time limit"
return
finally:
signal.alarm(0) # Cancel the alarm
def generate_coverage_report(self):
result = self.container.exec_run(self.generate_coverage_report_cmd())
if result.exit_code != 0:
raise CantExecJacoco(clean_output(result.output))
def check_coverage(self, filename: str) -> None:
"""
Check if the given filename is covered by JaCoCo.
"""
coverage_report_path = self.get_jacoco_report_path()
if not os.path.exists(coverage_report_path):
raise NoCoverageReportFound(f"Coverage report file '{coverage_report_path}' does not exist")
with open(coverage_report_path, "r") as file:
soup = BeautifulSoup(file, "html.parser")
# Extract all files listed in the JaCoCo coverage report
covered_files = [a.text.strip() for a in soup.select("table tbody tr td a")]
if filename not in covered_files:
raise FileNotCovered(f"The file '{filename}' was not present in the report '{coverage_report_path}'")
def clean_repo(self) -> None:
self.container.exec_run(self.clean_cmd())
@abstractmethod
def get_type(self) -> str:
pass
@abstractmethod
def compile_cmd(self) -> str:
pass
@abstractmethod
def test_cmd(self) -> str:
pass
@abstractmethod
def extract_test_numbers(self, output: str) -> None:
pass
@abstractmethod
def clean_cmd(self) -> str:
pass
@abstractmethod
def generate_coverage_report_cmd(self) -> str:
pass
@abstractmethod
def get_jacoco_report_paths(self) -> Generator[str]:
pass
@abstractmethod
def container_name(self) -> str:
pass
class MavenHandler(BuildHandler):
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
super().__init__(repo_path, build_file, updates)
self.base_cmd = "mvn -B -Dstyle.color=never -Dartifact.download.skip=true"
# -B (Batch Mode): Runs Maven in non-interactive mode, reducing output and removing download progress bars.
# -Dstyle.color=never: Disables ANSI colors.
# -Dartifact.download.skip=true: Prevents Maven from printing download logs (but still downloads dependencies when needed).
def get_type(self) -> str:
return "maven"
def compile_cmd(self) -> str:
return f"{self.base_cmd} clean compile"
def test_cmd(self) -> str:
return f"{self.base_cmd} test"
def clean_cmd(self) -> str:
return f"{self.base_cmd} clean"
def generate_coverage_report_cmd(self):
return f"{self.base_cmd} jacoco:report-aggregate"
def container_name(self) -> str:
return "crab-maven"
def extract_test_numbers(self, output: str) -> None:
pattern = r"\[INFO\] Results:\n\[INFO\]\s*\n\[INFO\] Tests run: (\d+), Failures: (\d+), Errors: (\d+), Skipped: (\d+)"
matches = re.findall(pattern, output)
self.updates["n_tests"] = 0
self.updates["n_tests_passed"] = 0 # Passed tests = Tests run - (Failures + Errors)
self.updates["n_tests_failed"] = 0
self.updates["n_tests_errors"] = 0
self.updates["n_tests_skipped"] = 0
if len(matches) == 0:
raise NoTestResultsToExtractError("No test results found in Maven output:\n" + output)
for match in matches:
tests_run, failures, errors, skipped = map(int, match)
self.updates["n_tests"] += tests_run
self.updates["n_tests_failed"] += failures
self.updates["n_tests_errors"] += errors
self.updates["n_tests_skipped"] += skipped
self.updates["n_tests_passed"] += (tests_run - (failures + errors)) # Calculate passed tests
def get_jacoco_report_paths(self) -> Generator[str]:
yield os.path.join(self.path, "target/site/jacoco-aggregate/jacoco.xml")
class GradleHandler(BuildHandler):
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
super().__init__(repo_path, build_file, updates)
self.base_cmd = "gradle --no-daemon --console=plain"
def get_type(self) -> str:
return "gradle"
def compile_cmd(self) -> str:
return f"{self.base_cmd} compileJava"
def test_cmd(self) -> str:
return f"{self.base_cmd} test"
def clean_cmd(self) -> str:
return f"{self.base_cmd} clean"
def generate_coverage_report_cmd(self) -> str:
return f"{self.base_cmd} jacocoTestReport"
def container_name(self) -> str:
return "crab-gradle"
def extract_test_numbers(self, output: str) -> None:
self.updates["n_tests"] = -1
self.updates["n_tests_passed"] = -1
self.updates["n_tests_failed"] = -1
self.updates["n_tests_errors"] = -1
self.updates["n_tests_skipped"] = -1
test_results_path = os.path.join(self.path, "build/reports/tests/test/index.html")
if not os.path.exists(test_results_path):
raise NoTestResultsToExtractError("No test results found (prolly a repo with sub-projects)")
# Load the HTML file
with open(test_results_path, "r") as file:
soup = BeautifulSoup(file, "html.parser")
# test_div = soup.select_one("div", class_="infoBox", id="tests")
test_div = soup.select_one("div.infoBox#tests")
if test_div is None:
raise NoTestResultsToExtractError("No test results found (no div.infoBox#tests)")
# counter_div = test_div.find("div", class_="counter")
counter_div = test_div.select_one("div.counter")
if counter_div is None:
raise NoTestResultsToExtractError("No test results found (not div.counter for tests)")
self.updates["n_tests"] = int(counter_div.text.strip())
# failures_div = soup.find("div", class_="infoBox", id="failures")
failures_div = soup.select_one("div.infoBox#failures")
if failures_div is None:
raise NoTestResultsToExtractError("No test results found (no div.infoBox#failures)")
# counter_div = failures_div.find("div", class_="counter")
counter_div = failures_div.select_one("div.counter")
if counter_div is None:
raise NoTestResultsToExtractError("No test results found (not div.counter for failures)")
self.updates["n_tests_failed"] = int(counter_div.text.strip())
# Calculate passed tests
self.updates["n_tests_passed"] = self.updates["n_tests"] - self.updates["n_tests_failed"]
def get_jacoco_report_paths(self) -> Generator[str]:
raise GradleAggregateReportNotFound("Gradle does not generate a single coverage report file")
class NoTestsFoundError(Exception):
pass
class FailedToCompileError(Exception):
pass
class FailedToTestError(Exception):
pass
class NoTestResultsToExtractError(Exception):
pass
class CantExecJacoco(Exception):
pass
class NoCoverageReportFound(Exception):
pass
class FileNotCovered(Exception):
pass
def merge_download_lines(lines: list) -> list:
"""
Merges lines that are part of the same download block in Maven output.
Args:
lines (list): The lines to merge.
Returns:
list: The merged lines.
"""
downloading_block = False
cleaned_lines = []
for line in lines:
if re.match(r"\[INFO\] Download(ing|ed) from", line):
if not downloading_block:
cleaned_lines.append("[CRAB] Downloading stuff")
downloading_block = True
else:
cleaned_lines.append(line)
downloading_block = False
return cleaned_lines
def merge_unapproved_licences(lines: list) -> list:
"""
Merges lines that are part of the same unapproved licences block in Maven output.
Args:
lines (list): The lines to merge.
Returns:
list: The merged lines.
"""
licenses_block = False
cleaned_lines = []
for line in lines:
if re.match(r"\[WARNING\] Files with unapproved licenses:", line):
cleaned_lines.append(line)
cleaned_lines.append("[CRAB] List of all the unapproved licenses...")
licenses_block = True
elif licenses_block and not re.match(r"\s+\?\/\.m2\/repository", line):
licenses_block = False
if not licenses_block:
cleaned_lines.append(line)
return cleaned_lines
def clean_output(output: bytes) -> str:
output_lines = output.decode().split("\n")
cleaned_lines = merge_download_lines(output_lines)
cleaned_lines = merge_unapproved_licences(cleaned_lines)
return "\n".join(cleaned_lines)
def get_build_handler(root: str, repo: str, updates: dict, verbose: bool = False) -> Optional[BuildHandler]:
"""
Get the path to the build file of a repository. The build file is either a
`pom.xml`, `build.gradle`, or `build.xml` file.
Args:
root (str): The root directory in which the repository is located.
repo (str): The name of the repository.
Returns:
str | None: The path to the repository if it is valid, `None` otherwise
"""
path = os.path.join(root, repo)
# Check if the given path is a directory
if not os.path.isdir(path):
error_msg = f"The path {path} is not a valid directory."
print(error_msg, file=sys.stderr)
updates["error_msg"] = error_msg
return None
to_keep = ["pom.xml", "build.gradle"]
for entry in os.scandir(path):
if entry.is_file() and entry.name in to_keep:
if verbose: print(f"Found {entry.name} in {repo} root, so keeping it and returning")
updates["depth_of_build_file"] = 0
if entry.name == "build.gradle":
updates["build_system"] = "gradle"
return GradleHandler(path, entry.name, updates)
else:
updates["build_system"] = "maven"
return MavenHandler(path, entry.name, updates)
# List files in the immediate subdirectories
for entry in os.scandir(path):
if entry.is_dir():
for sub_entry in os.scandir(entry.path):
if sub_entry.is_file() and sub_entry.name in to_keep:
if verbose: print(f"Found {sub_entry.name} in {repo} first level, so keeping it and returning")
updates["depth_of_build_file"] = 1
if entry.name == "build.gradle":
updates["build_system"] = "gradle"
return GradleHandler(path, os.path.join(entry.name, sub_entry.name), updates)
else:
updates["build_system"] = "maven"
return MavenHandler(path, os.path.join(entry.name, sub_entry.name), updates)
updates["error_msg"] = "No build file found"
return None