From 98db478b7ba46f9e44446e893cd7acd14f74679f Mon Sep 17 00:00:00 2001
From: Karma Riuk
Date: Sat, 17 May 2025 09:42:02 +0200
Subject: [PATCH] first draft of parallelization (NOT TESTED YET)

---
 pull_requests.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 61 insertions(+), 1 deletion(-)

diff --git a/pull_requests.py b/pull_requests.py
index a7e3f36..05c406d 100644
--- a/pull_requests.py
+++ b/pull_requests.py
@@ -1,5 +1,5 @@
 from collections import defaultdict
-import argparse, os, subprocess, docker, uuid
+import argparse, os, subprocess, docker, uuid, concurrent.futures
 from github.Commit import Commit
 from github.ContentFile import ContentFile
 from github.PullRequest import PullRequest
@@ -386,6 +386,66 @@ def process_repo(
             pbar.update(1)
 
 
+# Wrapper to run in each worker process
+def process_repo_worker(
+    repo_name: str, repos_dir: str, archive_destination: str, cache: dict
+) -> list:
+    # Initialize GitHub and Docker clients in each process
+    token = os.environ.get("GITHUB_AUTH_TOKEN_CRAB")
+    g_worker = Github(token, seconds_between_requests=0)
+    docker_client_worker = docker.from_env()
+
+    # Local dataset to collect entries for this repo
+    local_dataset = Dataset()
+
+    # Call the existing process_repo, but passing the local GitHub and Docker clients
+    # You may need to modify process_repo to accept g and docker_client as parameters
+    process_repo(repo_name, local_dataset, repos_dir, archive_destination, cache)
+    return local_dataset.entries
+
+
+def process_repos_parallel(
+    df: pd.DataFrame,
+    dataset: Dataset,
+    repos_dir: str,
+    archive_destination: str,
+    cache: dict[str, dict[int, DatasetEntry]] = {},
+):
+    """
+    Parallel processing of repos using ProcessPoolExecutor.
+
+    Parameters:
+        df: DataFrame with a 'name' column of repos to process
+        dataset: Shared Dataset to collect all entries
+        repos_dir: Directory root for cloned repos
+        archive_destination: Directory for archives
+        cache: Optional cache of previously processed PR entries
+    """
+    repo_names = df["name"]
+
+    # Use all CPUs; adjust max_workers as needed
+    with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
+        # Map each repo to a future
+        future_to_repo = {
+            executor.submit(process_repo_worker, name, repos_dir, archive_destination, cache): name
+            for name in repo_names
+        }
+
+        # Iterate as repos complete
+        for future in tqdm(
+            concurrent.futures.as_completed(future_to_repo),
+            total=len(future_to_repo),
+            desc="Processing repos",
+        ):
+            repo_name = future_to_repo[future]
+            try:
+                entries = future.result()
+                dataset.entries.extend(entries)
+                dataset.to_json(args.output)
+            except Exception as e:
+                tqdm.write(f"[ERROR] Repo {repo_name}: {e}")
+
+
 def process_repos(
     df: pd.DataFrame,
     dataset: Dataset,
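
The worker above builds g_worker and docker_client_worker but, as its own comment notes, does not yet pass them to process_repo. Per-process construction is needed because ProcessPoolExecutor workers cannot share the parent's clients (they hold live connections and are not picklable). A minimal sketch of how the wrapper could hand the per-process clients through, assuming process_repo is extended with github_client and docker_client keyword parameters (both names are assumptions, not part of this patch; Dataset and process_repo come from pull_requests.py itself):

    # Sketch only: github_client / docker_client are hypothetical parameters
    # that process_repo would have to gain; they do not exist in this patch.
    import os
    import docker
    from github import Github

    def process_repo_worker(
        repo_name: str, repos_dir: str, archive_destination: str, cache: dict
    ) -> list:
        # Build fresh clients inside the worker process
        token = os.environ.get("GITHUB_AUTH_TOKEN_CRAB")
        g_worker = Github(token, seconds_between_requests=0)
        docker_client_worker = docker.from_env()

        local_dataset = Dataset()
        process_repo(
            repo_name,
            local_dataset,
            repos_dir,
            archive_destination,
            cache,
            github_client=g_worker,                # hypothetical parameter
            docker_client=docker_client_worker,    # hypothetical parameter
        )
        return local_dataset.entries

Note that dataset.to_json(args.output) in process_repos_parallel relies on args being available at module level; the incremental save therefore happens only in the parent process, once per completed future.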