Formatted utils.py

This commit is contained in:
Karma Riuk
2025-03-31 11:49:36 +02:00
parent 308f58b587
commit 46d8d45d7c

View File

@ -5,6 +5,7 @@ from github.PaginatedList import PaginatedList
from github.PullRequestComment import PullRequestComment from github.PullRequestComment import PullRequestComment
from tqdm import tqdm from tqdm import tqdm
def move_github_logging_to_file(): def move_github_logging_to_file():
github_logger = logging.getLogger("github") github_logger = logging.getLogger("github")
@ -18,25 +19,36 @@ def move_github_logging_to_file():
github_logger.addHandler(file_handler) github_logger.addHandler(file_handler)
github_logger.propagate = False # Prevent logging to standard output github_logger.propagate = False # Prevent logging to standard output
def parse_date(date: str) -> datetime: def parse_date(date: str) -> datetime:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
def has_only_1_round_of_comments(commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment]):
if ( def has_only_1_round_of_comments(
comments is None or commits is None commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment]
or comments.totalCount == 0 or commits.totalCount == 0 ):
): if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0:
return False return False
commit_dates = [commit.commit.author.date for commit in tqdm(commits, total=commits.totalCount, desc="Extracting date from commits", leave=False)] commit_dates = [
comment_dates = [comment.created_at for comment in tqdm(comments, total=comments.totalCount, desc="Extracting date from comments", leave=False)] commit.commit.author.date
for commit in tqdm(
commits, total=commits.totalCount, desc="Extracting date from commits", leave=False
)
]
comment_dates = [
comment.created_at
for comment in tqdm(
comments, total=comments.totalCount, desc="Extracting date from comments", leave=False
)
]
commit_dates.sort() commit_dates.sort()
comment_dates.sort() comment_dates.sort()
first_comment_time = comment_dates[0] first_comment_time = comment_dates[0]
last_comment_time = comment_dates[-1] last_comment_time = comment_dates[-1]
n_before = n_after = 0 n_before = n_after = 0
for commit_time in tqdm(commit_dates, desc="Checking for 1 round of comments", leave=False): for commit_time in tqdm(commit_dates, desc="Checking for 1 round of comments", leave=False):
if commit_time < first_comment_time: if commit_time < first_comment_time:
@ -48,19 +60,25 @@ def has_only_1_round_of_comments(commits: PaginatedList[Commit], comments: Pagin
if first_comment_time < commit_time < last_comment_time: if first_comment_time < commit_time < last_comment_time:
return False return False
return n_before >= 1 and n_after >= 1 return n_before >= 1 and n_after >= 1
def has_only_1_comment(commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment], verbose: bool = False):
if ( def has_only_1_comment(
comments is None or commits is None commits: PaginatedList[Commit],
or comments.totalCount == 0 or commits.totalCount == 0 comments: PaginatedList[PullRequestComment],
): verbose: bool = False,
if verbose: print(f"No comments or commits: {comments.totalCount} comments, {commits.totalCount} commits") ):
if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0:
if verbose:
print(
f"No comments or commits: {comments.totalCount} comments, {commits.totalCount} commits"
)
return False return False
if comments.totalCount != 1: if comments.totalCount != 1:
if verbose: print(f"More than 1 comment: {comments.totalCount} comments") if verbose:
print(f"More than 1 comment: {comments.totalCount} comments")
return False return False
commit_dates = [commit.commit.author.date for commit in commits] commit_dates = [commit.commit.author.date for commit in commits]
@ -75,9 +93,11 @@ def has_only_1_comment(commits: PaginatedList[Commit], comments: PaginatedList[P
if commit_date > comment_date: if commit_date > comment_date:
n_after += 1 n_after += 1
continue continue
if verbose: print(f"n_before: {n_before}, n_after: {n_after}") if verbose:
print(f"n_before: {n_before}, n_after: {n_after}")
return n_before >= 1 and n_after >= 1 return n_before >= 1 and n_after >= 1
def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool: def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool:
""" """
Checks if the repository is cloned locally and if its remote URL matches the expected GitHub repository URL. Checks if the repository is cloned locally and if its remote URL matches the expected GitHub repository URL.
@ -96,10 +116,7 @@ def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool:
try: try:
result = subprocess.run( result = subprocess.run(
["git", "-C", path, "remote", "-v"], ["git", "-C", path, "remote", "-v"], capture_output=True, text=True, check=True
capture_output=True,
text=True,
check=True
) )
remote_urls = result.stdout.splitlines() remote_urls = result.stdout.splitlines()
@ -110,7 +127,10 @@ def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool:
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return False return False
def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose: bool = False) -> bool:
def clone(
repo: str, dest: str, updates: dict = {}, force: bool = False, verbose: bool = False
) -> bool:
""" """
Clones a GitHub repository into a local directory. Clones a GitHub repository into a local directory.
@ -130,11 +150,12 @@ def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose
updates["cloned_successfully"] = "Already exists" updates["cloned_successfully"] = "Already exists"
return True return True
if verbose: print(f"Cloning {repo}") if verbose:
print(f"Cloning {repo}")
proc = subprocess.run( proc = subprocess.run(
["git", "clone", "--depth", "1", f"https://github.com/{repo}", local_repo_path], ["git", "clone", "--depth", "1", f"https://github.com/{repo}", local_repo_path],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE stderr=subprocess.PIPE,
) )
if proc.returncode != 0: if proc.returncode != 0:
updates["cloned_successfully"] = False updates["cloned_successfully"] = False
@ -148,6 +169,7 @@ def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose
updates["cloned_successfully"] = True updates["cloned_successfully"] = True
return True return True
def run_git_cmd(cmd: list[str], repo_path: str) -> subprocess.CompletedProcess: def run_git_cmd(cmd: list[str], repo_path: str) -> subprocess.CompletedProcess:
return subprocess.run( return subprocess.run(
["git", "-C", repo_path] + cmd, ["git", "-C", repo_path] + cmd,