add schema support

This commit is contained in:
“nishant-sharma”
2021-05-12 15:26:13 +05:30
parent f680bc4017
commit 666089458f
3 changed files with 57 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ package producer
import (
"log"
"fmt"
"github.com/riferrei/srclient"
)
@@ -29,5 +30,33 @@ func GetSchemaVersions() {
}
log.Println(SchemaVersionMap)
}
func AddSchema(topic string, schemaType string, schema string) error {
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
schemaRegistryClient.CodecCreationEnabled(false)
existingSchema, _ := schemaRegistryClient.GetLatestSchema(topic, false)
fmt.Println("EXISTINGSCHEMA: ")
fmt.Println(existingSchema)
if existingSchema != nil {
compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false)
if errorCompatible != nil {
return errorCompatible
} else if (compatible == false) {
return fmt.Errorf("given schema not compatible with the latest version")
}
}
_, errroCreate := schemaRegistryClient.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false)
// fmt.Println("SCHEMA_ID: ")
// fmt.Println(schemaObj.ID())
if errroCreate != nil {
return errroCreate
} else {
return nil
}
}

View File

@@ -5,7 +5,7 @@ import (
producer "com.navi.medici.janus/producer"
"io"
// "fmt"
"fmt"
// "log"
"io/ioutil"
"net/http"
@@ -18,6 +18,13 @@ var (
)
type NewSchemaRequest struct {
Topic string `json:"topic"`
Schema string `json:"schema"`
SchemaType string `json:"schema_type"`
}
func eventsHandler(w http.ResponseWriter, r *http.Request) {
var reader io.Reader
@@ -115,3 +122,22 @@ func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) {
producer.GetSchemaVersions()
io.WriteString(w, "Updated Schema Map")
}
func addSchemaHandler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
var bodyJson NewSchemaRequest
error := json.Unmarshal(body, &bodyJson)
fmt.Println(error)
errorSchema := producer.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema)
fmt.Println("errorSchema: ")
fmt.Println(error)
if errorSchema != nil {
http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest)
} else {
io.WriteString(w, "added")
}
}

View File

@@ -36,6 +36,7 @@ func NewServer(port string) (*Server, error) {
router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET")
router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET")
router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST")
router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST")
// router.HandleFunc("/test", testHandler).Methods("GET")
// router.HandleFunc("/stop", stopHandler).Methods("POST")