242 lines
10 KiB
Python
242 lines
10 KiB
Python
#!/usr/bin/env python
|
|
# coding: utf-8
|
|
|
|
# In[12]:
|
|
|
|
|
|
import os
|
|
import re
|
|
import base64
|
|
import sys
|
|
import subprocess
|
|
import concurrent.futures
|
|
from typing import List, Tuple
|
|
from litellm import completion
|
|
from tenacity import retry, wait_exponential, stop_after_attempt
|
|
|
|
|
|
# In[13]:
|
|
OUTPUT_FILE = 'code_review_output.txt'
|
|
DEFAULT_MAX_WORKERS = 3
|
|
|
|
# Function to check if the current directory is a git repository root
|
|
def is_git_repository_root(directory: str) -> bool:
|
|
return os.path.isdir(os.path.join(directory, '.git'))
|
|
|
|
def install_packages_from_file(filename: str):
|
|
try:
|
|
with open(filename, 'r') as f:
|
|
for line in f:
|
|
package_name = line.strip()
|
|
if package_name:
|
|
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name, '--quiet'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
except Exception as e:
|
|
print("")
|
|
|
|
|
|
def extract_code(directory: str) -> List[Tuple[str, str]]:
|
|
code_snippets = []
|
|
include_terms = {'service', 'controller', 'listener', 'scheduler', 'utils', 'client', 'repository', 'dao'}
|
|
for root, dirs, files in os.walk(directory):
|
|
# Skip test directories
|
|
if 'test' in root.lower():
|
|
continue
|
|
|
|
for file in files:
|
|
if file.endswith(('.java', '.kt')):
|
|
file_path = os.path.join(root, file)
|
|
# Only include directories with include terms
|
|
if not any(term in file_path.lower() for term in include_terms):
|
|
continue
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
code = f.read()
|
|
# Remove import statements
|
|
code = re.sub(r'^import .*$', '', code, flags=re.MULTILINE)
|
|
code_snippets.append((file_path, code))
|
|
return code_snippets
|
|
|
|
# Function to classify code snippets
|
|
def classify_code(code_snippets: List[Tuple[str, str]]) -> List[Tuple[str, str, str]]:
|
|
classified_code = []
|
|
for file_path, code in code_snippets:
|
|
class_type = 'Unknown'
|
|
if 'Service' in file_path:
|
|
class_type = 'Service'
|
|
elif 'Client' in file_path:
|
|
class_type = 'Client'
|
|
elif 'Controller' in file_path:
|
|
class_type = 'Controller'
|
|
elif 'Listener' in file_path:
|
|
class_type = 'Listener'
|
|
elif 'Util' in file_path:
|
|
class_type = 'Util'
|
|
classified_code.append((file_path, code, class_type))
|
|
return classified_code
|
|
|
|
|
|
# In[15]:
|
|
|
|
def create_system_prompt() -> str:
|
|
encoded_prompt = os.getenv('ENCODED_CODE_REVIEW_SYSTEM_PROMPT', 'Um9sZTpHUFQgQ29kZSBSZXZpZXcgQWdlbnQ7IApPYmplY3RpdmU6UmV2aWV3IEpTLCBKU1gsIFRTLCBUU1ggY29kZTsKVGhpbmdzIHRvIHJldmlldzogRm9jdXMgb24gbWFqb3IgaXNzdWVzCjEuIE5vIG1hZ2ljIG51bWJlcnMgYW5kIG1hZ2ljIHN0cmluZ3MKMi4gaW1wcm92ZSBuYW1pbmcgb2YgdmFyaWFibGVzIGFuZCBmdW5jdGlvbnMKMy4gaW1wcm92ZSB0eXBlc2NyaXB0IHVzZQo0LiBubyBoYXJkY29kaW5nIG9mIHotaW5kZXggYW5kIGNvbG91cnMKNS4gYXZvaWQgZGF0ZS5ub3coKSB3ZSBoYXZlIHNlcnZlciB0aW1lIGF2YWlsYWJsZSB3aGljaCBpcyBtb3JlIGNvbnNpc3RlbnQKNi4gdXNlIG9wdGlvbmFsIGNoYWluaW5nIHdoZXJlIGV2ZXIgcG9zc2libGUuCjcuIHByZWZlciBmdW5jdGlvbmFsIHByb2dyYW1taW5nCjguIHJldXNlIGV4aXN0aW5nIGhlbHBlciBjb2RlCjkuIERvbuKAmXQgdXNlIGNhcnJldCBpbiBwYWNrYWdlLmpzb24KMTAuIEFwcHJvcHJpYXRlIHVzZSBvZiB1c2VNZW1vIGFuZCB1c2VDYWxsYmFjawoxMS4gRG9uJ3QgY3JlYXRlIGNvbXBvbmVudHMgaW5zaWRlIHJlbmRlciBmdW5jdGlvbgoxMi4gRG9uJ3QgY3JlYXRlIGZ1bmN0aW9ucyBpbnNpZGUgcmVuZGVyIGZ1bmN0aW9uCjEzLiBEb24ndCB1c2UgaW5saW5lIHN0eWxlcwoxNC4gRG9uJ3QgdXNlIGlubGluZSBldmVudCBoYW5kbGVycwoxNS4gcHJlZmVyIHRvIGV4dHJhY3QgZnVuY3Rpb24gb3V0c2lkZSBqc3ggaWYgbW9yZSB0aGFuIHR3byBjb25kaXRpb25zIGFyZSByZXF1aXJlZAoxNi4gRG9uJ3QgdXNlIGFueSB0eXBlCjE3LiBPbmx5IHVzZSBjb25zdCBhbmQgaWYgcmVxdWlyZWQgbGV0CjE4LiBFcnJvciBIYW5kbGluZwoxOS4gT3B0aW1pemF0aW9uOiBSZWNvbW1lbmQgYXZvaWRpbmcgcHJlbWF0dXJlIG9wdGltaXphdGlvbiBidXQgYWxzbyBzdWdnZXN0IGtlZXBpbmcgYW4gZXllIG91dCBmb3Igb2J2aW91cyBwZXJmb3JtYW5jZSBpc3N1ZXMsIHN1Y2ggYXMgdW5uZWNlc3NhcnkgY29tcHV0YXRpb25zIGluc2lkZSBsb29wcyBvciBleGNlc3NpdmUgRE9NIG1hbmlwdWxhdGlvbnMuCjIwLiBNZW1vcnkgTGVha3M6IEFkdmlzZSBjaGVja2luZyBmb3IgYW5kIGVsaW1pbmF0aW5nIHBvdGVudGlhbCBtZW1vcnkgbGVha3MsIGZvciBleGFtcGxlLCBieSBlbnN1cmluZyBldmVudCBsaXN0ZW5lcnMgYXJlIHByb3Blcmx5IHJlbW92ZWQgd2hlbiBubyBsb25nZXIgbmVlZGVkLgoKRm9ybWF0IG9mIG91dHB1dCB3aWxsIGJlOiAKRm9ybWF0dGVkIEdpdEh1YiBQUiBjb21tZW50IGluIGJlbG93IGdpdmVuIGZvcm1hdC4gRG9uJ3Qgd3JhcCB0aGUgb3V0cHV0IGluIHRyaXBsZSB0aWNrcyAoYGBgKS4KCiMjIyBwYXRoL3RvL2ZpbGUKKipJc3N1ZXMgRm91bmQqKgpgYGAKY29kZSBzbmlwcGV0CmBgYAoqKklzc3VlOioqIFNob3J0IElzc3VlIERlc2NyaXB0aW9uIChlZy4gbWFnaWMgc3RyaW5nIGxpdGVyYWwgdXNlZCkuCioqU3VnZ2VzdGVkIEZpeDoqKiBJc3N1ZSBGaXguIChlZy4gRGVmaW5lIGEgY29uc3RhbnQgZm9yIHRoZSBtYWdpYyBzdHJpbmcgbGl0ZXJhbCBhbmQgdXNlIGl0IGluc3RlYWQpLgoK')
|
|
decoded_bytes = base64.b64decode(encoded_prompt)
|
|
system_prompt = decoded_bytes.decode('utf-8')
|
|
return system_prompt
|
|
|
|
|
|
def create_gpt_prompt(file_path: str, code: str, class_type: str) -> str:
|
|
encoded_prompt = os.getenv('ENCODED_CODE_REVIEW_USER_PROMPT', 'Q29kZToKe2NvZGV9')
|
|
decoded_bytes = base64.b64decode(encoded_prompt)
|
|
user_prompt = decoded_bytes.decode('utf-8')
|
|
filled_prompt = user_prompt.format(file_path=file_path, class_type=class_type, code=code)
|
|
return filled_prompt
|
|
|
|
|
|
# LLM Utils
|
|
# In[17]:
|
|
@retry(wait=wait_exponential(multiplier=2, min=5, max=30), stop=stop_after_attempt(3))
|
|
def call_gpt_model(prompt: str) -> str:
|
|
try:
|
|
response = completion(
|
|
model=os.getenv('GPT_MODEL_NAME'),
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": create_system_prompt(),
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": prompt,
|
|
}
|
|
],
|
|
)
|
|
return response['choices'][0]['message']['content'].strip()
|
|
except Exception as e:
|
|
print(f"An error occurred: {e}")
|
|
raise
|
|
|
|
# In[16]:
|
|
def get_pr_diff(directory: str, base_branch: str) -> List[str]:
|
|
# Change current working directory to the provided directory
|
|
try:
|
|
original_dir = os.getcwd()
|
|
os.chdir(directory)
|
|
|
|
# Ensure base branch is up-to-date
|
|
subprocess.check_call(['git', 'fetch', 'origin', base_branch])
|
|
|
|
# Get the diff output between origin/base_branch and HEAD
|
|
diff_output = subprocess.check_output(['git', 'diff', f'origin/{base_branch}...HEAD'], text=True)
|
|
|
|
return diff_output.splitlines()
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error: {e}")
|
|
return []
|
|
|
|
finally:
|
|
# Restore original working directory
|
|
os.chdir(original_dir)
|
|
|
|
# Function to extract code snippets from PR diff
|
|
def extract_code_from_diff(diff_lines: List[str]) -> List[Tuple[str, str]]:
|
|
code_snippets = []
|
|
file_path = ""
|
|
code = ""
|
|
for line in diff_lines:
|
|
if line.startswith('+++ b/'):
|
|
if file_path and code:
|
|
code_snippets.append((file_path, code))
|
|
code = ""
|
|
file_path = line[6:]
|
|
elif line.startswith('+') and not line.startswith('++'):
|
|
code += line[1:] + '\n'
|
|
if file_path and code:
|
|
code_snippets.append((file_path, code))
|
|
return code_snippets
|
|
|
|
|
|
def generate_review_for_file(file_path: str, code: str, class_type: str) -> str:
|
|
try:
|
|
print("Generating review for file path ", file_path)
|
|
prompt = create_gpt_prompt(file_path, code, class_type)
|
|
gpt_response = call_gpt_model(prompt)
|
|
if gpt_response.strip() and "no major issues found" not in gpt_response.lower():
|
|
return gpt_response
|
|
except Exception as e:
|
|
print(f"Failed to generate review for file {file_path}: {e}")
|
|
return ""
|
|
|
|
|
|
def generate_reviews(classified_code: List[Tuple[str, str, str]], output_file: str, max_workers) -> List[str]:
|
|
reviews = []
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
future_to_file = {
|
|
executor.submit(generate_review_for_file, file_path, code, class_type): (file_path, code, class_type)
|
|
for file_path, code, class_type in classified_code
|
|
}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_file):
|
|
file_path, code, class_type = future_to_file[future]
|
|
try:
|
|
gpt_response = future.result()
|
|
if gpt_response:
|
|
reviews.append(gpt_response)
|
|
with open(output_file, 'a') as f:
|
|
f.write(gpt_response)
|
|
f.write("\n---*---\n\n") # Separator
|
|
except Exception as e:
|
|
print(f"Failed to generate review for file {file_path}: {e}")
|
|
|
|
return reviews
|
|
|
|
# In[20]:
|
|
# Modify main function to support PR review and full codebase review
|
|
def run_analysis(directory, output_file, max_workers):
|
|
review_mode = os.getenv('REVIEW_MODE', 'pr') # Default to full codebase review
|
|
if not is_git_repository_root(directory):
|
|
response = "Failure :: This script must be run at the root of a Git repository."
|
|
with open(output_file, 'a') as f:
|
|
f.write(response)
|
|
f.write("\n---*---\n") # Separator
|
|
return
|
|
|
|
print("Running in mode : ", review_mode)
|
|
if review_mode == 'pr':
|
|
base_branch = os.getenv('BASE_BRANCH', 'master')
|
|
diff_lines = get_pr_diff(directory, base_branch)
|
|
print("diff ", diff_lines)
|
|
code_snippets = extract_code_from_diff(diff_lines)
|
|
else:
|
|
code_snippets = extract_code(directory)
|
|
|
|
print("Identified code snippets list of size ", len(code_snippets))
|
|
classified_code = classify_code(code_snippets)
|
|
print("Classified code snippets list of size ", len(classified_code))
|
|
report = generate_reviews(classified_code, output_file, max_workers)
|
|
print(report)
|
|
|
|
|
|
# In[21]:
|
|
|
|
if __name__ == "__main__":
|
|
# Environment Variables
|
|
# Set the following Keys for the Script to Run
|
|
# REVIEW_MODE - full/pr
|
|
# GPT_MODEL_NAME
|
|
# Based on GPT Model - Set the credentials
|
|
# OPENAI_API_KEY
|
|
# AZURE_API_KEY
|
|
# AZURE_API_BASE
|
|
# AZURE_API_VERSION
|
|
|
|
if os.path.exists(OUTPUT_FILE):
|
|
os.remove(OUTPUT_FILE)
|
|
if len(sys.argv) != 2:
|
|
response = "Usage: python generic_code_review directory"
|
|
with open(OUTPUT_FILE, 'a') as f:
|
|
f.write(response)
|
|
f.write("\n---*---\n") # Separator
|
|
else:
|
|
directory = sys.argv[1]
|
|
install_packages_from_file('requirements.txt')
|
|
run_analysis(directory, OUTPUT_FILE, os.getenv('CODE_CRITIC_MAX_WORKERS', DEFAULT_MAX_WORKERS)) |