#!/usr/bin/env python
# coding: utf-8
"""Automated GPT-based code review for JS/JSX/TS/TSX files changed in a PR.

Diffs HEAD against ``origin/<BASE_BRANCH>``, extracts the *added* lines per
reviewable file, asks an LLM (via ``litellm``) to review each file
concurrently, and appends the formatted reviews to ``OUTPUT_FILE``.
Prompts come from env vars (base64-encoded) with built-in defaults.
"""
import base64
import concurrent.futures
import os
import re
import subprocess
import sys
from typing import List, Tuple

from litellm import completion
from tenacity import retry, stop_after_attempt, wait_exponential

OUTPUT_FILE = 'code_review_output.txt'
DEFAULT_MAX_WORKERS = 3
# Only files with these extensions are sent for review.
REVIEWABLE_EXTENSIONS = ('.js', '.jsx', '.ts', '.tsx')
# Marker prefix for added-file headers in a unified diff.
DIFF_FILE_HEADER_PREFIX = '+++ b/'


def is_git_repository_root(directory: str) -> bool:
    """Return True if *directory* is the root of a git repository (contains .git/)."""
    return os.path.isdir(os.path.join(directory, '.git'))


def install_packages_from_file(filename: str) -> None:
    """Best-effort ``pip install`` of each package listed (one per line) in *filename*.

    Never raises: a missing or unreadable requirements file must not abort the
    review run, but the failure is reported instead of being silently swallowed
    (the original printed an empty string).
    """
    try:
        with open(filename, 'r') as f:
            for line in f:
                package_name = line.strip()
                if package_name:
                    subprocess.check_call(
                        [sys.executable, '-m', 'pip', 'install', package_name, '--quiet'],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )
    except Exception as e:
        print(f"Warning: could not install packages from {filename}: {e}")


