diff --git a/utils.py b/utils.py index 9611de0..54cc7ff 100644 --- a/utils.py +++ b/utils.py @@ -5,6 +5,7 @@ from github.PaginatedList import PaginatedList from github.PullRequestComment import PullRequestComment from tqdm import tqdm + def move_github_logging_to_file(): github_logger = logging.getLogger("github") @@ -18,25 +19,36 @@ def move_github_logging_to_file(): github_logger.addHandler(file_handler) github_logger.propagate = False # Prevent logging to standard output + def parse_date(date: str) -> datetime: return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") -def has_only_1_round_of_comments(commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment]): - if ( - comments is None or commits is None - or comments.totalCount == 0 or commits.totalCount == 0 - ): + +def has_only_1_round_of_comments( + commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment] +): + if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0: return False - - commit_dates = [commit.commit.author.date for commit in tqdm(commits, total=commits.totalCount, desc="Extracting date from commits", leave=False)] - comment_dates = [comment.created_at for comment in tqdm(comments, total=comments.totalCount, desc="Extracting date from comments", leave=False)] - + + commit_dates = [ + commit.commit.author.date + for commit in tqdm( + commits, total=commits.totalCount, desc="Extracting date from commits", leave=False + ) + ] + comment_dates = [ + comment.created_at + for comment in tqdm( + comments, total=comments.totalCount, desc="Extracting date from comments", leave=False + ) + ] + commit_dates.sort() comment_dates.sort() - + first_comment_time = comment_dates[0] last_comment_time = comment_dates[-1] - + n_before = n_after = 0 for commit_time in tqdm(commit_dates, desc="Checking for 1 round of comments", leave=False): if commit_time < first_comment_time: @@ -48,19 +60,25 @@ def has_only_1_round_of_comments(commits: PaginatedList[Commit], comments: Pagin if first_comment_time < commit_time < last_comment_time: return False - + return n_before >= 1 and n_after >= 1 -def has_only_1_comment(commits: PaginatedList[Commit], comments: PaginatedList[PullRequestComment], verbose: bool = False): - if ( - comments is None or commits is None - or comments.totalCount == 0 or commits.totalCount == 0 - ): - if verbose: print(f"No comments or commits: {comments.totalCount} comments, {commits.totalCount} commits") + +def has_only_1_comment( + commits: PaginatedList[Commit], + comments: PaginatedList[PullRequestComment], + verbose: bool = False, +): + if comments is None or commits is None or comments.totalCount == 0 or commits.totalCount == 0: + if verbose: + print( + f"No comments or commits: {comments.totalCount} comments, {commits.totalCount} commits" + ) return False if comments.totalCount != 1: - if verbose: print(f"More than 1 comment: {comments.totalCount} comments") + if verbose: + print(f"More than 1 comment: {comments.totalCount} comments") return False commit_dates = [commit.commit.author.date for commit in commits] @@ -75,9 +93,11 @@ def has_only_1_comment(commits: PaginatedList[Commit], comments: PaginatedList[P if commit_date > comment_date: n_after += 1 continue - if verbose: print(f"n_before: {n_before}, n_after: {n_after}") + if verbose: + print(f"n_before: {n_before}, n_after: {n_after}") return n_before >= 1 and n_after >= 1 + def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool: """ Checks if the repository is cloned locally and if its remote URL matches the expected GitHub repository URL. @@ -96,10 +116,7 @@ def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool: try: result = subprocess.run( - ["git", "-C", path, "remote", "-v"], - capture_output=True, - text=True, - check=True + ["git", "-C", path, "remote", "-v"], capture_output=True, text=True, check=True ) remote_urls = result.stdout.splitlines() @@ -110,7 +127,10 @@ def is_already_repo_cloned(repos_dir: str, repo_name: str) -> bool: except subprocess.CalledProcessError: return False -def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose: bool = False) -> bool: + +def clone( + repo: str, dest: str, updates: dict = {}, force: bool = False, verbose: bool = False +) -> bool: """ Clones a GitHub repository into a local directory. @@ -130,11 +150,12 @@ def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose updates["cloned_successfully"] = "Already exists" return True - if verbose: print(f"Cloning {repo}") + if verbose: + print(f"Cloning {repo}") proc = subprocess.run( ["git", "clone", "--depth", "1", f"https://github.com/{repo}", local_repo_path], stdout=subprocess.PIPE, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, ) if proc.returncode != 0: updates["cloned_successfully"] = False @@ -148,6 +169,7 @@ def clone(repo: str, dest: str, updates: dict = {}, force: bool = False, verbose updates["cloned_successfully"] = True return True + def run_git_cmd(cmd: list[str], repo_path: str) -> subprocess.CompletedProcess: return subprocess.run( ["git", "-C", repo_path] + cmd,