// Package schema tracks the latest schema registered for each configured
// topic and registers new schemas with the schema registry.
package schema

import (
	"fmt"
	"log"
	"strings"

	"com.navi.medici.janus/config"
	"github.com/riferrei/srclient"
)

var (
	// SchemaRegistryEndpoint is the schema registry URL read from configuration.
	SchemaRegistryEndpoint string
	// TopicList holds the topics whose latest schema is looked up; each topic
	// is passed to the registry client as the subject name.
	TopicList []string
	// SchemaVersionMap maps a topic to the value returned by ID() on its
	// latest schema. NOTE(review): despite the name, this stores the schema
	// ID, not a version number.
	SchemaVersionMap = make(map[string]int)
)

// InitializeSchemaHandler stores the schema registry endpoint and topic list
// from configuration and then populates SchemaVersionMap for those topics.
func InitializeSchemaHandler(configuration config.Configurations) {
	SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint
	TopicList = parseTopics(configuration.SchemaRegistry.Topics)

	// Initialize the schema version map, which contains the latest schema
	// for each topic.
	GetSchemaVersions()
}

// parseTopics splits a comma-separated topic list, trimming surrounding
// whitespace and dropping empty entries so that an empty or sloppily
// formatted configuration value does not produce bogus subjects.
func parseTopics(raw string) []string {
	fields := strings.Split(raw, ",")
	topics := make([]string, 0, len(fields))
	for _, field := range fields {
		if topic := strings.TrimSpace(field); topic != "" {
			topics = append(topics, topic)
		}
	}
	return topics
}

// GetSchemaVersions fetches the latest schema for every topic in TopicList
// and records its ID in SchemaVersionMap. Topics whose lookup fails are
// logged and skipped; any previous entry for them is left untouched.
func GetSchemaVersions() {
	client := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
	client.CodecCreationEnabled(false)

	for _, topic := range TopicList {
		schema, err := client.GetLatestSchema(topic, false)
		if err != nil {
			log.Printf("fetching latest schema for topic %q: %v", topic, err)
			continue
		}
		SchemaVersionMap[topic] = schema.ID()
	}
	log.Println(SchemaVersionMap)
}

// AddSchema registers schema, of the given schemaType, under topic.
//
// When a latest schema can be fetched for the topic, the new schema is first
// checked for compatibility against "latest"; an error is returned if the
// check itself fails or if the schema is reported as incompatible. Otherwise
// the schema is created and any error from the registry client is returned.
func AddSchema(topic string, schemaType string, schema string) error {
	client := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
	client.CodecCreationEnabled(false)

	existing, err := client.GetLatestSchema(topic, false)
	if err != nil {
		// Deliberately best-effort: a failed lookup may simply mean that no
		// schema is registered for this topic yet, so registration proceeds
		// without a compatibility check. The error is logged, not dropped.
		log.Printf("could not fetch latest schema for topic %q; skipping compatibility check: %v", topic, err)
	}

	if existing != nil {
		compatible, compatErr := client.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false)
		if compatErr != nil {
			return compatErr
		}
		if !compatible {
			return fmt.Errorf("given schema not compatible with the latest version")
		}
	}

	_, err = client.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false)
	return err
}