diff --git a/pull_requests.py b/pull_requests.py index 666b66a..3391e47 100644 --- a/pull_requests.py +++ b/pull_requests.py @@ -1,6 +1,6 @@ from collections import defaultdict import argparse, os, subprocess, docker, uuid -from concurrent.futures import wait, FIRST_COMPLETED, ProcessPoolExecutor +from concurrent.futures import wait, FIRST_COMPLETED, ProcessPoolExecutor, Future from github.Commit import Commit from github.ContentFile import ContentFile from github.PullRequest import PullRequest @@ -476,7 +476,7 @@ def process_repos_parallel( repo_names = [repo_name for repo_name in df["name"] if repo_name not in EXCLUSION_LIST] free_positions = list(range(1, n_workers + 1)) repo_names_iter = iter(repo_names) - future_to_repo = {} + future_to_repo: dict[Future, tuple[str, int]] = {} with tqdm( total=len(repo_names), desc="Processing repos", @@ -514,15 +514,25 @@ def process_repos_parallel( process_repo_worker, name, repos_dir, archive_destination, cache, pos ) future_to_repo[new_fut] = (name, pos) - except KeyboardInterrupt: - print("Saving all the entries up until now") - # any futures that happen to be done but not yet popped: + except BaseException as top_e: + print("\n" * n_workers) + print(f"[ERROR] {type(top_e)}: {top_e}") + print("Saving all the entries of repos that were still being processed") + dataset_ids = {entry.metadata.id for entry in dataset.entries} + # any futures that happen for fut in list(future_to_repo): - if fut.done(): - try: - dataset.entries.extend(fut.result()) - except Exception: - pass + try: + result = fut.result() + print(f"Saving {len(result)} for {future_to_repo[fut][0]}") + for entry in result: + if entry.metadata.id in dataset_ids: + print( + f"{entry.metadata.repo} PR #{entry.metadata.pr_number} already in dataset" + ) + dataset.entries.extend(result) + except Exception as bot_e: + print(f"[ERROR] {type(bot_e)}: {bot_e}") + pass # re-raise so the top‐level finally block still runs raise