From 385b64d49c897686ad3e458f6e83adb1c4daf503 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Thu, 12 Jan 2023 14:44:26 +0530 Subject: [PATCH 1/8] moved logs to getConfigs function --- main.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index e8be5e2..1bef910 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( producer_client "com.navi.medici.janus/producer" "com.navi.medici.janus/schema" server "com.navi.medici.janus/server" - "github.com/spf13/viper" "log" ) @@ -16,14 +15,11 @@ var ( cors string ) -const metrics_port = "4000" +const METRICS_PORT = "4000" func init() { configs := getConfigs() - port = configs.Server.Port - log.Printf("PORT IS: %v", port) - log.Printf(configs.Kafka.Bootstrap_Servers) - log.Printf(configs.SchemaRegistry.Endpoint) + schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) @@ -65,14 +61,19 @@ func getConfigs() config.Configurations { configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) //} + + port = configuration.Server.Port cors = viper.GetString(configuration.Server.Cors) + log.Printf("PORT IS: %v", port) + log.Printf(configuration.Kafka.Bootstrap_Servers) + log.Printf(configuration.SchemaRegistry.Endpoint) return configuration } func main() { log.Printf("Serving on http://0.0.0.0:", port) httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer(metrics_port) + metricsServer, err2 := server.MetricServer(METRICS_PORT) if err1 != nil { log.Fatalln("Unable to start server, ", err1) } From 4269298463978f59efb38acb10a23d9f77474a23 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 14:25:30 +0530 Subject: [PATCH 2/8] removed unused api endpoints --- server/server.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/server.go b/server/server.go index a165ef9..313644f 100644 --- a/server/server.go +++ b/server/server.go @@ -33,16 +33,9 @@ func NewServer(port string, corsList string) (*Server, error) { } router := mux.NewRouter() - router.HandleFunc("/events", eventsHandler).Methods("POST") router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") - router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") - router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") - router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST") - //router.Path("/metrics").Handler(promhttp.Handler()) - // router.HandleFunc("/test", testHandler).Methods("GET") - // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener} From 374fe655d35a34f2a7271cf560f72515568ee8b4 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 14:30:42 +0530 Subject: [PATCH 3/8] removed unused api handlers --- server/handlers.go | 69 ---------------------------------------------- 1 file changed, 69 deletions(-) diff --git a/server/handlers.go b/server/handlers.go index 6665400..de77040 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -2,9 +2,6 @@ package server import ( "com.navi.medici.janus/lib" - "com.navi.medici.janus/schema" - - "fmt" "io" // "log" @@ -29,43 +26,6 @@ type CustomResponse struct { Message string `json:"message"` } -func eventsHandler(w http.ResponseWriter, r *http.Request) { - var reader io.Reader - - // for name, values := range r.Header { - // // Loop over all values for the name. - // for _, value := range values { - // fmt.Println("HEADER: ", name, value) - // } - // } - - // check if body is gzip compressed - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - reader, err = gzip.NewReader(r.Body) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) - return - } - } else { - reader = r.Body - } - - body, err := ioutil.ReadAll(reader) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Request body invalid", http.StatusBadRequest) - return - } - - w.Header().Set("Content-Type", "application/json") - lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - io.WriteString(w, "ok") -} - func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { var reader io.Reader // check if body is gzip compressed @@ -111,32 +71,3 @@ func healthToggleHandler(w http.ResponseWriter, r *http.Request) { healthyBool = !healthyBool io.WriteString(w, "toggled") } - -func getSchemaHandler(w http.ResponseWriter, r *http.Request) { - schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap) - w.Header().Set("Content-Type", "application/json") - w.Write(schemaMapJson) -} - -func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { - schema.GetSchemaVersions() - io.WriteString(w, "Updated Schema Map") -} - -func addSchemaHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - - var bodyJson NewSchemaRequest - error := json.Unmarshal(body, &bodyJson) - fmt.Println(error) - - errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) - fmt.Println("errorSchema: ") - fmt.Println(error) - if errorSchema != nil { - http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest) - } else { - io.WriteString(w, "added") - } - -} From 9ecc13f2d30761da6cfdea3fdf9d148b9914509e Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 16:22:44 +0530 Subject: [PATCH 4/8] removed unused api handlers and protobuf processing logic --- lib/RequestHandler.go | 44 ------------------------------------------- main.go | 4 ---- 2 files changed, 48 deletions(-) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index bd6ac90..68b494d 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,10 +3,7 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" - // "log" - "encoding/binary" "encoding/json" "net/http" @@ -23,13 +20,6 @@ type RequestObject struct { Header http.Header } -func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } -} - func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel @@ -37,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) { } } -func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := getSource(result) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) - - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) -} - func ClickstreamJsonEventHandler(request RequestObject, topic string) { messageBytes := request.Body diff --git a/main.go b/main.go index 1bef910..d788af9 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,6 @@ func init() { schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) - } - for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } From 5f5f7b79531f92a16fd0ab659f4edcb9d6f2a95d Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 16:24:36 +0530 Subject: [PATCH 5/8] removed unused api handlers and protobuf processing logic --- server/handlers.go | 5 ----- server/server.go | 1 - 2 files changed, 6 deletions(-) diff --git a/server/handlers.go b/server/handlers.go index de77040..dcb9f33 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -66,8 +66,3 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { return } } - -func healthToggleHandler(w http.ResponseWriter, r *http.Request) { - healthyBool = !healthyBool - io.WriteString(w, "toggled") -} diff --git a/server/server.go b/server/server.go index 313644f..6dfbb0a 100644 --- a/server/server.go +++ b/server/server.go @@ -35,7 +35,6 @@ func NewServer(port string, corsList string) (*Server, error) { router := mux.NewRouter() router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") - router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener} From 09beb19ae7141268150862f8d3e751299dcf87a6 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 30 Jan 2023 17:45:34 +0530 Subject: [PATCH 6/8] updated log messages and removed AddSchema function --- main.go | 4 ++-- schema/schema_util.go | 25 ------------------------- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/main.go b/main.go index 86e3ef2..178cf83 100644 --- a/main.go +++ b/main.go @@ -72,10 +72,10 @@ func main() { httpServer, err1 := server.NewServer(port, cors) metricsServer, err2 := server.MetricServer(METRICS_PORT) if err1 != nil { - logger.Fatal("Unable to start server, %v", zap.String("error", err1.Error())) + logger.Fatal("Unable to start server, %v", zap.Error(err1)) } if err2 != nil { - logger.Fatal("Unable to start Metric server, %v", zap.String("error", err2.Error())) + logger.Fatal("Unable to start Metric server, %v", zap.Error(err2)) } go httpServer.HttpServer.Serve(httpServer.Listener) go metricsServer.HttpServer.Serve(metricsServer.Listener) diff --git a/schema/schema_util.go b/schema/schema_util.go index c0bec05..25df622 100644 --- a/schema/schema_util.go +++ b/schema/schema_util.go @@ -39,28 +39,3 @@ func GetSchemaVersions() { } logger.Debug("Schema Version Map", zap.String("", fmt.Sprintf("%v", SchemaVersionMap))) } - -func AddSchema(topic string, schemaType string, schema string) error { - - schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) - schemaRegistryClient.CodecCreationEnabled(false) - existingSchema, _ := schemaRegistryClient.GetLatestSchema(topic, false) - logger.Debug("EXISTINGSCHEMA", zap.String("", fmt.Sprintf("%v", existingSchema))) - - if existingSchema != nil { - compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false) - if errorCompatible != nil { - return errorCompatible - } else if compatible == false { - return fmt.Errorf("given schema not compatible with the latest version") - } - } - - _, errroCreate := schemaRegistryClient.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false) - - if errroCreate != nil { - return errroCreate - } else { - return nil - } -} From 8b69869050ce9a0f895b360c38c6be2e4059e84f Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 30 Jan 2023 17:50:44 +0530 Subject: [PATCH 7/8] removed sarama logging --- producer/producer_config.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/producer/producer_config.go b/producer/producer_config.go index ab3b7c8..9ae7808 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -3,12 +3,9 @@ package producer import ( "com.navi.medici.janus/config" "com.navi.medici.janus/utils" - "go.uber.org/zap" - "log" - "crypto/tls" "github.com/Shopify/sarama" - "os" + "go.uber.org/zap" "strings" "time" ) @@ -98,7 +95,6 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) } func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { - sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) logger = utils.GetLogger() syncProducer = GetSyncProducer(kafkaConfiguration, env) asyncProducer = GetAsyncProducer(kafkaConfiguration, env) From 9fcd011e221f329a4a8bcaf6760ad6c385ba65aa Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Wed, 1 Feb 2023 13:00:02 +0530 Subject: [PATCH 8/8] added logs, initialise logger if logger is nil --- main.go | 6 +++--- producer/kafka_writer.go | 23 +++++++++++------------ utils/logger.go | 5 ++++- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 178cf83..6830763 100644 --- a/main.go +++ b/main.go @@ -25,11 +25,11 @@ func init() { utils.InitializeLogger(configs.Env) logger = utils.GetLogger() port = configs.Server.Port - logger.Debug("PORT IS: " + port) - logger.Debug("Bootstrap Servers:" + configs.Kafka.Bootstrap_Servers) + logger.Debug("Service started on PORT: " + port) + logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - + logger.Info("Producer Initialized, starting goroutines for event processing") for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 2d8e07b..a6cfa86 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -1,19 +1,18 @@ package producer import ( - metrics "com.navi.medici.janus/instrumentation" - "log" - "github.com/Shopify/sarama" + metrics "com.navi.medici.janus/instrumentation" + "github.com/Shopify/sarama" + "go.uber.org/zap" ) - func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { - select { - case asyncProducer.Input() <- message: - metrics.IncrementCounter("success", source) -// log.Printf("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE %v", source) - case err := <- asyncProducer.Errors(): - metrics.IncrementCounter("fail", source) - log.Printf("FAILED TO WRITE TO KAFKA FOR SOURCE %v %v", source, err) - } + select { + case asyncProducer.Input() <- message: + metrics.IncrementCounter("success", source) + logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) + case err := <-asyncProducer.Errors(): + metrics.IncrementCounter("fail", source) + logger.Error("FAILED TO WRITE TO KAFKA FOR SOURCE "+source, zap.Error(err)) + } } diff --git a/utils/logger.go b/utils/logger.go index 6b41676..83ae8e7 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -9,7 +9,7 @@ var logger *zap.Logger func InitializeLogger(env string) { var init_err error - if env == "dev" { + if env == "DEV" { logger, init_err = zap.NewDevelopment() } else { logger, init_err = zap.NewProduction() @@ -20,5 +20,8 @@ func InitializeLogger(env string) { } func GetLogger() *zap.Logger { + if logger == nil { + InitializeLogger("PROD") + } return logger }