INFRA-3931 | Ashvin | Add support for multiple versions in Flink (#1235)

* INFRA-3931 | Ashvin | Remove taskmanager replica from jsonschema backend validation

* INFRA-3931 | Ashvin | Update tests to accommodate for the new flinkVersion field

* INFRA-3931 | Ashvin | Add controller level validation on taskManagerSlots and flinkVersion

Both are now required fields.

* INFRA-3930 | Ashvin | Update kutegen and add migration script

* INFRA-3931 | Ashvin | Remove replicas field from test files

This field was removed long ago because it was redundant. See https://github.com/navi-infra/kutegen/pull/190
Please enter the commit message for your changes. Lines starting
This commit is contained in:
Ashvin S
2024-10-18 12:46:08 +05:30
committed by GitHub
parent aca0a32adc
commit 2f37cde3e5
7 changed files with 206 additions and 16 deletions

Submodule kutegen updated: 7aa6d277cc...e5707f4a93

View File

@@ -0,0 +1,176 @@
import asyncio
import json
import logging
import aiohttp
import requests
from aiolimiter import AsyncLimiter
# INFRA-3930
# This script adds a default Flink version to existing Flink manifests.
# It also removes the redundant taskmanager replica field.
# List all deployment portals using this command:
# kubectl get ingress -A -l Name=deployment-portal-backend -o json |
# jq '.items[].spec.rules[0].host'
#
# Get tokens of all portals using navicli command
# find ~/.navicli -name "*portal" -print0 -exec cat {} \; -exec echo "" \; |
# sed 's/portal/portal /'
# Portals to migrate. Each entry is a list of [base_url, auth_token, manifest_ids]:
#   [0] base URL, used as the prefix of every API call for that portal
#   [1] token sent in the X_AUTH_TOKEN request header ("<token>" is a placeholder)
#   [2] ids of the manifests to fetch and, if they are flink manifests, update
# The second entry is commented out, so only the first portal is processed.
portals_with_tokens = [
["https://deployment-portal-backend.cmd.navi-tech.in", "<token>",
[1971, 2144, 2156, 1970, 2177, 1559, 1558]],
# ["https://sa-deployment-portal-backend.cmd.navi-tech.in",
# "<token>",
# [943, 1155, 1059, 786, 913, 1102, 914, 993, 789, 1135, 863, 874, 876, 935, 880, 936, 867, 899,
# 856, 875, 963, 886, 1163, 870, 1145, 1169, 919, 996, 1146, 926, 1129, 916, 1001, 1037, 974,
# 1133, 892, 657, 1311, 659, 994, 1042, 1035, 1041, 995, 958, 1179, 1038, 1048, 1247, 1049,
# 1159, 1248, 1018, 1014, 1005, 1194, 972, 1024, 1040, 1142, 837, 1027, 1161, 978, 1187, 1122,
# 1188, 1100, 827, 833, 1301, 699, 683, 1197, 1125, 1217, 698, 1252, 1303, 680, 740, 1182, 781,
# 1025, 1233]],
]
# Set up basic configuration for logging (DEBUG level, timestamped records).
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# NOTE(review): this requests session is not used by any function visible in
# this file; the HTTP calls below go through an aiohttp session instead.
s = requests.Session()
# NOTE(review): not used by any live code visible in this file.
list_manifest_endpoint = "/api/manifest/list"
# When True, the manifest helpers only log the change they would make and
# return False, so no manifest is posted back to the portal.
DRY_RUN = False
logger.info(f"DRY_RUN: {DRY_RUN}")
def remove_taskmanager_replica(manifest: dict) -> bool:
    """Delete ``flink.flinkDeployment.taskManager.replicas`` from a manifest.

    The manifest is modified in place, including in DRY_RUN mode.

    Args:
        manifest: Manifest dictionary as returned by the portal API.

    Returns:
        True when the key was removed and DRY_RUN is off, meaning the caller
        should push the manifest back. False when any level of the path is
        missing, when there was no ``replicas`` key, or when DRY_RUN is on
        (the removal is then only logged).
    """
    # Walk down to the taskManager section, bailing out at the first gap.
    section = manifest
    for key in ("flink", "flinkDeployment", "taskManager"):
        if key not in section:
            return False
        section = section[key]

    if "replicas" not in section:
        return False

    del section["replicas"]
    if not DRY_RUN:
        return True

    logger.info(f"DRY_RUN: Removing taskmanager replica from manifest: "
                f"{manifest['id']} {manifest['environment']}/{manifest['name']}")
    return False
def add_default_flink_version(manifest: dict) -> bool:
    """Default ``flinkVersion`` and drop the redundant taskManager ``replicas``.

    The manifest is modified in place (also in DRY_RUN mode). Missing
    ``flink``, ``flinkDeployment`` and ``flinkConfiguration`` sections are
    created as empty dictionaries.

    Two changes are applied:
      * ``flink.flinkDeployment.taskManager.replicas`` is deleted if present.
      * ``flink.flinkDeployment.flinkConfiguration.flinkVersion`` is set to
        ``"v1_17"`` when it is missing or an empty string.

    Args:
        manifest: Manifest dictionary as returned by the portal API.

    Returns:
        True when at least one change was made and DRY_RUN is off, meaning the
        caller should push the manifest back. False when nothing changed or
        when DRY_RUN is on (each change is then only logged).
    """
    deployment = manifest.setdefault("flink", {}).setdefault("flinkDeployment", {})

    changes = []

    # The key is "replicas" (plural), the same key remove_taskmanager_replica()
    # deletes. The previous "replica" spelling never matched, so the field was
    # silently left in place.
    task_manager = deployment.get("taskManager")
    if isinstance(task_manager, dict) and "replicas" in task_manager:
        del task_manager["replicas"]
        changes.append("Removing taskmanager replicas from")

    configuration = deployment.setdefault("flinkConfiguration", {})
    if configuration.get("flinkVersion", "") == "":
        configuration["flinkVersion"] = "v1_17"
        changes.append("Adding default flink version to")

    if not changes:
        return False

    if DRY_RUN:
        for change in changes:
            logger.info(f"DRY_RUN: {change} manifest: "
                        f"{manifest['id']} {manifest['environment']}/{manifest['name']}")
        return False
    return True
async def main():
    """Run the migration for every configured portal, one portal at a time.

    A single aiohttp client session is opened and shared by all portals; it is
    closed when the last portal has been processed.
    """
    async with aiohttp.ClientSession() as http:
        for entry in portals_with_tokens:
            await process_portal(http, entry)
async def fetch_manifest(session, portal, _id, headers):
    """Fetch a single manifest from a portal and return its decoded JSON body.

    Args:
        session: aiohttp client session used for the GET request.
        portal: Portal entry; only ``portal[0]`` (the base URL) is read here.
        _id: Id of the manifest to fetch.
        headers: Request headers to send with the GET.

    Returns:
        The response body decoded via ``response.json()``.
    """
    # Wait for a slot on the shared module-level limiter before requesting.
    async with rate_limit:
        logger.debug(f"Fetching manifest: {_id}")
        url = f"{portal[0]}/api/manifest/{_id}"
        async with session.get(url, headers=headers) as reply:
            logger.debug(reply.status)
            payload = await reply.json()
    return payload
async def process_flink_manifest(session, portal, manifest, headers):
    """Migrate one flink manifest and post it back to the portal if it changed.

    Snapshots of the manifest are written through write_to_file() before and
    after the migration, as ``<environment>_<name>_old`` and
    ``<environment>_<name>_new``, so every change can be inspected afterwards.

    Args:
        session: aiohttp client session used for the POST request.
        portal: Portal entry; only ``portal[0]`` (the base URL) is read here.
        manifest: Manifest dictionary; it is modified in place.
        headers: Request headers to send with the POST.
    """
    label = f"{manifest['id']} {manifest['environment']}/{manifest['name']}"
    file_stem = f"{manifest['environment']}_{manifest['name']}"
    logger.debug(f"Processing manifest: {label}")

    write_to_file(f"{file_stem}_old", manifest)
    needs_post = add_default_flink_version(manifest)
    # do_post2 = True or remove_taskmanager_replica(manifest)
    write_to_file(f"{file_stem}_new", manifest)
    logger.info(f"Wrote manifest to file: {manifest['environment']}/{manifest['name']}")

    if needs_post:
        async with session.post(f"{portal[0]}/api/manifest", json=manifest,
                                headers=headers) as response:
            if 200 <= response.status < 300:
                logger.info(f"Updated manifest: {label}")
            else:
                logger.error(f"Failed to update manifest: {manifest['id']}")
                # Log the raw body: response.json() raises when an error
                # response is not JSON (e.g. an HTML error page), and that
                # exception would abort the updates of the other manifests.
                logger.error(await response.text())

    # Short pause at the end of every manifest, kept from the original flow.
    await asyncio.sleep(1)
# Shared limiter acquired by fetch_manifest() before each GET request.
# AsyncLimiter(1, 1): max_rate=1 per time_period=1 second.
rate_limit = AsyncLimiter(1, 1)
async def process_portal(session, portal):
    """Fetch the configured manifests of one portal and migrate the flink ones.

    All manifests listed in ``portal[2]`` are fetched concurrently. Responses
    whose ``status`` is 500 are skipped with a warning. If any fetched
    manifest has no ``type`` field, the portal is aborted before anything is
    updated. Otherwise every manifest of type ``flink`` is handed to
    process_flink_manifest(), and all updates run concurrently.

    Args:
        session: aiohttp client session shared by all requests.
        portal: Portal entry ``[base_url, auth_token, manifest_ids]``.
    """
    logger.info(f"Processing portal: {portal[0]}")
    headers = {"X_AUTH_TOKEN": portal[1]}

    # Disabled alternative: list every manifest instead of using fixed ids.
    # async with session.get(f"{portal[0]}{list_manifest_endpoint}", headers=headers) as response:
    # manifest_list = await response.json()
    #
    # logger.info("Fetching total manifests: " + str(len(manifest_list)))

    async def fetch_with_id(manifest_id):
        # Pair each response with its id: as_completed() yields results in
        # completion order, so the id would otherwise be lost for logging.
        return manifest_id, await fetch_manifest(session, portal, manifest_id, headers)

    pending_tasks = [asyncio.create_task(fetch_with_id(_id)) for _id in portal[2]]

    flink_manifests = []
    non_typed_manifests = []
    for finished in asyncio.as_completed(pending_tasks):
        manifest_id, manifest = await finished

        if "status" in manifest and manifest["status"] == 500:
            # Previously skipped silently, which hid the manifests that were
            # never migrated.
            logger.warning(f"Skipping manifest {manifest_id}: fetch returned status 500")
            continue

        if "type" not in manifest:
            inferred_type = next(
                (key for key in ("flink", "deployment") if key in manifest), "unknown")
            logger.error(manifest)
            logger.error(
                f"Manifest {manifest['id']} {manifest['environment']}/{manifest['name']} "
                f"does not have type field. Actual field {inferred_type}")
            non_typed_manifests.append(manifest)
        elif manifest["type"] == "flink":
            flink_manifests.append(manifest)

    # Refuse to update anything on this portal while untyped manifests exist.
    if non_typed_manifests:
        logger.error("Error: Found manifests without type field")
        return

    await asyncio.gather(*(
        process_flink_manifest(session, portal, manifest, headers)
        for manifest in flink_manifests
    ))
def write_to_file(filename, manifest):
    """Dump a manifest as JSON to ``out/<filename>``.

    The target directory is created when it does not exist yet, so the script
    no longer fails with FileNotFoundError on a fresh checkout.

    Args:
        filename: File name relative to the ``out`` directory.
        manifest: JSON-serialisable manifest to write.
    """
    # Local import: the file's import block does not provide os.
    import os

    path = f"out/{filename}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as file_:
        json.dump(manifest, file_, indent=" ")
# Script entry point: run the async migration only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -10,15 +10,29 @@
"properties": {
"flinkConfiguration": {
"type": "object",
"required": [
"taskManagerSlots",
"flinkVersion"
],
"properties": {
"taskManagerSlots": {
"type": "number"
},
"flinkVersion": {
"type": "string",
"enum": [
"v1_17",
"v1_19"
]
}
}
},
"jobManager": {
"type": "object",
"required": ["replicas","resources"],
"required": [
"replicas",
"resources"
],
"properties": {
"replicas": {
"type": "number"
@@ -38,11 +52,11 @@
},
"taskManager": {
"type": "object",
"required": ["replicas","resources"],
"required": [
"replicas",
"resources"
],
"properties": {
"replicas": {
"type": "number"
},
"resources": {
"type": "object",
"properties": {
@@ -57,4 +71,4 @@
}
}
}
}
}

View File

@@ -85,13 +85,13 @@
}
},
"taskManager": {
"replicas": 2,
"resources": {
"cpu": 0.5,
"memory": "1024mb"
}
},
"flinkConfiguration": {
"flinkVersion": "v1_17",
"taskManagerSlots": 2,
"savepointFrequency": "1h"
}
@@ -108,4 +108,4 @@
"team": {
"name": "Infra"
}
}
}

