From 385b64d49c897686ad3e458f6e83adb1c4daf503 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Thu, 12 Jan 2023 14:44:26 +0530 Subject: [PATCH 1/5] moved logs to getConfigs function --- main.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index e8be5e2..1bef910 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( producer_client "com.navi.medici.janus/producer" "com.navi.medici.janus/schema" server "com.navi.medici.janus/server" - "github.com/spf13/viper" "log" ) @@ -16,14 +15,11 @@ var ( cors string ) -const metrics_port = "4000" +const METRICS_PORT = "4000" func init() { configs := getConfigs() - port = configs.Server.Port - log.Printf("PORT IS: %v", port) - log.Printf(configs.Kafka.Bootstrap_Servers) - log.Printf(configs.SchemaRegistry.Endpoint) + schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) @@ -65,14 +61,19 @@ func getConfigs() config.Configurations { configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) //} + + port = configuration.Server.Port cors = viper.GetString(configuration.Server.Cors) + log.Printf("PORT IS: %v", port) + log.Printf(configuration.Kafka.Bootstrap_Servers) + log.Printf(configuration.SchemaRegistry.Endpoint) return configuration } func main() { log.Printf("Serving on http://0.0.0.0:", port) httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer(metrics_port) + metricsServer, err2 := server.MetricServer(METRICS_PORT) if err1 != nil { log.Fatalln("Unable to start server, ", err1) } From 4269298463978f59efb38acb10a23d9f77474a23 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 14:25:30 +0530 Subject: [PATCH 2/5] removed unused api endpoints --- server/server.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/server.go b/server/server.go index a165ef9..313644f 100644 --- a/server/server.go +++ b/server/server.go @@ -33,16 +33,9 @@ func NewServer(port string, corsList string) (*Server, error) { } router := mux.NewRouter() - router.HandleFunc("/events", eventsHandler).Methods("POST") router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") - router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") - router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") - router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST") - //router.Path("/metrics").Handler(promhttp.Handler()) - // router.HandleFunc("/test", testHandler).Methods("GET") - // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener} From 374fe655d35a34f2a7271cf560f72515568ee8b4 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 14:30:42 +0530 Subject: [PATCH 3/5] removed unused api handlers --- server/handlers.go | 69 ---------------------------------------------- 1 file changed, 69 deletions(-) diff --git a/server/handlers.go b/server/handlers.go index 6665400..de77040 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -2,9 +2,6 @@ package server import ( "com.navi.medici.janus/lib" - "com.navi.medici.janus/schema" - - "fmt" "io" // "log" @@ -29,43 +26,6 @@ type CustomResponse struct { Message string `json:"message"` } -func eventsHandler(w http.ResponseWriter, r *http.Request) { - var reader io.Reader - - // for name, values := range r.Header { - // // Loop over all values for the name. - // for _, value := range values { - // fmt.Println("HEADER: ", name, value) - // } - // } - - // check if body is gzip compressed - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - reader, err = gzip.NewReader(r.Body) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) - return - } - } else { - reader = r.Body - } - - body, err := ioutil.ReadAll(reader) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Request body invalid", http.StatusBadRequest) - return - } - - w.Header().Set("Content-Type", "application/json") - lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - io.WriteString(w, "ok") -} - func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { var reader io.Reader // check if body is gzip compressed @@ -111,32 +71,3 @@ func healthToggleHandler(w http.ResponseWriter, r *http.Request) { healthyBool = !healthyBool io.WriteString(w, "toggled") } - -func getSchemaHandler(w http.ResponseWriter, r *http.Request) { - schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap) - w.Header().Set("Content-Type", "application/json") - w.Write(schemaMapJson) -} - -func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { - schema.GetSchemaVersions() - io.WriteString(w, "Updated Schema Map") -} - -func addSchemaHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - - var bodyJson NewSchemaRequest - error := json.Unmarshal(body, &bodyJson) - fmt.Println(error) - - errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) - fmt.Println("errorSchema: ") - fmt.Println(error) - if errorSchema != nil { - http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest) - } else { - io.WriteString(w, "added") - } - -} From 9ecc13f2d30761da6cfdea3fdf9d148b9914509e Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 16:22:44 +0530 Subject: [PATCH 4/5] removed unused api handlers and protobuf processing logic --- lib/RequestHandler.go | 44 ------------------------------------------- main.go | 4 ---- 2 files changed, 48 deletions(-) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index bd6ac90..68b494d 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,10 +3,7 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" - // "log" - "encoding/binary" "encoding/json" "net/http" @@ -23,13 +20,6 @@ type RequestObject struct { Header http.Header } -func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } -} - func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel @@ -37,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) { } } -func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := getSource(result) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) - - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) -} - func ClickstreamJsonEventHandler(request RequestObject, topic string) { messageBytes := request.Body diff --git a/main.go b/main.go index 1bef910..d788af9 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,6 @@ func init() { schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) - } - for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } From 5f5f7b79531f92a16fd0ab659f4edcb9d6f2a95d Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Mon, 23 Jan 2023 16:24:36 +0530 Subject: [PATCH 5/5] removed unused api handlers and protobuf processing logic --- server/handlers.go | 5 ----- server/server.go | 1 - 2 files changed, 6 deletions(-) diff --git a/server/handlers.go b/server/handlers.go index de77040..dcb9f33 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -66,8 +66,3 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { return } } - -func healthToggleHandler(w http.ResponseWriter, r *http.Request) { - healthyBool = !healthyBool - io.WriteString(w, "toggled") -} diff --git a/server/server.go b/server/server.go index 313644f..6dfbb0a 100644 --- a/server/server.go +++ b/server/server.go @@ -35,7 +35,6 @@ func NewServer(port string, corsList string) (*Server, error) { router := mux.NewRouter() router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") - router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener}