From ff4eb279d58070041e7426d7561adfb7b2e4e328 Mon Sep 17 00:00:00 2001 From: Ashvin S Date: Mon, 21 Oct 2024 13:08:15 +0530 Subject: [PATCH] INFRA-3931 | Ashvin | Add migration framework (#1239) * INFRA-3931 | Ashvin | Add migration framework --- .gitignore | 3 + scripts/_migration_framework/README.md | 39 +++ scripts/_migration_framework/abstract.py | 293 ++++++++++++++++++ scripts/_migration_framework/example.py | 40 +++ scripts/_migration_framework/requirements.txt | 18 ++ 5 files changed, 393 insertions(+) create mode 100644 scripts/_migration_framework/README.md create mode 100644 scripts/_migration_framework/abstract.py create mode 100644 scripts/_migration_framework/example.py create mode 100644 scripts/_migration_framework/requirements.txt diff --git a/.gitignore b/.gitignore index 2c58a867..3f0d06d0 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,6 @@ pipeline_manifests user-mapping.yaml **/vendor/ bin/ +**/migrate_* +**/env +**/__pycache__/ diff --git a/scripts/_migration_framework/README.md b/scripts/_migration_framework/README.md new file mode 100644 index 00000000..664de265 --- /dev/null +++ b/scripts/_migration_framework/README.md @@ -0,0 +1,39 @@ +# Migration Framework +This script provides an easier, faster, and safer way to run migrations on the manifests. + +## How to use +1. Create a new file and subclass the `AbstractMigration` class. +2. Implement the `migrate` method. This method is called for each manifest when the migration is executed. +3. Optionally override the `filter` method if you do not want to run the migration on all manifests. +4. Run your class like this: +```python +async def main(): + migration = CustomMigration( + "https://deployment-portal-backend.np.navi-tech.in", + "", + ) + await migration.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Refer to [example.py](example.py) for a sample migration. + +## Set up environment +1. 
def time_async_function(func):
    """Wrap a coroutine function so every call prints how long it took.

    The elapsed wall-clock time (``time.perf_counter``) is printed to stdout
    after the wrapped coroutine returns; the coroutine's result is passed
    through unchanged. Nothing is printed if the coroutine raises.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(f'Execution time for {func.__name__}: {elapsed_time:.4f} seconds')
        return result

    return wrapper


def get_portal_domain(portal_url):
    """Return the short portal name used in output paths and diff headers.

    Everything up to the first ``//`` is dropped and the remainder is split on
    dots. If the second label is ``np`` the fixed name
    ``non-prod-deployment-portal`` is returned, otherwise the first label.

    URLs without a scheme separator or without any dot are accepted too
    (the original indexing raised ``IndexError`` for them).
    """
    pieces = portal_url.split("//")
    host = pieces[1] if len(pieces) > 1 else pieces[0]
    labels = host.split(".")
    if len(labels) > 1 and labels[1] == "np":
        return "non-prod-deployment-portal"
    return labels[0]


class AbstractMigration(ABC):
    """Base class for bulk-editing manifests through the deployment portal API.

    Subclasses implement :meth:`migrate` and may override :meth:`filter`.
    :meth:`run` then:

    1. opens an ``aiohttp`` session carrying the portal token,
    2. lists manifest metadata and keeps the entries accepted by ``filter``,
    3. fetches every full manifest concurrently (bounded by a semaphore),
    4. applies ``migrate`` to a deep copy, writing old/new JSON and a diff
       under ``migrate_<timestamp>/<portal_domain>/``,
    5. optionally asks the user to confirm each change, and
    6. POSTs the changed manifest unless ``dry_run`` is set.
    """

    list_manifest_endpoint = "/api/manifest/list"

    def __init__(
            self,
            portal_url,
            portal_token,
            skip_confirmation: bool = False,
            submit_rate_limiter: "AsyncLimiter | None" = None,
            max_concurrent_fetches: int = 10,
            dry_run: bool = True
    ) -> None:
        """Prepare the output directory, logging and run-time settings.

        Args:
            portal_url: Base URL of the portal; endpoint paths are appended to it.
            portal_token: Value sent in the ``X_AUTH_TOKEN`` request header.
            skip_confirmation: When true, changes are applied without prompting.
            submit_rate_limiter: Limiter entered around every POST. ``None``
                (the default) creates a fresh ``AsyncLimiter(1, 1)`` for this
                instance instead of sharing one object built at import time.
            max_concurrent_fetches: Size of the semaphore guarding manifest fetches.
            dry_run: When true (the default), nothing is POSTed to the portal.

        Side effects: creates ``migrate_<timestamp>/<portal_domain>/``, calls
        ``logging.basicConfig`` with a file handler in that directory, and
        prints the source of ``filter`` and ``migrate`` plus the settings.
        """
        if submit_rate_limiter is None:
            submit_rate_limiter = AsyncLimiter(1, 1)

        self.dry_run = dry_run
        self.portal_url = portal_url
        self.portal_token = portal_token
        self.skip_confirmation: bool = skip_confirmation
        self.post_rate_limiter = submit_rate_limiter
        self.max_concurrent_fetches = Semaphore(max_concurrent_fetches)
        self.portal_domain = get_portal_domain(portal_url)

        # Created lazily in __aenter__ so the session lives inside the event loop.
        self.http_client = None

        self.now = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        self.migration_dir = f"migrate_{self.now}/{self.portal_domain}"
        os.makedirs(self.migration_dir, exist_ok=True)
        self.log_filename = f"{self.migration_dir}/migration.log"

        # NOTE: basicConfig is a no-op when the root logger already has
        # handlers, so only the first configuration in a process takes effect.
        logging.basicConfig(
            level=logging.INFO,
            format='%(message)s',
            handlers=[
                logging.FileHandler(self.log_filename)
            ]
        )

        self.logger = logging.getLogger(__name__)

        filter_source = self.get_func_source(self.filter)
        self.info("Filter function")
        self.info(
            logger_msg=filter_source,
            console_msg=Syntax(filter_source, "python", theme="ansi_dark"),
        )
        self.info("\n")

        migrate_source = self.get_func_source(self.migrate)
        self.info("Mutate function")
        self.info(
            logger_msg=migrate_source,
            console_msg=Syntax(migrate_source, "python", theme="ansi_dark"),
        )
        self.info("\n")

        self.info(f"Dry run: {self.dry_run}")
        self.info(f"Require user confirmation before pushing changes to portal: {not self.skip_confirmation}")
        self.info(f"Portal URL: {self.portal_url}")
        self.info("\n")

    def info(self, msg: object = None, logger_msg=None, console_msg=None):
        """Send a message to the log file and to the console.

        ``logger_msg`` / ``console_msg`` override ``msg`` for their respective
        sink; a sink whose override and ``msg`` are both ``None`` is skipped.
        """
        if logger_msg is not None:
            self.logger.info(logger_msg)
        elif msg is not None:
            self.logger.info(msg)

        if console_msg is not None:
            rich.print(console_msg)
        elif msg is not None:
            rich.print(msg)

    def get_func_source(self, func):
        """Return the stripped source text of ``func``.

        If the source cannot be retrieved the failure is logged with its
        traceback and a placeholder string is returned, so callers always
        receive a ``str`` they can display.
        """
        try:
            return inspect.getsource(func).strip()
        except (OSError, TypeError):
            self.logger.exception(
                "Failed to get the source code of %s.",
                getattr(func, "__name__", repr(func)),
            )
            return "<source unavailable>"

    # noinspection PyUnusedLocal,PyMethodMayBeStatic
    def filter(self, manifest_metadata: dict) -> bool:
        """Decide whether a manifest takes part in the migration.

        The default accepts every manifest; override to narrow the selection.
        """
        return True

    async def __aenter__(self):
        """Open the HTTP session that carries the portal token header."""
        self.http_client = aiohttp.ClientSession(headers={"X_AUTH_TOKEN": self.portal_token})
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.http_client:
            await self.http_client.close()

    async def fetch_manifest_metadata_list(self):
        """Fetch the manifest metadata list and keep entries accepted by ``filter``."""
        async with self.http_client.get(f"{self.portal_url}{self.list_manifest_endpoint}") as response:
            manifest_metadata_list = await response.json()
            return [manifest_metadata for manifest_metadata in manifest_metadata_list if
                    self.filter(manifest_metadata)]

    @abstractmethod
    def migrate(self, manifest) -> dict:
        """Return the migrated manifest.

        Receives a deep copy of the fetched manifest, so in-place mutation is
        safe; the returned dict is what gets diffed and posted.
        """
        raise NotImplementedError("Implement me!")

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the parent directory of ``filename`` when it has one."""
        parent = os.path.dirname(filename)
        if parent:  # os.makedirs("") raises for a bare filename
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def write_json_state_to_file(state, filename):
        """Write ``state`` to ``filename`` as indented JSON, creating parent directories."""
        AbstractMigration._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(state, file, indent=2)

    @staticmethod
    def write_state_to_file(state, filename):
        """Write the string ``state`` to ``filename``, creating parent directories."""
        AbstractMigration._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(state)

    @staticmethod
    def make_diff(portal_domain, old_state, new_state):
        """Return a unified diff between the JSON forms of two manifest states.

        Both states are serialised with ``indent=2``. The diff headers are
        labelled ``[old]`` / ``[new]`` followed by
        ``<portal_domain>: <environment>/<name>`` taken from ``old_state``.
        An empty string means the two states serialise identically.
        """
        old_json = json.dumps(old_state, indent=2)
        new_json = json.dumps(new_state, indent=2)
        name = f"{portal_domain}: {old_state['environment']}/{old_state['name']}"
        return ''.join(
            difflib.unified_diff(
                old_json.splitlines(True),
                new_json.splitlines(True),
                lineterm='\n',
                tofile=f'[new] {name}',
                fromfile=f'[old] {name}'
            )
        )

    async def fetch_full_manifest(self, manifest_metadata):
        """Fetch one full manifest by its metadata ``id``, bounded by the fetch semaphore."""
        async with self.max_concurrent_fetches:
            async with self.http_client.get(f"{self.portal_url}/api/manifest/{manifest_metadata['id']}") as response:
                return await response.json()

    async def producer(self, queue, manifest_metadata_list):
        """Fetch every manifest concurrently and queue the results.

        A ``None`` sentinel is queued once all fetch tasks have finished so the
        consumer knows when to stop. ``return_exceptions=True`` guarantees the
        sentinel is still queued if a fetch task fails unexpectedly.
        """
        tasks = [
            asyncio.create_task(self.fetch_and_queue(metadata, queue))
            for metadata in manifest_metadata_list
        ]

        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                self.logger.error("Fetch task failed: %r", outcome)
        await queue.put(None)  # Signal that production is done

    async def fetch_and_queue(self, metadata, queue):
        """Fetch one manifest and put it on the queue.

        A failed fetch is queued as a dict containing an ``error`` key, which
        the consumer reports and skips, instead of aborting the whole run.
        """
        try:
            full_manifest = await self.fetch_full_manifest(metadata)
        except Exception as exc:  # boundary: turn the failure into a reportable record
            self.logger.exception("Failed to fetch manifest for metadata: %r", metadata)
            full_manifest = {
                "error": f"{type(exc).__name__}: {exc}",
                "metadata": metadata,
            }
        await queue.put(full_manifest)

    async def run(self):
        """Runs the migration process inside a managed HTTP session."""
        async with self as migration:
            await migration.__run()

    async def __run(self):
        """Fetch, migrate and (optionally) post manifests, then print where the output is."""
        manifest_metadata_list = await self.fetch_manifest_metadata_list()
        queue = asyncio.Queue(maxsize=50)

        producer = asyncio.create_task(self.producer(queue, manifest_metadata_list))
        try:
            await self.consumer(queue, self.skip_confirmation)
        finally:
            # The consumer may stop early ("No to All") or fail; do not leave
            # fetches running against a session that is about to be closed.
            if not producer.done():
                producer.cancel()
            await asyncio.gather(producer, return_exceptions=True)

        self.info(f"Diffs are available in the directory: {os.getcwd()}/{self.migration_dir}")
        self.info(f"Logs are available in the file: {os.getcwd()}/{self.log_filename}")
        self.info("Migration completed.")

    async def consumer(self, queue, skip_confirmation):
        """Process queued manifests until the ``None`` sentinel arrives.

        For each manifest: write the old state, run ``migrate`` on a deep copy,
        write the new state and the diff, then (unless confirmation is skipped
        or "Yes to All" was chosen) ask the user before posting. Manifests with
        an empty diff are skipped. Nothing is posted when ``dry_run`` is set.
        """
        apply_all = skip_confirmation

        while True:
            manifest = await queue.get()
            if manifest is None:
                # Queue is empty and the producer has finished
                break

            if 'error' in manifest:
                self.info("Failed to fetch manifest:")
                self.info(manifest)
                continue

            label = f"{manifest['environment']}/{manifest['name']}"
            self.info(f"Processing manifest: {label}")

            old_filename = self.migration_dir + f"/manifests/{label}_old.json"
            new_filename = self.migration_dir + f"/manifests/{label}_new.json"

            self.write_json_state_to_file(manifest, old_filename)
            try:
                new_manifest = self.migrate(copy.deepcopy(manifest))
            except Exception:  # a failing migrate() must not abort the other manifests
                self.info(f"Failed to mutate manifest: {label}")
                self.info(traceback.format_exc())
                continue
            if not isinstance(new_manifest, dict):
                # Typically a migrate() that mutated in place and forgot to return.
                self.info(f"Failed to mutate manifest: {label}")
                self.info(f"migrate() returned {type(new_manifest).__name__} instead of a dict.")
                continue
            self.write_json_state_to_file(new_manifest, new_filename)

            diff = self.make_diff(self.portal_domain, manifest, new_manifest)
            if diff == "":
                continue

            diff_file_name = (self.migration_dir +
                              f"/diffs/{self.portal_domain}/{label}.diff")
            self.write_state_to_file(diff, diff_file_name)

            self.info(f"Diff for {label}:")
            self.info(
                logger_msg=diff,
                console_msg=Syntax(diff, "diff", theme="ansi_dark"),
            )

            if not apply_all:
                choice = await self.get_user_choice()
                if choice == 'a':
                    self.info("Applying all remaining changes.")
                    apply_all = True
                elif choice == 'l':
                    self.info("Skipping all remaining changes.")
                    break
                elif choice == 'n':
                    self.info("Skipping this change.")
                    continue

            if self.dry_run:
                self.info("Dry run enabled. Not sending the change to the portal.")
                continue

            await self.post_manifest(new_manifest)

    async def get_user_choice(self):
        """Prompt until the user enters one of ``y``, ``a``, ``n`` or ``l``.

        Input is stripped and lower-cased; the prompt together with the answer
        is written to the log file only.
        """
        valid_choices = ['y', 'a', 'n', 'l']
        prompt_ = "Apply this change? [Y] Yes [A] Yes to All [N] No [L] No to All: "
        while True:
            choice = await aioconsole.ainput(prompt_)
            choice = choice.strip().lower()
            self.info(logger_msg=f"{prompt_}{choice}")
            if choice in valid_choices:
                return choice
            self.info(f"Invalid choice. Please enter a valid choice. {prompt_}")

    async def post_manifest(self, manifest):
        """POST a manifest to the portal under the submit rate limiter.

        Returns the HTTP status code. A non-2xx response is logged as an error
        and also shown on the console, since the logger only writes to the file.
        """
        label = f"{manifest['environment']}/{manifest['name']}"
        async with self.post_rate_limiter:
            async with self.http_client.post(f"{self.portal_url}/api/manifest", json=manifest) as response:
                if not (200 <= response.status < 300):
                    failure = f"Failed to post manifest: {label}"
                    detail = f"Error: {await response.text()}"
                    self.logger.error(failure)
                    self.logger.error(detail)
                    self.info(console_msg=failure)
                    self.info(console_msg=detail)
                else:
                    self.info(f"Successfully posted manifest: {label}")
                return response.status
Uncomment this line to disable dry run + # dry_run=False + ) + await migration.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/_migration_framework/requirements.txt b/scripts/_migration_framework/requirements.txt new file mode 100644 index 00000000..a2bb447d --- /dev/null +++ b/scripts/_migration_framework/requirements.txt @@ -0,0 +1,18 @@ +aioconsole==0.8.0 +aiohappyeyeballs==2.4.3 +aiohttp==3.10.10 +aiolimiter==1.1.0 +aiosignal==1.3.1 +async-timeout==4.0.3 +asyncio==3.4.3 +attrs==24.2.0 +frozenlist==1.4.1 +idna==3.10 +markdown-it-py==3.0.0 +mdurl==0.1.2 +multidict==6.1.0 +propcache==0.2.0 +Pygments==2.18.0 +rich==13.9.2 +typing_extensions==4.12.2 +yarl==1.15.4