diff --git a/config/config.go b/config/config.go index 99b8a94..7d70d42 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ package config type Configurations struct { Server ServerConfigurations Kafka KafkaConfigurations + SchemaRegistry SchemaRegistryConfigurations } @@ -22,3 +23,8 @@ type KafkaConfigurations struct { // SASL_Mechanism string // Security_Protocol string } + +type SchemaRegistryConfigurations struct { + Endpoint string + Topics string +} diff --git a/config/config.yml b/config/config.yml index b414060..8af79f8 100644 --- a/config/config.yml +++ b/config/config.yml @@ -10,3 +10,7 @@ kafka: retry_backoff_ms: 500 sasl_user: SASL_USER # read from environment variable sasl_password: SASL_PASSWORD # read from environment variable + +schemaRegistry: + endpoint: SCHEMA_REGISTRY_ENDPOINT + topics: TOPICS diff --git a/main.go b/main.go index c08eaa6..02d9d3a 100644 --- a/main.go +++ b/main.go @@ -3,11 +3,12 @@ package main import ( config "com.navi.medici.janus/config" - // producer_module "com.navi.medici.janus/producer" + producer_module "com.navi.medici.janus/producer" server "com.navi.medici.janus/server" "fmt" "log" + "strings" "github.com/spf13/viper" ) @@ -33,13 +34,20 @@ func init() { fmt.Printf("Unable to decode into struct, %v", err) } - configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) + configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint) + configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics) port = configuration.Server.Port log.Printf("PORT IS: ", port) + log.Printf(configuration.Kafka.Bootstrap_Servers) + log.Printf(configuration.Kafka.Sasl_User) + log.Printf(configuration.Kafka.Sasl_Password) + log.Printf(configuration.SchemaRegistry.Endpoint) + + producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ",")) // sync producer using sarama // sync_producer := producer_module.GetSyncProducer(configuration.Kafka) diff --git a/producer/get_schema.go b/producer/get_schema.go new file mode 100644 index 0000000..a14841a --- /dev/null +++ b/producer/get_schema.go @@ -0,0 +1,32 @@ +package producer + + +import ( + "log" + "reflect" + "github.com/riferrei/srclient" +) + + +var ( + schemaVersionMap = make(map[string]int) +) + + +func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) { + + schemaRegistryClient := srclient.CreateSchemaRegistryClient(schemaRegistryEndpoint) + schemaRegistryClient.CodecCreationEnabled(false) + for _, topic := range topicList { + schema, err := schemaRegistryClient.GetLatestSchema(topic, false) + if err != nil { + log.Println(err) + } else { + schemaId := schema.ID() + schemaVersionMap[topic] = schemaId + } + } + + log.Println(schemaVersionMap) + +} \ No newline at end of file