def create_system_prompt() -> str:
    """Return the decoded system prompt.

    ``ENCODED_CODE_REVIEW_SYSTEM_PROMPT`` (base64) overrides the built-in
    default, which describes the reviewer role, the nine review criteria and
    the required GitHub-PR-comment output format.
    """
    encoded_prompt = os.getenv(
        'ENCODED_CODE_REVIEW_SYSTEM_PROMPT',
        # Default prompt, kept byte-identical to the original value.
        'Um9sZTpHUFQgQ29kZSBSZXZpZXcgQWdlbnQ7IApPYmplY3RpdmU6UmV2aWV3IEpTLCBKU1gsIFRTLCBUU1ggY29kZTsKVGhpbmdzIHRvIHJldmlldzogRm9jdXMgb24gbWFqb3IgaXNzdWVzCjEuIE5vIG1hZ2ljIG51bWJlcnMgYW5kIG1hZ2ljIHN0cmluZ3M6IEF2b2lkIHVzaW5nIGhhcmQtY29kZWQgdmFsdWVzIGRpcmVjdGx5IGluIHRoZSBjb2RlLCBhcyB0aGV5IGNhbiBtYWtlIHRoZSBjb2RlIGhhcmRlciB0byB1bmRlcnN0YW5kIGFuZCBtYWludGFpbi4KMi4gSW1wcm92ZSBuYW1pbmcgb2YgdmFyaWFibGVzIGFuZCBmdW5jdGlvbnM6IENsZWFyLCBkZXNjcmlwdGl2ZSBuYW1lcyBtYWtlIGNvZGUgbW9yZSByZWFkYWJsZSBhbmQgbWFpbnRhaW5hYmxlLgozLiBJbXByb3ZlIFR5cGVTY3JpcHQgdXNlOiBMZXZlcmFnaW5nIFR5cGVTY3JpcHQncyBmZWF0dXJlcyBjYW4gc2lnbmlmaWNhbnRseSByZWR1Y2UgcnVudGltZSBlcnJvcnMgYW5kIGltcHJvdmUgY29kZSBxdWFsaXR5Lgo0LiBObyBoYXJkY29kaW5nIG9mIHotaW5kZXggYW5kIGNvbG91cnM6IEhhcmRjb2RlZCB2YWx1ZXMgY2FuIGxlYWQgdG8gaW5jb25zaXN0ZW5jaWVzIGFuZCBkaWZmaWN1bHRpZXMgaW4gdGhlbWUgY2hhbmdlcyBvciBhZGp1c3RtZW50cy4KNS4gQXZvaWQgRGF0ZS5ub3coKSB3aGVuIHNlcnZlciB0aW1lIGlzIGF2YWlsYWJsZTogU2VydmVyIHRpbWUgZW5zdXJlcyBjb25zaXN0ZW5jeSBhY3Jvc3MgZGlmZmVyZW50IGNsaWVudHMsIGF2b2lkaW5nIHBvdGVudGlhbCBpc3N1ZXMgd2l0aCBsb2NhbCB0aW1lIGRpc2NyZXBhbmNpZXMuCjYuIFVzZSBvcHRpb25h'
        'bCBjaGFpbmluZyB3aGVyZXZlciBwb3NzaWJsZTogVGhpcyBpbXByb3ZlcyBjb2RlIHJlYWRhYmlsaXR5IGFuZCBzYWZldHkgYnkgYXZvaWRpbmcgdW5uZWNlc3NhcnkgY2hlY2tzIGZvciB1bmRlZmluZWQgb3IgbnVsbCB2YWx1ZXMuCjcuIFByZWZlciBmdW5jdGlvbmFsIHByb2dyYW1taW5nOiBFbXBoYXNpemluZyBpbW11dGFiaWxpdHkgYW5kIHN0YXRlbGVzc25lc3MgY2FuIGxlYWQgdG8gbW9yZSBwcmVkaWN0YWJsZSBhbmQgYnVnLXJlc2lzdGFudCBjb2RlLgo4LiBFcnJvciBIYW5kbGluZzogUHJvcGVyIGVycm9yIGhhbmRsaW5nIGVuc3VyZXMgdGhhdCB0aGUgYXBwbGljYXRpb24gY2FuIGdyYWNlZnVsbHkgaGFuZGxlIHVuZXhwZWN0ZWQgc2l0dWF0aW9ucywgaW1wcm92aW5nIHJlbGlhYmlsaXR5Lgo5LiBPcHRpbWl6YXRpb246IFdoaWxlIGF2b2lkaW5nIHByZW1hdHVyZSBvcHRpbWl6YXRpb24sIGJlIG1pbmRmdWwgb2Ygb2J2aW91cyBwZXJmb3JtYW5jZSBpc3N1ZXMgdGhhdCBjb3VsZCBhZmZlY3QgdGhlIHVzZXIgZXhwZXJpZW5jZS4KCkZvcm1hdCBvZiBvdXRwdXQgd2lsbCBiZTogCkZvcm1hdHRlZCBHaXRIdWIgUFIgY29tbWVudCBpbiBiZWxvdyBnaXZlbiBmb3JtYXQuIERvbuKAmXQgd3JhcCB0aGUgb3V0cHV0IGluIHRyaXBsZSB0aWNrcyAoYGBgKS4KCiMjIyBwYXRoL3RvL2ZpbGUKKipJc3N1ZXMgRm91bmQqKgpgYGAKY29kZSBzbmlwcGV0CmBgYAoqKklzc3VlOioqIFNob3J0IElzc3VlIERlc2NyaXB0aW9uIChlZy4gbWFnaWMgc3RyaW5nIGxpdGVyYWwgdXNlZCkuCioqU3VnZ2VzdGVkIEZpeDoqKiBJc3N1ZSBGaXguIChlZy4gRGVmaW5lIGEgY29uc3RhbnQgZm9yIHRoZSBtYWdpYyBzdHJpbmcgbGl0ZXJhbCBhbmQgdXNlIGl0IGluc3RlYWQpLg=',
    )
    return base64.b64decode(encoded_prompt).decode('utf-8')


