From 666089458f7b9875e90981207db868b214d2d4a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Wed, 12 May 2021 15:26:13 +0530 Subject: [PATCH] add schema support --- producer/get_schema.go | 29 +++++++++++++++++++++++++++++ server/handlers.go | 28 +++++++++++++++++++++++++++- server/server.go | 1 + 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/producer/get_schema.go b/producer/get_schema.go index 4fa7c9a..8f83c6e 100644 --- a/producer/get_schema.go +++ b/producer/get_schema.go @@ -3,6 +3,7 @@ package producer import ( "log" + "fmt" "github.com/riferrei/srclient" ) @@ -29,5 +30,33 @@ func GetSchemaVersions() { } log.Println(SchemaVersionMap) +} + +func AddSchema(topic string, schemaType string, schema string) error { + + schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) + schemaRegistryClient.CodecCreationEnabled(false) + existingSchema, _ := schemaRegistryClient.GetLatestSchema(topic, false) + fmt.Println("EXISTINGSCHEMA: ") + fmt.Println(existingSchema) + + if existingSchema != nil { + compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false) + if errorCompatible != nil { + return errorCompatible + } else if (compatible == false) { + return fmt.Errorf("given schema not compatible with the latest version") + } + } + + _, errroCreate := schemaRegistryClient.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false) + // fmt.Println("SCHEMA_ID: ") + // fmt.Println(schemaObj.ID()) + + if errroCreate != nil { + return errroCreate + } else { + return nil + } } \ No newline at end of file diff --git a/server/handlers.go b/server/handlers.go index a9ba194..e20eda2 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -5,7 +5,7 @@ import ( producer "com.navi.medici.janus/producer" "io" - // "fmt" + "fmt" // "log" "io/ioutil" "net/http" @@ -18,6 +18,13 @@ var ( ) +type NewSchemaRequest struct { + Topic string `json:"topic"` + Schema string `json:"schema"` + SchemaType string `json:"schema_type"` +} + + func eventsHandler(w http.ResponseWriter, r *http.Request) { var reader io.Reader @@ -115,3 +122,22 @@ func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { producer.GetSchemaVersions() io.WriteString(w, "Updated Schema Map") } + + +func addSchemaHandler(w http.ResponseWriter, r *http.Request) { + body, _ := ioutil.ReadAll(r.Body) + + var bodyJson NewSchemaRequest + error := json.Unmarshal(body, &bodyJson) + fmt.Println(error) + + errorSchema := producer.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) + fmt.Println("errorSchema: ") + fmt.Println(error) + if errorSchema != nil { + http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest) + } else { + io.WriteString(w, "added") + } + +} diff --git a/server/server.go b/server/server.go index 33ab2ff..c0d37f0 100644 --- a/server/server.go +++ b/server/server.go @@ -36,6 +36,7 @@ func NewServer(port string) (*Server, error) { router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") + router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST") // router.HandleFunc("/test", testHandler).Methods("GET") // router.HandleFunc("/stop", stopHandler).Methods("POST")