# mirror of https://github.com/karma-riuk/crab.git
import os, requests, re
from datetime import datetime
from typing import Optional
import itertools
import pandas as pd

from unidiff import PatchSet
from io import StringIO

COMMON_HEADERS = {
    'Accept': 'application/vnd.github+json',
    'Authorization': f'Bearer {os.environ["GITHUB_AUTH_TOKEN_CRAB"]}',
    'X-Github-Api-Version': '2022-11-28',
}

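# The Authorization header assumes a personal access token is available in the
# environment, e.g. (the token value here is made up):
#   export GITHUB_AUTH_TOKEN_CRAB="ghp_exampletoken"
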
def github_call(url: str) -> requests.Response:
    result = requests.get(url, headers=COMMON_HEADERS)
    if result.status_code != 200:
        raise Exception(f"Failed to fetch {url}: {result.status_code}")
    return result

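# Minimal usage sketch (hypothetical URL): any non-200 status raises, so the
# result can be consumed directly:
#   data = github_call('https://api.github.com/repos/octocat/hello-world').json()
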
def get_comments(repo_url: str, pr_number: str) -> list[dict]:
    response = github_call(f'{repo_url}/pulls/{pr_number}/comments')
    return response.json()

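# Review comments from this endpoint (per the GitHub REST API) carry the fields
# used below: "id", "body", "path", "created_at", and "diff_hunk"; "diff_hunk"
# starts with the "@@ ... @@" hunk header, which parse_hunk_header relies on.
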
def get_commit(repo_url: str, commit_sha: str) -> dict:
    response = github_call(f'{repo_url}/commits/{commit_sha}')
    return response.json()

def get_commits(repo_url: str, pr_number: str) -> list[dict]:
    response = github_call(f'{repo_url}/pulls/{pr_number}/commits')
    commits = response.json()
    for commit in commits:
        detailed_commit = get_commit(repo_url, commit['sha'])
        if "files" not in detailed_commit:
            continue

        for file in detailed_commit['files']:
            # binary or very large files come back without a "patch" field
            file["patch_range"] = parse_hunk_header(file["patch"]) if "patch" in file else None
        commit["files"] = detailed_commit["files"]
    return commits

def parse_date(date: str) -> datetime:
    return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")

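# Example with a made-up timestamp in the ISO-8601 "Z" format GitHub returns:
#   parse_date("2015-05-05T23:40:27Z")  # -> datetime(2015, 5, 5, 23, 40, 27)
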
def get_first_comment_date(comments: list[dict]) -> datetime:
    return min([parse_date(comment['created_at']) for comment in comments])

def get_useful_commits(commits: list[dict], first_comment_date: datetime) -> list[dict]:
    ret = []
    for commit in commits:
        # skip commits that don't expose an author date
        if ("commit" not in commit
                or "author" not in commit["commit"]
                or "date" not in commit["commit"]["author"]):
            continue
        commit_date = parse_date(commit['commit']['author']['date'])
        if commit_date > first_comment_date:
            ret.append(commit)
    return ret

def parse_hunk_header(hunk_header: str) -> Optional[dict]:
    """Extracts line ranges from a diff hunk header."""
    match = re.match(r'@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@', hunk_header)
    if match:
        old_start = int(match.group(1))
        old_count = int(match.group(2)) if match.group(2) else 1
        new_start = int(match.group(3))
        new_count = int(match.group(4)) if match.group(4) else 1
        return {
            "old_range": {
                "start": old_start,
                "end": old_start + old_count - 1
            },
            "new_range": {
                "start": new_start,
                "end": new_start + new_count - 1
            },
        }
    return None

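# Worked example with a made-up header: parse_hunk_header("@@ -10,3 +12,4 @@")
# returns {"old_range": {"start": 10, "end": 12}, "new_range": {"start": 12, "end": 15}},
# since 3 lines starting at line 10 span lines 10-12, and 4 lines starting at
# line 12 span lines 12-15.
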
def augment_comments(comments: list[dict]) -> list[dict]:
    ret = []
    for comment in comments:
        new_comment = comment.copy()
        if "diff_hunk" not in comment:
            continue
        new_comment["hunk_range"] = parse_hunk_header(comment["diff_hunk"])
        ret.append(new_comment)
    return ret

def is_range_overlapping(range1: dict, range2: dict) -> bool:
    return (range1["start"] <= range2["start"] <= range1["end"]
            or range2["start"] <= range1["start"] <= range2["end"])

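# E.g., with made-up ranges: {"start": 1, "end": 5} and {"start": 4, "end": 9}
# overlap, while {"start": 1, "end": 3} and {"start": 5, "end": 9} do not.
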
def get_overlapping_commits_and_comments(commits: list[dict], comments: list[dict]) -> list[tuple[dict, dict]]:
    ret = []
    for commit, comment in itertools.product(commits, comments):
        if comment.get("hunk_range") is None:
            # the comment's hunk header couldn't be parsed
            continue
        if "files" not in commit:
            continue
        if parse_date(commit['commit']['author']['date']) < parse_date(comment['created_at']):
            # a commit can't address a comment that was made after it
            continue
        for file in commit["files"]:
            if file.get("patch_range") is None:
                # no parsable patch for this file (e.g. binary files)
                continue
            if file["filename"] == comment["path"]:
                if is_range_overlapping(file["patch_range"]["old_range"], comment["hunk_range"]["new_range"]):
                    commit_copy = commit.copy()
                    commit_copy["relevant_file"] = file
                    ret.append((commit_copy, comment))
    return ret

def reverse_patch(file_after: str, patch_content: str) -> str:
    """
    Reverses a patch and applies it to a file to get the version of the file before the patch.
    """
    # Parse the patch
    patch = PatchSet(StringIO(patch_content))

    # Extract the file to be patched
    after_lines = file_after.splitlines(keepends=True)

    for patched_file in patch:
        if patched_file.is_modified_file:
            original_lines = after_lines[:]
            modified_lines = []
            # offset tracks how far into the file we have consumed, so that
            # later hunks still index into the right place
            offset = 0

            # Apply the patch in reverse
            for hunk in patched_file:
                hunk_lines = [str(line.value) for line in hunk.source_lines()]
                new_start = hunk.target_start - 1
                new_end = new_start + hunk.target_length

                # Keep the unchanged lines before the hunk, then replace the
                # modified section with the original content from the patch
                modified_lines.extend(original_lines[offset:new_start])
                modified_lines.extend(hunk_lines)
                offset = new_end

            modified_lines.extend(original_lines[offset:])
            return "".join(modified_lines)

    return file_after  # Return unmodified if no patch applies

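# Round-trip sketch on a single-hunk patch (all strings made up):
#   after = "a\nB\nc\n"
#   patch = "--- a/f\n+++ b/f\n@@ -1,3 +1,3 @@\n a\n-b\n+B\n c\n"
#   reverse_patch(after, patch)  # -> "a\nb\nc\n"
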
def extract_triplet(commit_comments: list[tuple[dict, dict]]) -> list[dict]:
    ret = []
    for commit, comment in commit_comments:
        file_after = github_call(commit["relevant_file"]["raw_url"]).text
        filename = comment["path"]
        patch_content = f"--- a/{filename}\n+++ b/{filename}\n" + commit["relevant_file"]["patch"] + "\n"
        file_before = reverse_patch(file_after, patch_content)
        ret.append({
            "file_before": file_before,
            "comment": comment["body"],
            "file_after": file_after
        })
    return ret

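# Each extracted triplet is a dict of the shape (schematic values):
#   {"file_before": "<code before>", "comment": "<review comment>", "file_after": "<code after>"}
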
def process_pull_request(repo_url: str, pr_number: str) -> bool:
    tmp_comments = get_comments(repo_url, pr_number)
    comments = augment_comments(tmp_comments)

    if len(comments) == 0:
        # No comments, can't extract a triplet
        return False

    first_comment_date = get_first_comment_date(comments)

    # get the commits and keep only those made after the first comment,
    # since those are the ones that may address review comments
    tmp_commits = get_commits(repo_url, pr_number)
    commits = get_useful_commits(tmp_commits, first_comment_date)

    if len(commits) == 0:
        # No commits after the first comment, so the contributor made no
        # revisions and there is no triplet
        return False

    overlapping_commits_and_comments = get_overlapping_commits_and_comments(commits, comments)

    for commit, comment in overlapping_commits_and_comments:
        print(f"Commit: {commit['sha']} addresses comment {comment['id']}")
        print(f"Commit message: {commit['commit']['message']}")
        print(f"Comment: {comment['body']}")
        print(commit["relevant_file"]['patch'])
        print()

    triplets_df = pd.DataFrame(extract_triplet(overlapping_commits_and_comments))
    repo_name = "/".join(repo_url.split("/")[-2:])
    triplets_df["repo"] = repo_name
    triplets_df["pr_number"] = pr_number
    triplets_df.to_csv("triplets.csv", index=False)

    return True

if __name__ == "__main__":
    # smoke-test the API call (and the auth token) before processing the PR
    github_call('https://api.github.com/repos/cdk/cdk/pulls/1140/commits')
    process_pull_request('https://api.github.com/repos/cdk/cdk', '1140')