def create_gpt_prompt(file_path: str, code: str) -> str:
    """Return the user prompt with ``{file_path}`` / ``{code}`` placeholders filled.

    Uses ``str.replace`` instead of ``str.format``: a template supplied via
    ``ENCODED_CODE_REVIEW_USER_PROMPT`` that contains literal ``{``/``}``
    (easy to do when describing JS/TS) would make ``str.format`` raise
    KeyError/ValueError. For the default template the result is identical.
    """
    encoded_prompt = os.getenv(
        'ENCODED_CODE_REVIEW_USER_PROMPT',
        # Default prompt, kept byte-identical to the original value.
        'UmV2aWV3IHRoZSBmb2xsb3dpbmcgSlMvSlNYL1RTL1RTWCBjb2RlIGZvciBvbmx5IHRoZSBtYWpvciBwb3RlbnRpYWwgaXNzdWVzIHJlbGF0ZWQgdG8gdGhlc2UgZmlsZXMuIEl0IGNvdWxkIGJlIHRoZXNlIHRoaW5nczogCjEuIE5vIG1hZ2ljIG51bWJlcnMgYW5kIG1hZ2ljIHN0cmluZ3M6IEF2b2lkIHVzaW5nIGhhcmQtY29kZWQgdmFsdWVzIGRpcmVjdGx5IGluIHRoZSBjb2RlLCBhcyB0aGV5IGNhbiBtYWtlIHRoZSBjb2RlIGhhcmRlciB0byB1bmRlcnN0YW5kIGFuZCBtYWludGFpbi4KMi4gSW1wcm92ZSBuYW1pbmcgb2YgdmFyaWFibGVzIGFuZCBmdW5jdGlvbnM6IENsZWFyLCBkZXNjcmlwdGl2ZSBuYW1lcyBtYWtlIGNvZGUgbW9yZSByZWFkYWJsZSBhbmQgbWFpbnRhaW5hYmxlLgozLiBJbXByb3ZlIFR5cGVTY3JpcHQgdXNlOiBMZXZlcmFnaW5nIFR5cGVTY3JpcHQncyBmZWF0dXJlcyBjYW4gc2lnbmlmaWNhbnRseSByZWR1Y2UgcnVudGltZSBlcnJvcnMgYW5kIGltcHJvdmUgY29kZSBxdWFsaXR5Lgo0LiBObyBoYXJkY29kaW5nIG9mIHotaW5kZXggYW5kIGNvbG91cnM6IEhhcmRjb2RlZCB2YWx1ZXMgY2FuIGxlYWQgdG8gaW5jb25zaXN0ZW5jaWVzIGFuZCBkaWZmaWN1bHRpZXMgaW4gdGhlbWUgY2hhbmdlcyBvciBhZGp1c3RtZW50cy4KNS4gQXZvaWQgRGF0ZS5ub3coKSB3aGVuIHNlcnZlciB0aW1lIGlzIGF2YWlsYWJsZTogU2VydmVyIHRpbWUgZW5zdXJlcyBjb25zaXN0ZW5jeSBhY3Jvc3MgZGlmZmVyZW50IGNsaWVudHMsIGF2b2lkaW5nIHBvdGVudGlhbCBpc3N1ZXMgd2l0aCBsb2NhbCB0aW1lIGRpc2NyZXBhbmNpZXMuCjYuIFVzZSBvcHRpb25hbCBjaGFpbmluZyB3aGVyZXZlciBwb3NzaWJsZTogVGhpcyBpbXByb3ZlcyBjb2RlIHJlYWRhYmlsaXR5IGFuZCBzYWZldHkgYnkgYXZvaWRpbmcgdW5uZWNlc3NhcnkgY2hlY2tzIGZvciB1bmRlZmluZWQgb3IgbnVsbCB2YWx1ZXMuCjcuIFByZWZlciBmdW5jdGlvbmFsIHByb2dyYW1taW5nOiBFbXBoYXNpemluZyBpbW11dGFiaWxpdHkgYW5kIHN0YXRlbGVzc25lc3MgY2FuIGxlYWQgdG8gbW9yZSBwcmVkaWN0YWJsZSBhbmQgYnVnLXJlc2lzdGFudCBjb2RlLgo4LiBFcnJvciBIYW5kbGluZzogUHJvcGVyIGVycm9yIGhhbmRsaW5nIGVuc3VyZXMgdGhhdCB0aGUgYXBwbGljYXRpb24gY2FuIGdyYWNlZnVsbHkgaGFuZGxlIHVuZXhwZWN0ZWQgc2l0dWF0aW9ucywgaW1wcm92aW5nIHJlbGlhYmlsaXR5Lgo5LiBPcHRpbWl6YXRpb246IFdoaWxlIGF2b2lkaW5nIHByZW1hdHVyZSBvcHRpbWl6YXRpb24sIGJlIG1pbmRmdWwgb2Ygb2J2aW91cyBwZXJmb3JtYW5jZSBpc3N1ZXMgdGhhdCBjb3VsZCBhZmZlY3QgdGhlIHVzZXIgZXhwZXJpZW5jZS4gCkJlIGNvbmNyZXRlIGluIHlvdXIgcmVzcG9uc2UgdG8gb25seSB0aGUgbWFqb3IgaXNzdWVzIGFuZCBnaXZlIHRvLXRoZS1wb2ludCBkZXNjcmlwdGlvbiBhbmQgZml4ZXMgaW4gbWF4IDItMyBsaW5lcyBmb3IgZXZlcnkgaXN'
        'zdWUuIElmIHlvdSBkb24ndCBmaW5kIGFueSBpc3N1ZXMgaW4gdGhlIGNvZGUsIGp1c3QgZ2l2ZSAiTm8gbWFqb3IgaXNzdWVzIGZvdW5kIiBhbmQgZG9uJ3QgZ2l2ZSBhbnkgdW5uZWNlc3Nhcnkgc3VnZ2VzdGlvbnMgaW4gdGhhdCBjYXNlLiBUaGUgb3V0cHV0IHNob3VsZCBiZSBmb3JtYXR0ZWQgYXMgYSBHaXRIdWIgUFIgY29tbWVudC4KCkZpbGUgUGF0aDoKe2ZpbGVfcGF0aH0KCkNvZGU6Cntjb2RlfQ=',
    )
    user_prompt = base64.b64decode(encoded_prompt).decode('utf-8')
    return user_prompt.replace('{file_path}', file_path).replace('{code}', code)


