diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 589d8f5..68b494d 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,8 +3,7 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" - "encoding/binary" + "encoding/json" "net/http" @@ -21,13 +20,6 @@ type RequestObject struct { Header http.Header } -func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } -} - func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel @@ -35,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) { } } -func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := getSource(result) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) - - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) -} - func ClickstreamJsonEventHandler(request RequestObject, topic string) { messageBytes := request.Body diff --git a/main.go b/main.go index eb70fb9..86e3ef2 100644 --- a/main.go +++ b/main.go @@ -7,10 +7,9 @@ import ( "com.navi.medici.janus/schema" "com.navi.medici.janus/server" "com.navi.medici.janus/utils" + "github.com/spf13/viper" "go.uber.org/zap" "log" - - "github.com/spf13/viper" ) var ( @@ -19,7 +18,7 @@ var ( logger *zap.Logger ) -const metrics_port = "4000" +const METRICS_PORT = "4000" func init() { configs := getConfigs() @@ -27,15 +26,10 @@ func init() { logger = utils.GetLogger() port = configs.Server.Port logger.Debug("PORT IS: " + port) - logger.Debug(configs.Kafka.Bootstrap_Servers) - logger.Debug(configs.SchemaRegistry.Endpoint) + logger.Debug("Bootstrap Servers:" + configs.Kafka.Bootstrap_Servers) schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) - } - for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } @@ -49,12 +43,12 @@ func getConfigs() config.Configurations { var configuration config.Configurations if err := viper.ReadInConfig(); err != nil { - log.Fatalf("Error reading config file, %s\n", err) + log.Fatalln("Error reading config file, %s", err) } err := viper.Unmarshal(&configuration) if err != nil { - log.Fatalf("Unable to decode into struct, %v\n", err) + log.Fatalln("Unable to decode into struct, %v", err) } // Following coinfigurations read from environment variables @@ -76,7 +70,7 @@ func getConfigs() config.Configurations { func main() { logger.Debug("Serving on http://0.0.0.0:" + port) httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer(metrics_port) + metricsServer, err2 := server.MetricServer(METRICS_PORT) if err1 != nil { logger.Fatal("Unable to start server, %v", zap.String("error", err1.Error())) } diff --git a/server/handlers.go b/server/handlers.go index 223ce32..dcb9f33 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -2,12 +2,11 @@ package server import ( "com.navi.medici.janus/lib" - "com.navi.medici.janus/schema" + "io" + // "log" "compress/gzip" "encoding/json" - "fmt" - "io" "io/ioutil" "net/http" ) @@ -27,43 +26,6 @@ type CustomResponse struct { Message string `json:"message"` } -func eventsHandler(w http.ResponseWriter, r *http.Request) { - var reader io.Reader - - // for name, values := range r.Header { - // // Loop over all values for the name. - // for _, value := range values { - // fmt.Println("HEADER: ", name, value) - // } - // } - - // check if body is gzip compressed - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - reader, err = gzip.NewReader(r.Body) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) - return - } - } else { - reader = r.Body - } - - body, err := ioutil.ReadAll(reader) - if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Request body invalid", http.StatusBadRequest) - return - } - - w.Header().Set("Content-Type", "application/json") - lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - io.WriteString(w, "ok") -} - func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { var reader io.Reader // check if body is gzip compressed @@ -104,37 +66,3 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { return } } - -func healthToggleHandler(w http.ResponseWriter, r *http.Request) { - healthyBool = !healthyBool - io.WriteString(w, "toggled") -} - -func getSchemaHandler(w http.ResponseWriter, r *http.Request) { - schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap) - w.Header().Set("Content-Type", "application/json") - w.Write(schemaMapJson) -} - -func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) { - schema.GetSchemaVersions() - io.WriteString(w, "Updated Schema Map") -} - -func addSchemaHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - - var bodyJson NewSchemaRequest - error := json.Unmarshal(body, &bodyJson) - fmt.Println(error) - - errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema) - fmt.Println("errorSchema: ") - fmt.Println(error) - if errorSchema != nil { - http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest) - } else { - io.WriteString(w, "added") - } - -} diff --git a/server/server.go b/server/server.go index a165ef9..6dfbb0a 100644 --- a/server/server.go +++ b/server/server.go @@ -33,16 +33,8 @@ func NewServer(port string, corsList string) (*Server, error) { } router := mux.NewRouter() - router.HandleFunc("/events", eventsHandler).Methods("POST") router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") - router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") - router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET") - router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST") - router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST") - //router.Path("/metrics").Handler(promhttp.Handler()) - // router.HandleFunc("/test", testHandler).Methods("GET") - // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} newServer := &Server{HttpServer: httpServer, Listener: listener}