* INFRA-3931 | Ashvin | Remove taskmanager replica from jsonschema backend validation * INFRA-3931 | Ashvin | Update tests to accommodate for the new flinkVersion field * INFRA-3931 | Ashvin | Add controller level validation on taskManagerSlots and flinkVersion Both are now required fields. * INFRA-3930 | Ashvin | Update kutegen and add migration script * INFRA-3931 | Ashvin | Remove replicas field from test files This field was deleted long back as it was redundant. See this https://github.com/navi-infra/kutegen/pull/190 Please enter the commit message for your changes. Lines starting
177 lines
6.6 KiB
Python
177 lines
6.6 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
import aiohttp
|
|
import requests
|
|
from aiolimiter import AsyncLimiter
|
|
|
|
# INFRA-3930
|
|
# This script add default versions for existing flink manifests.
|
|
# This script also removes the redundant taskmanager replica field.
|
|
|
|
# List all deployment portal using this command
|
|
# kubectl get ingress -A -l Name=deployment-portal-backend -o json |
|
|
# jq '.items[].spec.rules[0].host'
|
|
#
|
|
# Get tokens of all portals using navicli command
|
|
# find ~/.navicli -name "*portal" -print0 -exec cat {} \; -exec echo "" \; |
|
|
# sed 's/portal/portal /'
|
|
portals_with_tokens = [
|
|
["https://deployment-portal-backend.cmd.navi-tech.in", "<token>",
|
|
[1971, 2144, 2156, 1970, 2177, 1559, 1558]],
|
|
# ["https://sa-deployment-portal-backend.cmd.navi-tech.in",
|
|
# "<token>",
|
|
# [943, 1155, 1059, 786, 913, 1102, 914, 993, 789, 1135, 863, 874, 876, 935, 880, 936, 867, 899,
|
|
# 856, 875, 963, 886, 1163, 870, 1145, 1169, 919, 996, 1146, 926, 1129, 916, 1001, 1037, 974,
|
|
# 1133, 892, 657, 1311, 659, 994, 1042, 1035, 1041, 995, 958, 1179, 1038, 1048, 1247, 1049,
|
|
# 1159, 1248, 1018, 1014, 1005, 1194, 972, 1024, 1040, 1142, 837, 1027, 1161, 978, 1187, 1122,
|
|
# 1188, 1100, 827, 833, 1301, 699, 683, 1197, 1125, 1217, 698, 1252, 1303, 680, 740, 1182, 781,
|
|
# 1025, 1233]],
|
|
]
|
|
|
|
# Set up basic configuration for logging
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
s = requests.Session()
|
|
list_manifest_endpoint = "/api/manifest/list"
|
|
DRY_RUN = False
|
|
logger.info(f"DRY_RUN: {DRY_RUN}")
|
|
|
|
|
|
def remove_taskmanager_replica(manifest: dict) -> bool:
|
|
if "flink" not in manifest:
|
|
return False
|
|
|
|
if "flinkDeployment" not in manifest["flink"]:
|
|
return False
|
|
|
|
if "taskManager" not in manifest["flink"]["flinkDeployment"]:
|
|
return False
|
|
|
|
if "replicas" in manifest["flink"]["flinkDeployment"]["taskManager"]:
|
|
del manifest["flink"]["flinkDeployment"]["taskManager"]["replicas"]
|
|
if DRY_RUN:
|
|
logger.info(f"DRY_RUN: Removing taskmanager replica from manifest: "
|
|
f"{manifest['id']} {manifest['environment']}/{manifest['name']}")
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
|
|
def add_default_flink_version(manifest: dict) -> bool:
|
|
if "flink" not in manifest:
|
|
manifest["flink"] = {}
|
|
|
|
if "flinkDeployment" not in manifest["flink"]:
|
|
manifest["flink"]["flinkDeployment"] = {}
|
|
|
|
if "taskManager" in manifest["flink"]["flinkDeployment"]:
|
|
if "replica" in manifest["flink"]["flinkDeployment"]["taskManager"]:
|
|
del manifest["flink"]["flinkDeployment"]["taskManager"]["replica"]
|
|
|
|
if "flinkConfiguration" not in manifest["flink"]["flinkDeployment"]:
|
|
manifest["flink"]["flinkDeployment"]["flinkConfiguration"] = {}
|
|
|
|
if "flinkVersion" not in manifest["flink"]["flinkDeployment"]["flinkConfiguration"] or \
|
|
manifest["flink"]["flinkDeployment"]["flinkConfiguration"]["flinkVersion"] == "":
|
|
manifest["flink"]["flinkDeployment"]["flinkConfiguration"]["flinkVersion"] = "v1_17"
|
|
if DRY_RUN:
|
|
logger.info(f"DRY_RUN: Adding default flink version to manifest: "
|
|
f"{manifest['id']} {manifest['environment']}/{manifest['name']}")
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
|
|
async def main():
|
|
async with aiohttp.ClientSession() as session:
|
|
for portal in portals_with_tokens:
|
|
await process_portal(session, portal)
|
|
|
|
|
|
async def fetch_manifest(session, portal, _id, headers):
|
|
async with rate_limit:
|
|
logger.debug(f"Fetching manifest: {_id}")
|
|
async with session.get(f"{portal[0]}/api/manifest/{_id}", headers=headers) as response:
|
|
logger.debug(response.status)
|
|
return await response.json()
|
|
|
|
|
|
async def process_flink_manifest(session, portal, manifest, headers):
|
|
logger.debug(
|
|
f"Processing manifest: {manifest['id']} {manifest['environment']}/{manifest['name']}")
|
|
write_to_file(f"{manifest['environment']}_{manifest['name']}_old", manifest)
|
|
do_post1 = add_default_flink_version(manifest)
|
|
# do_post2 = True or remove_taskmanager_replica(manifest)
|
|
write_to_file(f"{manifest['environment']}_{manifest['name']}_new", manifest)
|
|
logger.info(f"Wrote manifest to file: {manifest['environment']}/{manifest['name']}")
|
|
|
|
if do_post1:
|
|
async with session.post(f"{portal[0]}/api/manifest", json=manifest,
|
|
headers=headers) as response:
|
|
if response.status < 200 or response.status >= 300:
|
|
logger.error(f"Failed to update manifest: {manifest['id']}")
|
|
logger.error(await response.json())
|
|
else:
|
|
logger.info(
|
|
f"Updated manifest: {manifest['id']} "
|
|
f"{manifest['environment']}/{manifest['name']}")
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
rate_limit = AsyncLimiter(1, 1)
|
|
|
|
|
|
async def process_portal(session, portal):
|
|
logger.info(f"Processing portal: {portal[0]}")
|
|
headers = {"X_AUTH_TOKEN": portal[1]}
|
|
# async with session.get(f"{portal[0]}{list_manifest_endpoint}", headers=headers) as response:
|
|
# manifest_list = await response.json()
|
|
#
|
|
# logger.info("Fetching total manifests: " + str(len(manifest_list)))
|
|
|
|
flink_manifests = []
|
|
non_typed_manifests = []
|
|
|
|
tasks = []
|
|
for _id in portal[2]:
|
|
tasks.append(fetch_manifest(session, portal, _id, headers))
|
|
|
|
pending_tasks = [asyncio.create_task(task) for task in tasks]
|
|
|
|
for task in asyncio.as_completed(pending_tasks):
|
|
manifest = await task
|
|
if "status" in manifest and manifest["status"] == 500:
|
|
continue
|
|
|
|
if "type" not in manifest:
|
|
type = "flink" if "flink" in manifest else "deployment" if "deployment" in manifest else "unknown"
|
|
logger.error(manifest)
|
|
logger.error(
|
|
f"Manifest {manifest['id']} {manifest['environment']}/{manifest['name']} "
|
|
f"does not have type field. Actual field {type}")
|
|
non_typed_manifests.append(manifest)
|
|
elif manifest["type"] == "flink":
|
|
flink_manifests.append(manifest)
|
|
|
|
if len(non_typed_manifests) > 0:
|
|
logger.error("Error: Found manifests without type field")
|
|
return
|
|
|
|
update_tasks = []
|
|
for manifest in flink_manifests:
|
|
update_tasks.append(process_flink_manifest(session, portal, manifest, headers))
|
|
|
|
await asyncio.gather(*update_tasks)
|
|
|
|
|
|
def write_to_file(filename, manifest):
|
|
with open(f"out/{filename}", "w") as file_:
|
|
json.dump(manifest, file_, indent=" ")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|