294 lines
11 KiB
Python
294 lines
11 KiB
Python
import copy
|
|
import difflib
|
|
import inspect
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import traceback
|
|
from abc import ABC, abstractmethod
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
|
|
import aioconsole
|
|
import aiohttp
|
|
import asyncio
|
|
import rich
|
|
from aiolimiter import AsyncLimiter
|
|
from asyncio import Semaphore
|
|
from rich.syntax import Syntax
|
|
|
|
|
|
def time_async_function(func):
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
start_time = time.perf_counter() # Start measuring time
|
|
result = await func(*args, **kwargs)
|
|
end_time = time.perf_counter() # End measuring time
|
|
elapsed_time = end_time - start_time
|
|
print(f'Execution time for {func.__name__}: {elapsed_time:.4f} seconds')
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
|
|
def get_portal_domain(portal_url):
|
|
split___ = portal_url.split("//")[1].split(".")
|
|
if split___[1] == "np":
|
|
return "non-prod-deployment-portal"
|
|
return split___[0]
|
|
|
|
|
|
class AbstractMigration(ABC):
|
|
list_manifest_endpoint = "/api/manifest/list"
|
|
|
|
def __init__(
|
|
self,
|
|
portal_url,
|
|
portal_token,
|
|
skip_confirmation: bool = False,
|
|
submit_rate_limiter: AsyncLimiter = AsyncLimiter(1, 1),
|
|
max_concurrent_fetches: int = 10,
|
|
dry_run: bool = True
|
|
) -> None:
|
|
self.dry_run = dry_run
|
|
self.portal_url = portal_url
|
|
self.portal_token = portal_token
|
|
self.skip_confirmation: bool = skip_confirmation
|
|
self.post_rate_limiter = submit_rate_limiter
|
|
self.max_concurrent_fetches = Semaphore(max_concurrent_fetches)
|
|
self.portal_domain = get_portal_domain(portal_url)
|
|
|
|
self.http_client = None
|
|
|
|
now = datetime.now()
|
|
|
|
self.now = now.strftime('%Y-%m-%d_%H-%M-%S')
|
|
self.migration_dir = f"migrate_{self.now}/{self.portal_domain}"
|
|
os.makedirs(self.migration_dir, exist_ok=True)
|
|
self.log_filename = f"{self.migration_dir}/migration.log"
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(message)s',
|
|
handlers=[
|
|
logging.FileHandler(self.log_filename)
|
|
]
|
|
)
|
|
|
|
self.logger = logging.getLogger(__name__)
|
|
self.info("Filter function")
|
|
self.info(
|
|
logger_msg=self.get_func_source(self.filter),
|
|
console_msg=Syntax(self.get_func_source(self.filter), "python", theme="ansi_dark"),
|
|
)
|
|
self.info("\n")
|
|
|
|
self.info("Mutate function")
|
|
self.info(
|
|
logger_msg=self.get_func_source(self.migrate),
|
|
console_msg=Syntax(self.get_func_source(self.migrate), "python", theme="ansi_dark"),
|
|
)
|
|
self.info("\n")
|
|
|
|
self.info(f"Dry run: {self.dry_run}")
|
|
self.info(f"Require user confirmation before pushing changes to portal: {not self.skip_confirmation}")
|
|
self.info(f"Portal URL: {self.portal_url}")
|
|
self.info("\n")
|
|
|
|
def info(self, msg: object = None, logger_msg=None, console_msg=None):
|
|
if logger_msg is not None:
|
|
self.logger.info(logger_msg)
|
|
elif msg is not None:
|
|
self.logger.info(msg)
|
|
|
|
if console_msg is not None:
|
|
rich.print(console_msg)
|
|
elif msg is not None:
|
|
rich.print(msg)
|
|
|
|
def get_func_source(self, func):
|
|
try:
|
|
return inspect.getsource(func).strip()
|
|
except Exception as e:
|
|
self.logger.error("Failed to get the source code of the filter function.", e)
|
|
|
|
# noinspection PyUnusedLocal,PyMethodMayBeStatic
|
|
def filter(self, manifest_metadata: dict) -> bool:
|
|
return True
|
|
|
|
async def __aenter__(self):
|
|
self.http_client = aiohttp.ClientSession(headers={"X_AUTH_TOKEN": self.portal_token})
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
if self.http_client:
|
|
await self.http_client.close()
|
|
|
|
# @time_async_function
|
|
async def fetch_manifest_metadata_list(self):
|
|
"""Fetches the metadata of all manifests based on the filter criteria."""
|
|
async with self.http_client.get(f"{self.portal_url}{self.list_manifest_endpoint}") as response:
|
|
manifest_metadata_list = await response.json()
|
|
return [manifest_metadata for manifest_metadata in manifest_metadata_list if
|
|
self.filter(manifest_metadata)]
|
|
|
|
@abstractmethod
|
|
def migrate(self, manifest) -> dict:
|
|
raise NotImplementedError("Implement me!")
|
|
|
|
@staticmethod
|
|
def write_json_state_to_file(state, filename):
|
|
"""Writes the current state to a file."""
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
|
with open(filename, 'w') as file:
|
|
json.dump(state, file, indent=2)
|
|
|
|
@staticmethod
|
|
def write_state_to_file(state, filename):
|
|
"""Writes the current state to a file."""
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
|
with open(filename, 'w') as file:
|
|
file.write(state)
|
|
|
|
@staticmethod
|
|
def make_diff(portal_domain, old_state, new_state):
|
|
old_json = json.dumps(old_state, indent=2)
|
|
new_json = json.dumps(new_state, indent=2)
|
|
name = f"{portal_domain}: {old_state['environment']}/{old_state['name']}"
|
|
"""Computes the difference between old and new states."""
|
|
return ''.join(
|
|
difflib.unified_diff(
|
|
old_json.splitlines(True),
|
|
new_json.splitlines(True),
|
|
lineterm='\n',
|
|
tofile=f'[new] {name}',
|
|
fromfile=f'[old] {name}'
|
|
)
|
|
)
|
|
|
|
# @time_async_function
|
|
async def fetch_full_manifest(self, manifest_metadata):
|
|
"""Fetches the full manifest based on the metadata."""
|
|
async with self.max_concurrent_fetches:
|
|
async with self.http_client.get(f"{self.portal_url}/api/manifest/{manifest_metadata['id']}") as response:
|
|
return await response.json()
|
|
|
|
async def producer(self, queue, manifest_metadata_list):
|
|
tasks = []
|
|
for metadata in manifest_metadata_list:
|
|
task = asyncio.create_task(self.fetch_and_queue(metadata, queue))
|
|
tasks.append(task)
|
|
|
|
await asyncio.gather(*tasks)
|
|
await queue.put(None) # Signal that production is done
|
|
|
|
async def fetch_and_queue(self, metadata, queue):
|
|
full_manifest = await self.fetch_full_manifest(metadata)
|
|
await queue.put(full_manifest)
|
|
|
|
async def run(self):
|
|
"""Runs the migration process."""
|
|
async with self as migration:
|
|
await migration.__run()
|
|
|
|
async def __run(self):
|
|
"""Runs the migration process and handles user input."""
|
|
manifest_metadata_list = await self.fetch_manifest_metadata_list()
|
|
queue = asyncio.Queue(maxsize=50)
|
|
|
|
_ = asyncio.create_task(self.producer(queue, manifest_metadata_list))
|
|
consumer = asyncio.create_task(self.consumer(queue, self.skip_confirmation))
|
|
|
|
await asyncio.gather(consumer)
|
|
self.info(f"Diffs are available in the directory: {os.getcwd()}/{self.migration_dir}")
|
|
self.info(f"Logs are available in the file: {os.getcwd()}/{self.log_filename}")
|
|
self.info("Migration completed.")
|
|
|
|
async def consumer(self, queue, skip_confirmation):
|
|
apply_all = skip_confirmation
|
|
skip_all = False
|
|
|
|
while True:
|
|
if skip_all:
|
|
break
|
|
manifest = await queue.get()
|
|
if manifest is None:
|
|
# Queue is empty and the producer has finished
|
|
break
|
|
|
|
if 'error' in manifest:
|
|
self.info(f"Failed to fetch manifest:")
|
|
self.info(manifest)
|
|
continue
|
|
|
|
self.info(f"Processing manifest: {manifest['environment']}/{manifest['name']}")
|
|
|
|
old_filename = self.migration_dir + f"/manifests/{manifest['environment']}/{manifest['name']}_old.json"
|
|
new_filename = self.migration_dir + f"/manifests/{manifest['environment']}/{manifest['name']}_new.json"
|
|
|
|
self.write_json_state_to_file(manifest, old_filename)
|
|
try:
|
|
new_manifest = self.migrate(copy.deepcopy(manifest))
|
|
except Exception as e:
|
|
self.info(f"Failed to mutate manifest: {manifest['environment']}/{manifest['name']}")
|
|
self.info(traceback.format_exc())
|
|
continue
|
|
self.write_json_state_to_file(new_manifest, new_filename)
|
|
|
|
diff = self.make_diff(self.portal_domain, manifest, new_manifest)
|
|
|
|
if diff == "":
|
|
continue
|
|
|
|
diff_file_name = (self.migration_dir +
|
|
f"/diffs/{self.portal_domain}/{manifest['environment']}/{manifest['name']}.diff")
|
|
self.write_state_to_file(diff, diff_file_name)
|
|
|
|
self.info(f"Diff for {manifest['environment']}/{manifest['name']}:")
|
|
self.info(
|
|
logger_msg=diff,
|
|
console_msg=Syntax(diff, "diff", theme="ansi_dark"),
|
|
)
|
|
|
|
if not apply_all:
|
|
choice = await self.get_user_choice()
|
|
if choice == 'a':
|
|
self.info("Applying all remaining changes.")
|
|
apply_all = True
|
|
elif choice == 'l':
|
|
self.info("Skipping all remaining changes.")
|
|
skip_all = True
|
|
continue
|
|
elif choice == 'n':
|
|
self.info("Skipping this change.")
|
|
continue
|
|
|
|
if self.dry_run:
|
|
self.info("Dry run enabled. Not sending the change to the portal.")
|
|
continue
|
|
|
|
await self.post_manifest(new_manifest)
|
|
|
|
async def get_user_choice(self):
|
|
valid_choices = ['y', 'a', 'n', 'l']
|
|
while True:
|
|
prompt_ = "Apply this change? [Y] Yes [A] Yes to All [N] No [L] No to All: "
|
|
choice = await aioconsole.ainput(prompt_)
|
|
choice = choice.lstrip().rstrip().lower()
|
|
self.info(logger_msg=f"{prompt_}{choice}")
|
|
if choice in valid_choices:
|
|
return choice
|
|
self.info("Invalid choice. Please enter a valid choice. Apply this change? "
|
|
"[Y] Yes [A] Yes to All [N] No [L] No to All: ")
|
|
|
|
async def post_manifest(self, manifest):
|
|
async with self.post_rate_limiter:
|
|
async with self.http_client.post(f"{self.portal_url}/api/manifest", json=manifest) as response:
|
|
if not (200 <= response.status < 300):
|
|
self.logger.error(f"Failed to post manifest: {manifest['name']}")
|
|
self.logger.error(f"Error: {await response.text()}")
|
|
else:
|
|
self.info(f"Successfully posted manifest: {manifest['environment']}/{manifest['name']}")
|
|
return response.status
|