@retry(wait=wait_exponential(multiplier=2, min=5, max=30), stop=stop_after_attempt(3))
def call_gpt_model(prompt: str) -> str:
    """Send *prompt* to the model named by GPT_MODEL_NAME and return its reply.

    Raises on failure so tenacity can retry (exponential backoff, 3 attempts).
    """
    try:
        response = completion(
            model=os.getenv('GPT_MODEL_NAME'),
            messages=[
                {"role": "system", "content": create_system_prompt()},
                {"role": "user", "content": prompt},
            ],
        )
        return response['choices'][0]['message']['content'].strip()
    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # re-raise so the @retry decorator sees the failure


def get_pr_diff(directory: str, base_branch: str) -> List[str]:
    """Return the lines of ``git diff origin/<base_branch>...HEAD`` run in *directory*.

    Returns [] when git fails. The working directory is always restored.
    """
    # Capture cwd *before* the try: if it were assigned inside and getcwd
    # raised, the finally clause would fail with a NameError.
    original_dir = os.getcwd()
    try:
        os.chdir(directory)
        subprocess.check_call(['git', 'fetch', 'origin', base_branch])
        diff_output = subprocess.check_output(
            ['git', 'diff', f'origin/{base_branch}...HEAD'], text=True)
        return diff_output.splitlines()
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        return []
    finally:
        os.chdir(original_dir)


