diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index aae18f9..41df64e 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,13 +3,10 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" "com.navi.medici.janus/utils" - "time" - - "encoding/binary" "encoding/json" "net/http" + "time" "github.com/Shopify/sarama" ) @@ -24,13 +21,6 @@ type RequestObject struct { Header http.Header } -func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } -} - func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel @@ -38,40 +28,6 @@ func ProcessJsonRequestChannel(topic string) { } } -func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := getSource(result) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) - - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.RequestCounter.WithLabelValues(source).Inc() - producer_module.WriteMessageToKafkaAsync(message, source) -} - func ClickstreamJsonEventHandler(request RequestObject, topic string) { eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body diff --git a/main.go b/main.go index eb70fb9..6830763 100644 --- a/main.go +++ b/main.go @@ -7,10 +7,9 @@ import ( "com.navi.medici.janus/schema" "com.navi.medici.janus/server" "com.navi.medici.janus/utils" + "github.com/spf13/viper" "go.uber.org/zap" "log" - - "github.com/spf13/viper" ) var ( @@ -19,23 +18,18 @@ var ( logger *zap.Logger ) -const metrics_port = "4000" +const METRICS_PORT = "4000" func init() { configs := getConfigs() utils.InitializeLogger(configs.Env) logger = utils.GetLogger() port = configs.Server.Port - logger.Debug("PORT IS: " + port) - logger.Debug(configs.Kafka.Bootstrap_Servers) - logger.Debug(configs.SchemaRegistry.Endpoint) + logger.Debug("Service started on PORT: " + port) + logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) - } - + logger.Info("Producer Initialized, starting goroutines for event processing") for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } @@ -49,12 +43,12 @@ func getConfigs() config.Configurations { var configuration config.Configurations if err := viper.ReadInConfig(); err != nil { - log.Fatalf("Error reading config file, %s\n", err) + log.Fatalln("Error reading config file, %s", err) } err := viper.Unmarshal(&configuration) if err != nil { - log.Fatalf("Unable to decode into struct, %v\n", err) + log.Fatalln("Unable to decode into struct, %v", err) } // Following coinfigurations read from environment variables @@ -76,12 +70,12 @@ func getConfigs() config.Configurations { func main() { logger.Debug("Serving on http://0.0.0.0:" + port) httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer(metrics_port) + metricsServer, err2 := server.MetricServer(METRICS_PORT) if err1 != nil { - logger.Fatal("Unable to start server, %v", zap.String("error", err1.Error())) + logger.Fatal("Unable to start server, %v", zap.Error(err1)) } if err2 != nil { - logger.Fatal("Unable to start Metric server, %v", zap.String("error", err2.Error())) + logger.Fatal("Unable to start Metric server, %v", zap.Error(err2)) } go httpServer.HttpServer.Serve(httpServer.Listener) go metricsServer.HttpServer.Serve(metricsServer.Listener) diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 0d523ba..4f6ffa9 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -13,7 +13,7 @@ func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { asyncProducer.Input() <- message metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() - logger.Info("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) + logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) } func processProducerSuccesses() { @@ -29,5 +29,4 @@ func processProducerErrors() { logger.Error("Failed to write message to Kafka", zap.Error(err.Err)) metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() } - } diff --git a/schema/schema_util.go b/schema/schema_util.go index c0bec05..25df622 100644 --- a/schema/schema_util.go +++ b/schema/schema_util.go @@ -39,28 +39,3 @@ func GetSchemaVersions() { } logger.Debug("Schema Version Map", zap.String("", fmt.Sprintf("%v", SchemaVersionMap))) } - -func AddSchema(topic string, schemaType string, schema string) error { - - schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) - schemaRegistryClient.CodecCreationEnabled(false) - existingSchema, _ := schemaRegistryClient.GetLatestSchema(topic, false) - logger.Debug("EXISTINGSCHEMA", zap.String("", fmt.Sprintf("%v", existingSchema))) - - if existingSchema != nil { - compatible, errorCompatible := schemaRegistryClient.IsSchemaCompatible(topic, schema, "latest", srclient.SchemaType(schemaType), false) - if errorCompatible != nil { - return errorCompatible - } else if compatible == false { - return fmt.Errorf("given schema not compatible with the latest version") - } - } - - _, errroCreate := schemaRegistryClient.CreateSchema(topic, schema, srclient.SchemaType(schemaType), false) - - if errroCreate != nil { - return errroCreate - } else { - return nil - } -} diff --git a/server/handlers.go b/server/handlers.go index d0627c9..c68e5cb 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -3,16 +3,13 @@ package server import ( metrics "com.navi.medici.janus/instrumentation" "com.navi.medici.janus/lib" - "com.navi.medici.janus/schema" "com.navi.medici.janus/utils" - "time" - "compress/gzip" "encoding/json" - "fmt" "io" "io/ioutil" "net/http" + "time" ) var ( @@ -21,7 +18,6 @@ var ( const ( JSON = "json" - PROTO = "proto" SUCCESS = "success" ERROR = "error" ) @@ -37,43 +33,6 @@ type CustomResponse struct { Message string `json:"message"` } -func eventsHandler(w http.ResponseWriter, r *http.Request) { - var reader io.Reader - - // for name, values := range r.Header { - // // Loop over all values for the name. - // for _, value := range values { - // fmt.Println("HEADER: ", name, value) - // } - // } - - // check if body is gzip compressed - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - reader, err = gzip.NewReader(r.Body) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) - return - } - } else { - reader = r.Body - } - - body, err := ioutil.ReadAll(reader) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Request body invalid", http.StatusBadRequest) - return - } - - w.Header().Set("Content-Type", "application/json") - lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - io.WriteString(w, "ok") -} - func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano()) var reader io.Reader @@ -117,37 +76,3 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { return } } - -func healthToggleHandler(w http.ResponseWriter, r *http.Request) { - healthyBool = !healthyBool - io.WriteString(w, "toggled") -} - -func getSchemaHandler(w http.ResponseWriter, r *http.Request) { - schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap) - w.Header().Set("Content-Type", "application/json") - w.Write(schemaMapJson) -} - -func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { - schema.GetSchemaVersions() - io.WriteString(w, "Updated Schema Map") -} - -func addSchemaHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - - var bodyJson NewSchemaRequest - error := json.Unmarshal(body, &bodyJson) - fmt.Println(error) - - errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) - fmt.Println("errorSchema: ") - fmt.Println(error) - if errorSchema != nil { - http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest) - } else { - io.WriteString(w, "added") - } - -} diff --git a/server/server.go b/server/server.go index a165ef9..6dfbb0a 100644 --- a/server/server.go +++ b/server/server.go @@ -33,16 +33,8 @@ func NewServer(port string, corsList string) (*Server, error) { } router := mux.NewRouter() - router.HandleFunc("/events", eventsHandler).Methods("POST") router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") - router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") - router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") - router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") - router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST") - //router.Path("/metrics").Handler(promhttp.Handler()) - // router.HandleFunc("/test", testHandler).Methods("GET") - // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener} diff --git a/utils/logger.go b/utils/logger.go index 6b41676..83ae8e7 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -9,7 +9,7 @@ var logger *zap.Logger func InitializeLogger(env string) { var init_err error - if env == "dev" { + if env == "DEV" { logger, init_err = zap.NewDevelopment() } else { logger, init_err = zap.NewProduction() @@ -20,5 +20,8 @@ func InitializeLogger(env string) { } func GetLogger() *zap.Logger { + if logger == nil { + InitializeLogger("PROD") + } return logger }