Files
janus/schema/schema_util.go

68 lines
1.9 KiB
Go

package schema
import (
"com.navi.medici.janus/config"
"fmt"
"github.com/riferrei/srclient"
"log"
"strings"
)
var (
SchemaRegistryEndpoint string
TopicList []string
SchemaVersionMap = make(map[string]int)
)
func InitializeSchemaHandler(configuration config.Configurations) {
SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint
TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",")
// initialize schema version map which contains latest schema version for topic(s)
GetSchemaVersions()
}
func GetSchemaVersions() {
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
schemaRegistryClient.CodecCreationEnabled(false)
for _, topic := range TopicList {
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
if err != nil {
log.Println(err)
} else {
schemaId := schema.ID()
SchemaVersionMap[topic] = schemaId
}
}
log.Println(SchemaVersionMap)
}
func AddSchema(topic string, schemaType string, schema string) error {
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
schemaRegistryClient.CodecCreationEnabled(false)
existingSchema, _ := schemaRegistryClient.GetLatestSchema(topic, false)
fmt.Println("EXISTINGSCHEMA: ")
fmt.Println(existingSchema)
if existingSchema != nil {
compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false)
if errorCompatible != nil {
return errorCompatible
} else if compatible == false {
return fmt.Errorf("given schema not compatible with the latest version")
}
}
_, errroCreate := schemaRegistryClient.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false)
// fmt.Println("SCHEMA_ID: ")
// fmt.Println(schemaObj.ID())
if errroCreate != nil {
return errroCreate
} else {
return nil
}
}