Files
crab/utils.py
2025-05-20 16:40:26 +02:00

203 lines
6.2 KiB
Python

import argparse
import os, enum, logging, subprocess
from datetime import datetime
from typing import Optional, Any, Sequence, Type, Union
from github.Commit import Commit
from github.PaginatedList import PaginatedList
from github.PullRequestComment import PullRequestComment
from tqdm import tqdm
from errors import CantCloneRepoError
def move_logger_to_file(logger_name, filename):
github_logger = logging.getLogger(logger_name)
# Remove existing handlers to prevent duplicate logging
for handler in github_logger.handlers[:]:
github_logger.removeHandler(handler)
file_handler = logging.FileHandler(filename) # Log to file
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
github_logger.addHandler(file_handler)
github_logger.propagate = False # Prevent logging to standard output
def parse_date(date: str) -> datetime:
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
def has_only_1_round_of_comments(
commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment]
):
if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0:
return False
commit_dates = [
commit.commit.author.date
for commit in tqdm(
commits, total=commits.totalCount, desc="Extracting date from commits", leave=False
)
]
comment_dates = [
comment.created_at
for comment in tqdm(
comments, total=comments.totalCount, desc="Extracting date from comments", leave=False
)
]
commit_dates.sort()
comment_dates.sort()
first_comment_time = comment_dates[0]
last_comment_time = comment_dates[-1]
n_before = n_after = 0
for commit_time in tqdm(commit_dates, desc="Checking for 1 round of comments", leave=False):
if commit_time < first_comment_time:
n_before += 1
continue
if commit_time > last_comment_time:
n_after += 1
continue
if first_comment_time < commit_time < last_comment_time:
return False
return n_before >= 1 and n_after >= 1
def has_only_1_comment(
commits: PaginatedList[Commit],
comments: PaginatedList[PullRequestComment],
verbose: bool = False,
):
if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0:
if verbose:
print(
f"No comments or commits: {comments.totalCount} comments, {commits.totalCount} commits"
)
return False
if comments.totalCount != 1:
if verbose:
print(f"More than 1 comment: {comments.totalCount} comments")
return False
commit_dates = [commit.commit.author.date for commit in commits]
comment_date = comments[0].created_at
n_before = n_after = 0
for commit_date in commit_dates:
if commit_date < comment_date:
n_before += 1
continue
if commit_date > comment_date:
n_after += 1
continue
if verbose:
print(f"n_before: {n_before}, n_after: {n_after}")
return n_before >= 1 and n_after >= 1
def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool:
"""
Checks if the repository is cloned locally and if its remote URL matches the expected GitHub repository URL.
Parameters:
repos_dir (str): The directory where repositories are stored.
repo_name (str): The name of the repository.
Returns:
bool: True if the repository is already cloned, False otherwise.
"""
path = os.path.join(repos_dir, repo_name)
if not os.path.exists(path) or not os.path.isdir(path):
return False
try:
result = subprocess.run(
["git", "-C", path, "remote", "-v"], capture_output=True, text=True, check=True
)
remote_urls = result.stdout.splitlines()
expected_url = f"https://github.com/{repo_name}"
return any(expected_url in url for url in remote_urls)
except subprocess.CalledProcessError:
return False
def clone(repo: str, dest: str, force: bool = False) -> None:
"""
Clone a GitHub repository to the specified destination directory.
Parameters:
repo (str): The name of the repository to clone (e.g., "user/repo").
dest (str): The destination directory where the repository will be cloned.
force (bool): If True, force clone even if the repository already exists.
Raises:
CantCloneRepoError: If the repository cannot be cloned.
"""
local_repo_path = os.path.join(dest, repo)
if not force and is_already_repo_cloned(dest, repo):
return
try:
subprocess.run(
["git", "clone", "--depth", "1", f"https://github.com/{repo}", local_repo_path],
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError as e:
raise CantCloneRepoError(e.stderr)
def run_git_cmd(cmd: list[str], repo_path: str) -> subprocess.CompletedProcess:
return subprocess.run(
["git", "-C", repo_path] + cmd,
check=True,
capture_output=True,
text=True,
)
def prompt_yes_no(prompt: str, *, default: Optional[bool] = None) -> bool:
choices = "y/n"
if default is not None:
choices = "Y/n" if default else "y/N"
while True:
ans = input(f"{prompt} [{choices}]: ").strip().lower()
if ans in {"y", "yes"}:
return True
elif ans in {"n", "no"}:
return False
elif default is not None:
return default
else:
print("Please enter 'y' or 'n'.")
class EnumChoicesAction(argparse.Action):
def __init__(self, *args, type: Type[enum.Enum], **kwargs) -> None:
super().__init__(*args, **kwargs, choices=[e.value for e in type])
self.enum = type
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None] = None,
option_string: Optional[str] = None,
) -> None:
if not isinstance(values, str):
raise TypeError
setattr(namespace, self.dest, self.enum(values))