import os import shutil import boto3 from botocore.exceptions import ClientError import sys bucket = 'navi-github-actions-cache' s3_file_name = 'latest_build.apk' class ProgressTracker: def __init__(self, total_size): self.total_size = total_size self.bytes_uploaded = 0 self.bytes_downloaded = 0 def upload_progress_callback(self, bytes_uploaded): self.bytes_uploaded += bytes_uploaded bytes_left = self.total_size - self.bytes_uploaded print(f"Uploaded: {to_mb(self.bytes_uploaded)} MB, Remaining: {to_mb(bytes_left)} MB, Total : {to_mb(self.total_size)} MB") def download_progress_callback(self, bytes_amount): self.bytes_downloaded += bytes_amount bytes_left = self.total_size - self.bytes_downloaded print(f"Downloaded: {to_mb(self.bytes_downloaded)} MB, Remaining : {to_mb(bytes_left)} MB, Total : {to_mb(self.total_size)} MB") def upload_file(access_key, secret_key, local_path): try: file_path = local_path file_size = os.path.getsize(file_path) tracker = ProgressTracker(file_size) s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) print('Uploading apk') print(os.listdir()) response = s3.upload_file(file_path, bucket, s3_file_name, Callback=tracker.upload_progress_callback) print('File Uploaded Successfully') except Exception as e: print(f"Error uploading file: {str(e)}") def download_file(access_key, secret_key, local_path): try: # Create the directory for the local file path if it doesn't exist os.makedirs(os.path.dirname(local_path), exist_ok=True) s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) file_info = s3.head_object(Bucket=bucket, Key=s3_file_name) file_size = file_info['ContentLength'] tracker = ProgressTracker(file_size) with open(local_path, 'wb') as f: s3.download_fileobj(bucket, s3_file_name, f, Callback=tracker.download_progress_callback) print("File downloaded successfully.") print(local_path) # Print the file path except Exception as e: print(f"Error downloading file: {str(e)}") def to_mb(bytes): return round(bytes/(1024*1024), 2) if __name__ == "__main__": command = sys.argv[1] access_key = sys.argv[2] secret_key = sys.argv[3] if command == 'download': local_path = sys.argv[4] download_file(access_key, secret_key, local_path) elif command == 'upload': local_path = sys.argv[4] upload_file(access_key, secret_key, local_path)