mirror of
https://github.com/karma-riuk/crab.git
synced 2025-07-04 21:28:12 +02:00
401 lines
14 KiB
Python
401 lines
14 KiB
Python
from abc import ABC, abstractmethod
|
|
import os, re, docker, signal, sys
|
|
from bs4 import BeautifulSoup
|
|
from typing import Optional
|
|
|
|
USER_ID = os.getuid() # for container user
|
|
GROUP_ID = os.getgid()
|
|
|
|
class BuildHandler(ABC):
|
|
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
|
|
super().__init__()
|
|
self.path: str = repo_path
|
|
# self.container: Optional[Container] = None
|
|
self.build_file: str = build_file
|
|
self.updates = updates
|
|
|
|
def set_client(self, client: docker.DockerClient):
|
|
self.client = client
|
|
|
|
def __enter__(self):
|
|
self.container = self.client.containers.run(
|
|
image=self.container_name(),
|
|
command="tail -f /dev/null", # to keep the container alive
|
|
volumes={os.path.abspath(self.path): {"bind": "/repo", "mode": "rw"}},
|
|
user=f"{USER_ID}:{GROUP_ID}",
|
|
detach=True,
|
|
tty=True
|
|
)
|
|
|
|
def __exit__(self, *args):
|
|
self.container.kill()
|
|
self.container.remove()
|
|
|
|
|
|
def check_for_tests(self) -> None:
|
|
with open(os.path.join(self.path, self.build_file), "r") as f:
|
|
content = f.read()
|
|
|
|
for library in ["junit", "testng", "mockito"]:
|
|
if library in content:
|
|
self.updates["detected_source_of_tests"] = library + " library in build file"
|
|
return
|
|
|
|
for keyword in ["testImplementation", "functionalTests", "bwc_tests_enabled"]:
|
|
if keyword in content:
|
|
self.updates["detected_source_of_tests"] = keyword + " keyword in build file"
|
|
return
|
|
|
|
test_dirs = [
|
|
"src/test/java",
|
|
"src/test/kotlin",
|
|
"src/test/groovy",
|
|
"test",
|
|
]
|
|
for td in test_dirs:
|
|
if os.path.exists(os.path.join(self.path, td)):
|
|
self.updates["detected_source_of_tests"] = td + " dir exists in repo"
|
|
|
|
raise NoTestsFoundError("No tests found")
|
|
|
|
def compile_repo(self) -> None:
|
|
def timeout_handler(signum, frame):
|
|
raise TimeoutError("Tests exceeded time limit")
|
|
|
|
signal.signal(signal.SIGALRM, timeout_handler)
|
|
signal.alarm(3600) # Set timeout to 1 hour (3600 seconds)
|
|
|
|
try:
|
|
exec_result = self.container.exec_run(self.compile_cmd())
|
|
output = clean_output(exec_result.output)
|
|
if exec_result.exit_code != 0:
|
|
raise FailedToCompileError(output)
|
|
except TimeoutError:
|
|
self.updates["compiled_successfully"] = False
|
|
self.updates["error_msg"] = "Compile process killed due to exceeding the 1-hour time limit"
|
|
finally:
|
|
signal.alarm(0) # Cancel the alarm
|
|
|
|
def test_repo(self) -> None:
|
|
def timeout_handler(signum, frame):
|
|
raise TimeoutError("Tests exceeded time limit")
|
|
|
|
signal.signal(signal.SIGALRM, timeout_handler)
|
|
signal.alarm(3600) # Set timeout to 1 hour (3600 seconds)
|
|
|
|
try:
|
|
exec_result = self.container.exec_run(self.test_cmd())
|
|
output = clean_output(exec_result.output)
|
|
if exec_result.exit_code != 0:
|
|
raise FailedToTestError(output)
|
|
|
|
self.extract_test_numbers(output)
|
|
|
|
except TimeoutError:
|
|
self.updates["tested_successfully"] = False
|
|
self.updates["error_msg"] = "Test process killed due to exceeding the 1-hour time limit"
|
|
return
|
|
|
|
finally:
|
|
signal.alarm(0) # Cancel the alarm
|
|
|
|
def generate_coverage_report(self):
|
|
result = self.container.exec_run(self.generate_coverage_report_cmd())
|
|
if result.exit_code != 0:
|
|
raise CantExecJacoco(clean_output(result.output))
|
|
|
|
def check_coverage(self, filename: str) -> None:
|
|
"""
|
|
Check if the given filename is covered by JaCoCo.
|
|
"""
|
|
coverage_report_path = self.get_jacoco_report_path()
|
|
if not os.path.exists(coverage_report_path):
|
|
raise NoCoverageReportFound(f"Coverage report file '{coverage_report_path}' does not exist")
|
|
|
|
with open(coverage_report_path, "r") as file:
|
|
soup = BeautifulSoup(file, "html.parser")
|
|
|
|
# Extract all files listed in the JaCoCo coverage report
|
|
covered_files = [a.text.strip() for a in soup.select("table tbody tr td a")]
|
|
|
|
if filename not in covered_files:
|
|
raise FileNotCovered(f"The file '{filename}' was not present in the report '{coverage_report_path}'")
|
|
|
|
def clean_repo(self) -> None:
|
|
self.container.exec_run(self.clean_cmd())
|
|
|
|
@abstractmethod
|
|
def get_type(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def compile_cmd(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def test_cmd(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def extract_test_numbers(self, output: str) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def clean_cmd(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def generate_coverage_report_cmd(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_jacoco_report_paths(self) -> Generator[str]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def container_name(self) -> str:
|
|
pass
|
|
|
|
class MavenHandler(BuildHandler):
|
|
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
|
|
super().__init__(repo_path, build_file, updates)
|
|
self.base_cmd = "mvn -B -Dstyle.color=never -Dartifact.download.skip=true"
|
|
# -B (Batch Mode): Runs Maven in non-interactive mode, reducing output and removing download progress bars.
|
|
# -Dstyle.color=never: Disables ANSI colors.
|
|
# -Dartifact.download.skip=true: Prevents Maven from printing download logs (but still downloads dependencies when needed).
|
|
|
|
def get_type(self) -> str:
|
|
return "maven"
|
|
|
|
def compile_cmd(self) -> str:
|
|
return f"{self.base_cmd} clean compile"
|
|
|
|
def test_cmd(self) -> str:
|
|
return f"{self.base_cmd} test"
|
|
|
|
def clean_cmd(self) -> str:
|
|
return f"{self.base_cmd} clean"
|
|
|
|
def generate_coverage_report_cmd(self):
|
|
return f"{self.base_cmd} jacoco:report"
|
|
|
|
def container_name(self) -> str:
|
|
return "crab-maven"
|
|
|
|
def extract_test_numbers(self, output: str) -> None:
|
|
pattern = r"\[INFO\] Results:\n\[INFO\]\s*\n\[INFO\] Tests run: (\d+), Failures: (\d+), Errors: (\d+), Skipped: (\d+)"
|
|
|
|
matches = re.findall(pattern, output)
|
|
|
|
self.updates["n_tests"] = 0
|
|
self.updates["n_tests_passed"] = 0 # Passed tests = Tests run - (Failures + Errors)
|
|
self.updates["n_tests_failed"] = 0
|
|
self.updates["n_tests_errors"] = 0
|
|
self.updates["n_tests_skipped"] = 0
|
|
|
|
if len(matches) == 0:
|
|
raise NoTestResultsToExtractError("No test results found in Maven output:\n" + output)
|
|
|
|
for match in matches:
|
|
tests_run, failures, errors, skipped = map(int, match)
|
|
self.updates["n_tests"] += tests_run
|
|
self.updates["n_tests_failed"] += failures
|
|
self.updates["n_tests_errors"] += errors
|
|
self.updates["n_tests_skipped"] += skipped
|
|
self.updates["n_tests_passed"] += (tests_run - (failures + errors)) # Calculate passed tests
|
|
|
|
def get_jacoco_report_paths(self) -> Generator[str]:
|
|
yield os.path.join(self.path, "target/site/jacoco-aggregate/jacoco.xml")
|
|
|
|
class GradleHandler(BuildHandler):
|
|
def __init__(self, repo_path: str, build_file: str, updates: dict) -> None:
|
|
super().__init__(repo_path, build_file, updates)
|
|
self.base_cmd = "gradle --no-daemon --console=plain"
|
|
|
|
def get_type(self) -> str:
|
|
return "gradle"
|
|
|
|
def compile_cmd(self) -> str:
|
|
return f"{self.base_cmd} compileJava"
|
|
|
|
def test_cmd(self) -> str:
|
|
return f"{self.base_cmd} test"
|
|
|
|
def clean_cmd(self) -> str:
|
|
return f"{self.base_cmd} clean"
|
|
|
|
def generate_coverage_report_cmd(self) -> str:
|
|
return f"{self.base_cmd} jacocoTestReport"
|
|
|
|
def container_name(self) -> str:
|
|
return "crab-gradle"
|
|
|
|
def extract_test_numbers(self, output: str) -> None:
|
|
self.updates["n_tests"] = -1
|
|
self.updates["n_tests_passed"] = -1
|
|
self.updates["n_tests_failed"] = -1
|
|
self.updates["n_tests_errors"] = -1
|
|
self.updates["n_tests_skipped"] = -1
|
|
|
|
test_results_path = os.path.join(self.path, "build/reports/tests/test/index.html")
|
|
if not os.path.exists(test_results_path):
|
|
raise NoTestResultsToExtractError("No test results found (prolly a repo with sub-projects)")
|
|
|
|
# Load the HTML file
|
|
with open(test_results_path, "r") as file:
|
|
soup = BeautifulSoup(file, "html.parser")
|
|
|
|
# test_div = soup.select_one("div", class_="infoBox", id="tests")
|
|
test_div = soup.select_one("div.infoBox#tests")
|
|
if test_div is None:
|
|
raise NoTestResultsToExtractError("No test results found (no div.infoBox#tests)")
|
|
|
|
# counter_div = test_div.find("div", class_="counter")
|
|
counter_div = test_div.select_one("div.counter")
|
|
if counter_div is None:
|
|
raise NoTestResultsToExtractError("No test results found (not div.counter for tests)")
|
|
|
|
self.updates["n_tests"] = int(counter_div.text.strip())
|
|
|
|
# failures_div = soup.find("div", class_="infoBox", id="failures")
|
|
failures_div = soup.select_one("div.infoBox#failures")
|
|
if failures_div is None:
|
|
raise NoTestResultsToExtractError("No test results found (no div.infoBox#failures)")
|
|
|
|
# counter_div = failures_div.find("div", class_="counter")
|
|
counter_div = failures_div.select_one("div.counter")
|
|
if counter_div is None:
|
|
raise NoTestResultsToExtractError("No test results found (not div.counter for failures)")
|
|
|
|
self.updates["n_tests_failed"] = int(counter_div.text.strip())
|
|
|
|
# Calculate passed tests
|
|
self.updates["n_tests_passed"] = self.updates["n_tests"] - self.updates["n_tests_failed"]
|
|
|
|
def get_jacoco_report_paths(self) -> Generator[str]:
|
|
raise GradleAggregateReportNotFound("Gradle does not generate a single coverage report file")
|
|
|
|
class NoTestsFoundError(Exception):
|
|
pass
|
|
|
|
class FailedToCompileError(Exception):
|
|
pass
|
|
|
|
class FailedToTestError(Exception):
|
|
pass
|
|
|
|
class NoTestResultsToExtractError(Exception):
|
|
pass
|
|
|
|
class CantExecJacoco(Exception):
|
|
pass
|
|
|
|
class NoCoverageReportFound(Exception):
|
|
pass
|
|
|
|
class FileNotCovered(Exception):
|
|
pass
|
|
|
|
def merge_download_lines(lines: list) -> list:
|
|
"""
|
|
Merges lines that are part of the same download block in Maven output.
|
|
|
|
Args:
|
|
lines (list): The lines to merge.
|
|
|
|
Returns:
|
|
list: The merged lines.
|
|
"""
|
|
downloading_block = False
|
|
cleaned_lines = []
|
|
for line in lines:
|
|
if re.match(r"\[INFO\] Download(ing|ed) from", line):
|
|
if not downloading_block:
|
|
cleaned_lines.append("[CRAB] Downloading stuff")
|
|
downloading_block = True
|
|
else:
|
|
cleaned_lines.append(line)
|
|
downloading_block = False
|
|
return cleaned_lines
|
|
|
|
def merge_unapproved_licences(lines: list) -> list:
|
|
"""
|
|
Merges lines that are part of the same unapproved licences block in Maven output.
|
|
|
|
Args:
|
|
lines (list): The lines to merge.
|
|
|
|
Returns:
|
|
list: The merged lines.
|
|
"""
|
|
licenses_block = False
|
|
cleaned_lines = []
|
|
for line in lines:
|
|
if re.match(r"\[WARNING\] Files with unapproved licenses:", line):
|
|
cleaned_lines.append(line)
|
|
cleaned_lines.append("[CRAB] List of all the unapproved licenses...")
|
|
licenses_block = True
|
|
elif licenses_block and not re.match(r"\s+\?\/\.m2\/repository", line):
|
|
licenses_block = False
|
|
|
|
if not licenses_block:
|
|
cleaned_lines.append(line)
|
|
return cleaned_lines
|
|
|
|
def clean_output(output: bytes) -> str:
|
|
output_lines = output.decode().split("\n")
|
|
|
|
cleaned_lines = merge_download_lines(output_lines)
|
|
cleaned_lines = merge_unapproved_licences(cleaned_lines)
|
|
|
|
return "\n".join(cleaned_lines)
|
|
|
|
def get_build_handler(root: str, repo: str, updates: dict, verbose: bool = False) -> Optional[BuildHandler]:
|
|
"""
|
|
Get the path to the build file of a repository. The build file is either a
|
|
`pom.xml`, `build.gradle`, or `build.xml` file.
|
|
|
|
Args:
|
|
root (str): The root directory in which the repository is located.
|
|
repo (str): The name of the repository.
|
|
|
|
Returns:
|
|
str | None: The path to the repository if it is valid, `None` otherwise
|
|
"""
|
|
path = os.path.join(root, repo)
|
|
# Check if the given path is a directory
|
|
if not os.path.isdir(path):
|
|
error_msg = f"The path {path} is not a valid directory."
|
|
print(error_msg, file=sys.stderr)
|
|
updates["error_msg"] = error_msg
|
|
return None
|
|
|
|
to_keep = ["pom.xml", "build.gradle"]
|
|
for entry in os.scandir(path):
|
|
if entry.is_file() and entry.name in to_keep:
|
|
if verbose: print(f"Found {entry.name} in {repo} root, so keeping it and returning")
|
|
updates["depth_of_build_file"] = 0
|
|
if entry.name == "build.gradle":
|
|
updates["build_system"] = "gradle"
|
|
return GradleHandler(path, entry.name, updates)
|
|
else:
|
|
updates["build_system"] = "maven"
|
|
return MavenHandler(path, entry.name, updates)
|
|
|
|
# List files in the immediate subdirectories
|
|
for entry in os.scandir(path):
|
|
if entry.is_dir():
|
|
for sub_entry in os.scandir(entry.path):
|
|
if sub_entry.is_file() and sub_entry.name in to_keep:
|
|
if verbose: print(f"Found {sub_entry.name} in {repo} first level, so keeping it and returning")
|
|
updates["depth_of_build_file"] = 1
|
|
if entry.name == "build.gradle":
|
|
updates["build_system"] = "gradle"
|
|
return GradleHandler(path, os.path.join(entry.name, sub_entry.name), updates)
|
|
else:
|
|
updates["build_system"] = "maven"
|
|
return MavenHandler(path, os.path.join(entry.name, sub_entry.name), updates)
|
|
|
|
updates["error_msg"] = "No build file found"
|
|
return None
|