now using the PyGithub library

so that it handles the issue of too many requests in too little time (rate limiting) for me
Karma Riuk
2025-03-06 16:36:53 +01:00
parent 57b0f0c2cd
commit 9fa7dd53af
2 changed files with 38 additions and 54 deletions
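
Note on the rate limiting mentioned in the commit message: PyGithub transparently follows API pagination, and recent (2.x) releases retry rate-limited requests by default. A minimal sketch (not part of this commit) of inspecting the remaining quota, assuming the same GITHUB_AUTH_TOKEN_CRAB environment variable the script uses:

# Sketch only: inspect the rate-limit budget PyGithub tracks for the client.
# get_rate_limit() performs a request to /rate_limit; rate_limiting is a
# cached (remaining, limit) pair taken from the last response's headers.
import os
from github import Github

g = Github(os.environ["GITHUB_AUTH_TOKEN_CRAB"])

core = g.get_rate_limit().core
print(f"{core.remaining}/{core.limit} requests left, resets at {core.reset}")

remaining, limit = g.rate_limiting  # cached pair, no extra API call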

@@ -3,3 +3,4 @@ tqdm
 docker
 beautifulsoup4
 unidiff
+PyGithub

@@ -1,35 +1,22 @@
-import os, requests
+import os
 from datetime import datetime
 import pandas as pd
 import tqdm
+from github import Github
 
-COMMON_HEADERS = {
-    'Accept': 'application/vnd.github+json',
-    'Authorization': f'Bearer {os.environ["GITHUB_AUTH_TOKEN_CRAB"]}',
-    'X-Github-Api-Version': '2022-11-28',
-}
+# Initialize GitHub API client
+g = Github(os.environ["GITHUB_AUTH_TOKEN_CRAB"])
 
-def github_call(url, params = {}):
-    result = requests.get(url, headers=COMMON_HEADERS, params=params)
-    if result.status_code != 200:
-        raise Exception(f"Failed to fetch {url}: {result.status_code}, {result = }")
-    return result
-
-def get_pulls(repo_url: str) -> list[dict]:
-    response = github_call(f'{repo_url}/pulls', params={"state": "all"})
-    return response.json()
-
-def has_only_1_round_of_comments(commits: list[dict], comments: list[dict]) -> bool:
-    if len(comments) == 0 or len(commits) == 0:
+def has_only_1_round_of_comments(commits, comments):
+    if not comments or not commits:
         return False
 
-    # Convert timestamps to datetime objects for easy comparison
-    commit_dates = [datetime.fromisoformat(c["commit"]["author"]["date"]) for c in commits]
-    comment_dates = [datetime.fromisoformat(c["created_at"]) for c in comments]
+    commit_dates = [c.commit.author.date for c in commits]
+    comment_dates = [c.created_at for c in comments]
 
     commit_dates.sort()
     comment_dates.sort()
 
-    # Identify the first and last comment times
     first_comment_time = comment_dates[0]
     last_comment_time = comment_dates[-1]
@@ -39,47 +26,43 @@ def has_only_1_round_of_comments(commits: list[dict], comments: list[dict]) -> b
     return True
 
-def process_pull(repo_name: str, pull_number: str) -> dict:
-    pull = github_call(f"https://api.github.com/repos/{repo_name}/pulls/{pull_number}").json()
-    commits = github_call(f"https://api.github.com/repos/{repo_name}/pulls/{pull_number}/commits").json()
-    comments = github_call(f"https://api.github.com/repos/{repo_name}/pulls/{pull_number}/comments").json()
+def process_pull(repo, pull):
+    commits = list(pull.get_commits())
+    comments = list(pull.get_review_comments())
 
     return {
-        "repo": repo_name,
-        "pr_number": pull["number"],
-        "additions": pull["additions"],
-        "deletions": pull["deletions"],
-        "changed_files": pull["changed_files"],
+        "repo": repo.full_name,
+        "pr_number": pull.number,
+        "additions": pull.additions,
+        "deletions": pull.deletions,
+        "changed_files": pull.changed_files,
         "has_only_1_round_of_comments": has_only_1_round_of_comments(commits, comments),
         "has_only_1_comment": len(comments) == 1,
     }
 
-def process_repo(repo_name: str) -> list[dict]:
+def process_repo(repo_name):
+    repo = g.get_repo(repo_name)
     stats = []
-    pulls = get_pulls(f"https://api.github.com/repos/{repo_name}")
-    for pull in tqdm.tqdm(pulls, desc=repo_name, leave=False):
-        if "merged_at" not in pull or pull["merged_at"] is None:
+    for pull in tqdm.tqdm(list(repo.get_pulls(state="closed")), desc=repo_name, leave=False):
+        if not pull.merged_at:
             continue
-        stats.append(process_pull(repo_name, pull["number"]))
+        stats.append(process_pull(repo, pull))
     return stats
 
 def main():
     repos = pd.read_csv("results.csv")
     repos = repos[repos["good_repo_for_crab"] == True]
-    print(len(repos))
     stats = []
-    for _, row in tqdm.tqdm(repos.iterrows(), total=len(repos)):
-        if "name" not in row or not isinstance(row["name"], str):
-            continue
-        name = row["name"]
-        stats.extend(process_repo(name))
-    pd.DataFrame(stats).to_csv("pr_stats.csv", index=False)
+    try:
+        for _, row in tqdm.tqdm(repos.iterrows(), total=len(repos)):
+            if "name" not in row or not isinstance(row["name"], str):
+                continue
+            stats.extend(process_repo(row["name"]))
+    finally:
+        pd.DataFrame(stats).to_csv("pr_stats.csv", index=False)
 
 if __name__ == "__main__":
     main()
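
A quick smoke test of the refactored helpers, as a sketch: the module name pr_stats and the repository octocat/Hello-World are placeholders (the script's real filename is not shown in this diff), and GITHUB_AUTH_TOKEN_CRAB must be set in the environment:

# Hypothetical one-off run; "pr_stats" and "octocat/Hello-World" are placeholders.
import pandas as pd
from pr_stats import process_repo

stats = process_repo("octocat/Hello-World")  # one repo, skipping results.csv
pd.DataFrame(stats).to_csv("pr_stats_sample.csv", index=False)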