first draft of parallelization (NOT TESTED YET)

Karma Riuk
2025-05-17 09:42:02 +02:00
parent 25072ac8b3
commit 98db478b7b


@@ -1,5 +1,5 @@
from collections import defaultdict
import argparse, os, subprocess, docker, uuid, concurrent.futures
from github.Commit import Commit
from github.ContentFile import ContentFile
from github.PullRequest import PullRequest
@@ -386,6 +386,66 @@ def process_repo(
        pbar.update(1)

# Wrapper that runs in each worker process
def process_repo_worker(
    repo_name: str, repos_dir: str, archive_destination: str, cache: dict
) -> list:
    # Re-initialize the GitHub and Docker clients in each process, since they
    # cannot be shared across process boundaries
    token = os.environ.get("GITHUB_AUTH_TOKEN_CRAB")
    g_worker = Github(token, seconds_between_requests=0)
    docker_client_worker = docker.from_env()
    # Local dataset collecting the entries produced for this repo
    local_dataset = Dataset()
    # Call the existing process_repo on the local dataset.
    # NOTE: process_repo still relies on the module-level GitHub and Docker
    # clients; it has to be modified to accept g_worker and docker_client_worker
    # as parameters before the per-process clients above are actually used.
    process_repo(repo_name, local_dataset, repos_dir, archive_destination, cache)
    return local_dataset.entries
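
As an aside (not something this commit does): ProcessPoolExecutor also accepts an initializer callback that runs once per worker process, which would let the GitHub and Docker clients be created once per process instead of once per submitted repo. A minimal sketch, with illustrative names (_init_worker, _g, _docker_client are not from the repository):

import concurrent.futures, os
import docker
from github import Github

_g = None
_docker_client = None

def _init_worker():
    # Runs once in every worker process, before any task executes
    global _g, _docker_client
    _g = Github(os.environ.get("GITHUB_AUTH_TOKEN_CRAB"), seconds_between_requests=0)
    _docker_client = docker.from_env()

# The pool would then be created with:
# concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count(), initializer=_init_worker)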

def process_repos_parallel(
    df: pd.DataFrame,
    dataset: Dataset,
    repos_dir: str,
    archive_destination: str,
    cache: dict[str, dict[int, DatasetEntry]] = {},
):
    """
    Process the repos in the given dataframe in parallel using a ProcessPoolExecutor.

    Parameters:
        df: DataFrame with a 'name' column listing the repos to process
        dataset: shared Dataset that collects all the entries
        repos_dir: root directory for the cloned repos
        archive_destination: directory for the archives
        cache: optional cache of previously processed PR entries
    """
    repo_names = df["name"]
    # Use all CPUs; adjust max_workers as needed
    with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        # Submit one task per repo and remember which future belongs to which repo
        future_to_repo = {
            executor.submit(
                process_repo_worker, name, repos_dir, archive_destination, cache
            ): name
            for name in repo_names
        }
        # Collect the results as the repos complete
        for future in tqdm(
            concurrent.futures.as_completed(future_to_repo),
            total=len(future_to_repo),
            desc="Processing repos",
        ):
            repo_name = future_to_repo[future]
            try:
                entries = future.result()
                dataset.entries.extend(entries)
                # Incremental checkpoint after every finished repo
                dataset.to_json(args.output)
            except Exception as e:
                tqdm.write(f"[ERROR] Repo {repo_name}: {e}")

def process_repos(
    df: pd.DataFrame,
    dataset: Dataset,
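
A practical note for when this gets tested (a general multiprocessing requirement, not specific to this repository): with the spawn start method (the default on Windows and macOS), ProcessPoolExecutor re-imports the main module in each worker, so the call into process_repos_parallel must sit behind an if __name__ == "__main__": guard, and everything passed to executor.submit (here the repo name, the two paths, and the cache of DatasetEntry objects) must be picklable. A minimal, self-contained illustration of the guard and the submit/as_completed pattern, with toy work standing in for process_repo_worker:

import concurrent.futures, os

def work(name: str) -> str:
    # Toy stand-in for process_repo_worker
    return name.upper()

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = {executor.submit(work, n): n for n in ["repo-a", "repo-b"]}
        for future in concurrent.futures.as_completed(futures):
            print(futures[future], "->", future.result())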