From 1ad5c94ee29af2013c3154027164515a8bfbc91c Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 9 Jan 2023 18:15:35 +0530 Subject: [PATCH] refactored code to breakdown init function, moved schema_util outside producer module, renamed files --- lib/RequestHandler.go | 33 ++++++------ main.go | 52 +++++++++---------- .../get_schema.go => schema/schema_util.go | 25 +++++---- server/handlers.go | 10 ++-- 4 files changed, 60 insertions(+), 60 deletions(-) rename producer/get_schema.go => schema/schema_util.go (72%) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index d829ceb..a921bc7 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -1,16 +1,16 @@ package lib import ( + metrics "com.navi.medici.janus/instrumentation" + producer_module "com.navi.medici.janus/producer" + "com.navi.medici.janus/schema" - metrics "com.navi.medici.janus/instrumentation" - producer_module "com.navi.medici.janus/producer" + // "log" + "encoding/binary" + "encoding/json" + "net/http" - // "log" - "encoding/binary" - "encoding/json" - "net/http" - - "github.com/Shopify/sarama" + "github.com/Shopify/sarama" ) var ( @@ -19,25 +19,24 @@ var ( ) type RequestObject struct { - Body []byte - Header http.Header + Body []byte + Header http.Header } func ProcessProtobufRequestChannel(topic string) { for { - request := <- ProtobufRequestChannel + request := <-ProtobufRequestChannel ClickstreamProtobufEventHandler(*request, topic) } } func ProcessJsonRequestChannel(topic string) { for { - request := <- JsonRequestChannel + request := <-JsonRequestChannel ClickstreamJsonEventHandler(*request, topic) } } - func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageBytes := request.Body @@ -54,7 +53,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { // add schemaID] schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) recordValue = append(recordValue, schemaIDBytes...) // add [messageIndex] @@ -79,7 +78,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + source := result["source"].(string) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} @@ -89,7 +88,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { // // add schemaID] // schemaIDBytes := make([]byte, 4) - // binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + // binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) // recordValue = append(recordValue, schemaIDBytes...) // Now write the bytes from the actual message... @@ -99,7 +98,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { Topic: topic, Value: sarama.ByteEncoder(recordValue), } - + metrics.IncrementCounter("request", source) producer_module.WriteMessageToKafkaAsync(message, source) } diff --git a/main.go b/main.go index 4148223..e8be5e2 100644 --- a/main.go +++ b/main.go @@ -3,12 +3,12 @@ package main import ( config "com.navi.medici.janus/config" lib "com.navi.medici.janus/lib" - producer_module "com.navi.medici.janus/producer" + producer_client "com.navi.medici.janus/producer" + "com.navi.medici.janus/schema" server "com.navi.medici.janus/server" "github.com/spf13/viper" "log" - "strings" ) var ( @@ -16,8 +16,27 @@ var ( cors string ) -func init() { +const metrics_port = "4000" +func init() { + configs := getConfigs() + port = configs.Server.Port + log.Printf("PORT IS: %v", port) + log.Printf(configs.Kafka.Bootstrap_Servers) + log.Printf(configs.SchemaRegistry.Endpoint) + schema.InitializeSchemaHandler(configs) + producer_client.InitializeProducers(configs.Kafka, configs.Env) + + for i := 0; i < 2; i++ { + go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) + } + + for i := 0; i < configs.Server.Goroutines; i++ { + go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) + } +} + +func getConfigs() config.Configurations { viper.SetConfigName("config") viper.AddConfigPath("./config") viper.SetConfigType("yml") @@ -46,37 +65,14 @@ func init() { configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) //} - - port = configuration.Server.Port cors = viper.GetString(configuration.Server.Cors) - log.Printf("PORT IS: %v", port) - log.Printf(configuration.Kafka.Bootstrap_Servers) - log.Printf(configuration.SchemaRegistry.Endpoint) - - producer_module.SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint - producer_module.TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",") - - // initialize schema version map which contains latest schema version for topic(s) - producer_module.GetSchemaVersions() - - // initialize producers - producer_module.InitializeProducers(configuration.Kafka, configuration.Env) - - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf) - } - - for i := 0; i < configuration.Server.Goroutines; i++ { - go lib.ProcessJsonRequestChannel(configuration.Kafka.Kafka_Topic_Json) - } - + return configuration } func main() { - log.Printf("Serving on http://0.0.0.0:", port) httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer("4000") + metricsServer, err2 := server.MetricServer(metrics_port) if err1 != nil { log.Fatalln("Unable to start server, ", err1) } diff --git a/producer/get_schema.go b/schema/schema_util.go similarity index 72% rename from producer/get_schema.go rename to schema/schema_util.go index 8f83c6e..d9e33fe 100644 --- a/producer/get_schema.go +++ b/schema/schema_util.go @@ -1,19 +1,25 @@ -package producer - +package schema import ( - "log" + "com.navi.medici.janus/config" "fmt" "github.com/riferrei/srclient" + "log" + "strings" ) - var ( - SchemaRegistryEndpoint string - TopicList []string - SchemaVersionMap = make(map[string]int) + SchemaRegistryEndpoint string + TopicList []string + SchemaVersionMap = make(map[string]int) ) +func InitializeSchemaHandler(configuration config.Configurations) { + SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint + TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",") + // initialize schema version map which contains latest schema version for topic(s) + GetSchemaVersions() +} func GetSchemaVersions() { @@ -32,7 +38,6 @@ func GetSchemaVersions() { log.Println(SchemaVersionMap) } - func AddSchema(topic string, schemaType string, schema string) error { schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) @@ -45,7 +50,7 @@ func AddSchema(topic string, schemaType string, schema string) error { compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false) if errorCompatible != nil { return errorCompatible - } else if (compatible == false) { + } else if compatible == false { return fmt.Errorf("given schema not compatible with the latest version") } } @@ -59,4 +64,4 @@ func AddSchema(topic string, schemaType string, schema string) error { } else { return nil } -} \ No newline at end of file +} diff --git a/server/handlers.go b/server/handlers.go index dc5b337..6665400 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -1,8 +1,8 @@ package server import ( - lib "com.navi.medici.janus/lib" - producer "com.navi.medici.janus/producer" + "com.navi.medici.janus/lib" + "com.navi.medici.janus/schema" "fmt" "io" @@ -113,13 +113,13 @@ func healthToggleHandler(w http.ResponseWriter, r *http.Request) { } func getSchemaHandler(w http.ResponseWriter, r *http.Request) { - schemaMapJson, _ := json.Marshal(producer.SchemaVersionMap) + schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap) w.Header().Set("Content-Type", "application/json") w.Write(schemaMapJson) } func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { - producer.GetSchemaVersions() + schema.GetSchemaVersions() io.WriteString(w, "Updated Schema Map") } @@ -130,7 +130,7 @@ func addSchemaHandler(w http.ResponseWriter, r *http.Request) { error := json.Unmarshal(body, &bodyJson) fmt.Println(error) - errorSchema := producer.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) + errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) fmt.Println("errorSchema: ") fmt.Println(error) if errorSchema != nil {