42 lines
1.2 KiB
Go
42 lines
1.2 KiB
Go
package schema
|
|
|
|
import (
|
|
"com.navi.medici.janus/config"
|
|
"com.navi.medici.janus/utils"
|
|
"fmt"
|
|
"github.com/riferrei/srclient"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
var (
|
|
SchemaRegistryEndpoint string
|
|
TopicList []string
|
|
SchemaVersionMap = make(map[string]int)
|
|
logger *zap.Logger
|
|
)
|
|
|
|
func InitializeSchemaHandler(configuration config.Configurations) {
|
|
logger = utils.GetLogger()
|
|
SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint
|
|
TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",")
|
|
// initialize schema version map which contains latest schema version for topic(s)
|
|
GetSchemaVersions()
|
|
}
|
|
|
|
func GetSchemaVersions() {
|
|
|
|
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
|
|
schemaRegistryClient.CodecCreationEnabled(false)
|
|
for _, topic := range TopicList {
|
|
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
|
|
if err != nil {
|
|
logger.Error("error in fetching latest schema", zap.Error(err))
|
|
} else {
|
|
schemaId := schema.ID()
|
|
SchemaVersionMap[topic] = schemaId
|
|
}
|
|
}
|
|
logger.Debug("Schema Version Map", zap.String("", fmt.Sprintf("%v", SchemaVersionMap)))
|
|
}
|