schema refresh endpoint

This commit is contained in:
“nishant-sharma”
2021-05-07 20:35:35 +05:30
parent f911918ed1
commit f680bc4017
4 changed files with 17 additions and 6 deletions

View File

@@ -48,8 +48,11 @@ func init() {
log.Printf(configuration.Kafka.Bootstrap_Servers)
log.Printf(configuration.SchemaRegistry.Endpoint)
producer_module.SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint
producer_module.TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",")
// initialize schema version map which contains latest schema version for topic(s)
producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ","))
producer_module.GetSchemaVersions()
// initialize producers
producer_module.InitializeProducers(configuration.Kafka)

View File

@@ -8,15 +8,17 @@ import (
var (
SchemaRegistryEndpoint string
TopicList []string
SchemaVersionMap = make(map[string]int)
)
func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) {
func GetSchemaVersions() {
schemaRegistryClient := srclient.CreateSchemaRegistryClient(schemaRegistryEndpoint)
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
schemaRegistryClient.CodecCreationEnabled(false)
for _, topic := range topicList {
for _, topic := range TopicList {
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
if err != nil {
log.Println(err)

View File

@@ -108,4 +108,10 @@ func getSchemaHandler(w http.ResponseWriter, r *http.Request) {
schemaMapJson, _ := json.Marshal(producer.SchemaVersionMap)
w.Header().Set("Content-Type", "application/json")
w.Write(schemaMapJson)
}
}
func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) {
producer.GetSchemaVersions()
io.WriteString(w, "Updated Schema Map")
}

View File

@@ -35,8 +35,8 @@ func NewServer(port string) (*Server, error) {
router.HandleFunc("/health", healthHandler).Methods("GET")
router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET")
router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET")
router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST")
// router.HandleFunc("/test", testHandler).Methods("GET")
// router.HandleFunc("/schema/refresh", schemaRefreshHandler).Methods("POST")
// router.HandleFunc("/stop", stopHandler).Methods("POST")
httpServer := &http.Server{Addr: ":" + port, Handler: router}