get schema versions
This commit is contained in:
@@ -4,6 +4,7 @@ package config
|
||||
type Configurations struct {
|
||||
Server ServerConfigurations
|
||||
Kafka KafkaConfigurations
|
||||
SchemaRegistry SchemaRegistryConfigurations
|
||||
}
|
||||
|
||||
|
||||
@@ -22,3 +23,8 @@ type KafkaConfigurations struct {
|
||||
// SASL_Mechanism string
|
||||
// Security_Protocol string
|
||||
}
|
||||
|
||||
type SchemaRegistryConfigurations struct {
|
||||
Endpoint string
|
||||
Topics string
|
||||
}
|
||||
|
||||
@@ -10,3 +10,7 @@ kafka:
|
||||
retry_backoff_ms: 500
|
||||
sasl_user: SASL_USER # read from environment variable
|
||||
sasl_password: SASL_PASSWORD # read from environment variable
|
||||
|
||||
schemaRegistry:
|
||||
endpoint: SCHEMA_REGISTRY_ENDPOINT
|
||||
topics: TOPICS
|
||||
|
||||
12
main.go
12
main.go
@@ -3,11 +3,12 @@ package main
|
||||
|
||||
import (
|
||||
config "com.navi.medici.janus/config"
|
||||
// producer_module "com.navi.medici.janus/producer"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
server "com.navi.medici.janus/server"
|
||||
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
@@ -33,13 +34,20 @@ func init() {
|
||||
fmt.Printf("Unable to decode into struct, %v", err)
|
||||
}
|
||||
|
||||
|
||||
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
|
||||
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
|
||||
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
|
||||
configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint)
|
||||
configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics)
|
||||
|
||||
port = configuration.Server.Port
|
||||
log.Printf("PORT IS: ", port)
|
||||
log.Printf(configuration.Kafka.Bootstrap_Servers)
|
||||
log.Printf(configuration.Kafka.Sasl_User)
|
||||
log.Printf(configuration.Kafka.Sasl_Password)
|
||||
log.Printf(configuration.SchemaRegistry.Endpoint)
|
||||
|
||||
producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ","))
|
||||
|
||||
// sync producer using sarama
|
||||
// sync_producer := producer_module.GetSyncProducer(configuration.Kafka)
|
||||
|
||||
32
producer/get_schema.go
Normal file
32
producer/get_schema.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package producer
|
||||
|
||||
|
||||
import (
|
||||
"log"
|
||||
"reflect"
|
||||
"github.com/riferrei/srclient"
|
||||
)
|
||||
|
||||
|
||||
var (
|
||||
schemaVersionMap = make(map[string]int)
|
||||
)
|
||||
|
||||
|
||||
func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) {
|
||||
|
||||
schemaRegistryClient := srclient.CreateSchemaRegistryClient(schemaRegistryEndpoint)
|
||||
schemaRegistryClient.CodecCreationEnabled(false)
|
||||
for _, topic := range topicList {
|
||||
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
} else {
|
||||
schemaId := schema.ID()
|
||||
schemaVersionMap[topic] = schemaId
|
||||
}
|
||||
}
|
||||
|
||||
log.Println(schemaVersionMap)
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user