View File

@@ -76,7 +76,6 @@
}
},
"taskManager": {
"replicas": 2,
"resources": {
"cpu": 0.5,
"memory": "1024mb"
@@ -85,7 +84,8 @@
"restartNonce": 0,
"flinkConfiguration": {
"taskManagerSlots": 2,
"savepointFrequency": "1d"
"savepointFrequency": "1d",
"flinkVersion": "v1_17"
}
},
"flinkJob": {

View File

@@ -63,7 +63,6 @@
}
},
"taskManager": {
"replicas": 2,
"resources": {
"cpu": 0.5,
"memory": "1024mb"
@@ -71,7 +70,8 @@
},
"flinkConfiguration": {
"taskManagerSlots": 2,
"savepointFrequency": "1h"
"savepointFrequency": "1h",
"flinkVersion": "v1_17"
}
},
"flinkJob": {
@@ -87,4 +87,4 @@
"team": {
"name": "Infra"
}
}
}

View File

@@ -51,7 +51,6 @@
},
"taskManager": {
"volumeSize": "10Gi",
"replicas": 2,
"resources": {
"cpu": 0.5,
"memory": "1024mb"
@@ -59,7 +58,8 @@
},
"flinkConfiguration": {
"taskManagerSlots": 2,
"savepointFrequency": "1h"
"savepointFrequency": "1h",
"flinkVersion": "v1_17"
}
},
"flinkJob": {