mirror of
https://github.com/karma-riuk/crab.git
synced 2025-07-05 21:38:13 +02:00
saving all the results after any execption
This commit is contained in:
@ -1,6 +1,6 @@
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import argparse, os, subprocess, docker, uuid
|
import argparse, os, subprocess, docker, uuid
|
||||||
from concurrent.futures import wait, FIRST_COMPLETED, ProcessPoolExecutor
|
from concurrent.futures import wait, FIRST_COMPLETED, ProcessPoolExecutor, Future
|
||||||
from github.Commit import Commit
|
from github.Commit import Commit
|
||||||
from github.ContentFile import ContentFile
|
from github.ContentFile import ContentFile
|
||||||
from github.PullRequest import PullRequest
|
from github.PullRequest import PullRequest
|
||||||
@ -476,7 +476,7 @@ def process_repos_parallel(
|
|||||||
repo_names = [repo_name for repo_name in df["name"] if repo_name not in EXCLUSION_LIST]
|
repo_names = [repo_name for repo_name in df["name"] if repo_name not in EXCLUSION_LIST]
|
||||||
free_positions = list(range(1, n_workers + 1))
|
free_positions = list(range(1, n_workers + 1))
|
||||||
repo_names_iter = iter(repo_names)
|
repo_names_iter = iter(repo_names)
|
||||||
future_to_repo = {}
|
future_to_repo: dict[Future, tuple[str, int]] = {}
|
||||||
with tqdm(
|
with tqdm(
|
||||||
total=len(repo_names),
|
total=len(repo_names),
|
||||||
desc="Processing repos",
|
desc="Processing repos",
|
||||||
@ -514,15 +514,25 @@ def process_repos_parallel(
|
|||||||
process_repo_worker, name, repos_dir, archive_destination, cache, pos
|
process_repo_worker, name, repos_dir, archive_destination, cache, pos
|
||||||
)
|
)
|
||||||
future_to_repo[new_fut] = (name, pos)
|
future_to_repo[new_fut] = (name, pos)
|
||||||
except KeyboardInterrupt:
|
except BaseException as top_e:
|
||||||
print("Saving all the entries up until now")
|
print("\n" * n_workers)
|
||||||
# any futures that happen to be done but not yet popped:
|
print(f"[ERROR] {type(top_e)}: {top_e}")
|
||||||
|
print("Saving all the entries of repos that were still being processed")
|
||||||
|
dataset_ids = {entry.metadata.id for entry in dataset.entries}
|
||||||
|
# any futures that happen
|
||||||
for fut in list(future_to_repo):
|
for fut in list(future_to_repo):
|
||||||
if fut.done():
|
try:
|
||||||
try:
|
result = fut.result()
|
||||||
dataset.entries.extend(fut.result())
|
print(f"Saving {len(result)} for {future_to_repo[fut][0]}")
|
||||||
except Exception:
|
for entry in result:
|
||||||
pass
|
if entry.metadata.id in dataset_ids:
|
||||||
|
print(
|
||||||
|
f"{entry.metadata.repo} PR #{entry.metadata.pr_number} already in dataset"
|
||||||
|
)
|
||||||
|
dataset.entries.extend(result)
|
||||||
|
except Exception as bot_e:
|
||||||
|
print(f"[ERROR] {type(bot_e)}: {bot_e}")
|
||||||
|
pass
|
||||||
# re-raise so the top‐level finally block still runs
|
# re-raise so the top‐level finally block still runs
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user