diff --git a/main.go b/main.go index 513b0b7..a0386b1 100644 --- a/main.go +++ b/main.go @@ -48,8 +48,11 @@ func init() { log.Printf(configuration.Kafka.Bootstrap_Servers) log.Printf(configuration.SchemaRegistry.Endpoint) + producer_module.SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint + producer_module.TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",") + // initialize schema version map which contains latest schema version for topic(s) - producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ",")) + producer_module.GetSchemaVersions() // initialize producers producer_module.InitializeProducers(configuration.Kafka) diff --git a/producer/get_schema.go b/producer/get_schema.go index cbe9b2e..4fa7c9a 100644 --- a/producer/get_schema.go +++ b/producer/get_schema.go @@ -8,15 +8,17 @@ import ( var ( + SchemaRegistryEndpoint string + TopicList []string SchemaVersionMap = make(map[string]int) ) -func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) { +func GetSchemaVersions() { - schemaRegistryClient := srclient.CreateSchemaRegistryClient(schemaRegistryEndpoint) + schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) schemaRegistryClient.CodecCreationEnabled(false) - for _, topic := range topicList { + for _, topic := range TopicList { schema, err := schemaRegistryClient.GetLatestSchema(topic, false) if err != nil { log.Println(err) diff --git a/server/handlers.go b/server/handlers.go index d89bc57..a9ba194 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -108,4 +108,10 @@ func getSchemaHandler(w http.ResponseWriter, r *http.Request) { schemaMapJson, _ := json.Marshal(producer.SchemaVersionMap) w.Header().Set("Content-Type", "application/json") w.Write(schemaMapJson) -} \ No newline at end of file +} + + +func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { + producer.GetSchemaVersions() + io.WriteString(w, "Updated Schema Map") +} diff --git a/server/server.go b/server/server.go index f9aadb0..33ab2ff 100644 --- a/server/server.go +++ b/server/server.go @@ -35,8 +35,8 @@ func NewServer(port string) (*Server, error) { router.HandleFunc("/health", healthHandler).Methods("GET") router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") + router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") // router.HandleFunc("/test", testHandler).Methods("GET") - // router.HandleFunc("/schema/refresh", schemaRefreshHandler).Methods("POST") // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: router}