def extract_code_from_diff(diff_lines: List[str]) -> List[Tuple[str, str]]:
    """Collect (file_path, added_code) pairs for reviewable files from a unified diff.

    Only added lines ('+' but not '+++') are kept; removed and context lines
    are ignored. Files whose extension is not in REVIEWABLE_EXTENSIONS are
    skipped.
    """
    code_snippets: List[Tuple[str, str]] = []
    file_path = ""
    code = ""
    for line in diff_lines:
        if line.startswith(DIFF_FILE_HEADER_PREFIX):
            # Flush the previous file before starting to accumulate a new one.
            if file_path and code and file_path.endswith(REVIEWABLE_EXTENSIONS):
                code_snippets.append((file_path, code))
            code = ""
            file_path = line[len(DIFF_FILE_HEADER_PREFIX):]
        elif line.startswith('+') and not line.startswith('++'):
            code += line[1:] + '\n'
    # Flush the final file in the diff.
    if file_path and code and file_path.endswith(REVIEWABLE_EXTENSIONS):
        code_snippets.append((file_path, code))
    return code_snippets


def generate_review_for_file(file_path: str, code: str) -> str:
    """Return the model's review for one file, or "" when clean or on failure.

    Reviews containing "no major issues found" are treated as clean.
    """
    try:
        print("Generating review for file path ", file_path)
        prompt = create_gpt_prompt(file_path, code)
        gpt_response = call_gpt_model(prompt)
        if gpt_response.strip() and "no major issues found" not in gpt_response.lower():
            return gpt_response
    except Exception as e:
        print(f"Failed to generate review for file {file_path}: {e}")
    return ""


def generate_reviews(code_snippets: List[Tuple[str, str]], output_file: str,
                     max_workers: int) -> List[str]:
    """Review each snippet concurrently; append non-empty reviews to *output_file*.

    Returns the list of reviews (completion order, not submission order).
    Per-file failures are logged and do not abort the remaining reviews.
    """
    reviews: List[str] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(generate_review_for_file, file_path, code): (file_path, code)
            for file_path, code in code_snippets
        }
        for future in concurrent.futures.as_completed(future_to_file):
            file_path, _code = future_to_file[future]
            try:
                gpt_response = future.result()
                if gpt_response:
                    reviews.append(gpt_response)
                    with open(output_file, 'a') as f:
                        f.write(gpt_response)
                        f.write("\n---*---\n\n")
            except Exception as e:
                print(f"Failed to generate review for file {file_path}: {e}")
    return reviews


def run_analysis(directory: str, output_file: str, max_workers: int) -> None:
    """Validate *directory* is a git root, then diff, review, and report.

    BASE_BRANCH (default 'master') selects the diff base; REVIEW_MODE is
    informational only.
    """
    review_mode = os.getenv('REVIEW_MODE', 'pr')
    if not is_git_repository_root(directory):
        response = "Failure :: This script must be run at the root of a Git repository."
        with open(output_file, 'a') as f:
            f.write(response)
            f.write("\n---*---\n")
        return
    print("Running in mode : ", review_mode)
    base_branch = os.getenv('BASE_BRANCH', 'master')
    diff_lines = get_pr_diff(directory, base_branch)
    code_snippets = extract_code_from_diff(diff_lines)
    report = generate_reviews(code_snippets, output_file, max_workers)
    print(report)


if __name__ == "__main__":
    # Start from a clean output file for every run.
    if os.path.exists(OUTPUT_FILE):
        os.remove(OUTPUT_FILE)
    if len(sys.argv) != 2:
        response = "Usage: python generic_code_review directory"
        with open(OUTPUT_FILE, 'a') as f:
            f.write(response)
            f.write("\n---*---\n")
    else:
        directory = sys.argv[1]
        install_packages_from_file('requirements.txt')
        # os.getenv returns a *string* when the var is set; coerce to int so
        # ThreadPoolExecutor(max_workers=...) doesn't raise TypeError
        # (the original passed the raw getenv result through).
        run_analysis(directory, OUTPUT_FILE,
                     int(os.getenv('CODE_CRITIC_MAX_WORKERS', DEFAULT_MAX_WORKERS)))