From 28a6c1ddff369ffedb4dc2a3d8fbd7d2a21c092e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Tue, 27 Apr 2021 16:40:13 +0530 Subject: [PATCH] json support --- config/config.go | 3 +- config/config.yml | 3 +- lib/RequestHandler.go | 58 ++++++++++++++++++++++++++++-------- main.go | 9 ++++-- server/handlers.go | 68 +++++++++++++++++++++++++++++++++++++++---- server/server.go | 3 +- 6 files changed, 122 insertions(+), 22 deletions(-) diff --git a/config/config.go b/config/config.go index cbbd58c..fd88ca6 100644 --- a/config/config.go +++ b/config/config.go @@ -11,7 +11,6 @@ type Configurations struct { type ServerConfigurations struct { Port string Goroutines int - Kafka_Topic string } @@ -21,6 +20,8 @@ type KafkaConfigurations struct { Retry_Backoff_MS int Sasl_User string Sasl_Password string + Kafka_Topic_Json string + Kafka_Topic_Protobuf string } type SchemaRegistryConfigurations struct { diff --git a/config/config.yml b/config/config.yml index ced4400..77ad501 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,7 +1,6 @@ server: port: 8000 goroutines: 1000 - kafka_topic: KAFKA_TOPIC kafka: bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable @@ -9,6 +8,8 @@ kafka: retry_backoff_ms: 500 sasl_user: KAFKA_SASL_USER # read from environment variable sasl_password: KAFKA_SASL_PASSWORD # read from environment variable + kafka_topic_json: KAFKA_TOPIC_JSON + kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF schemaRegistry: endpoint: SCHEMA_REGISTRY_ENDPOINT diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 814f9eb..ea84593 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -1,37 +1,46 @@ package lib import ( - producer_module "com.navi.medici.janus/producer" + producer_module "com.navi.medici.janus/producer" - // "log" + // "log" "net/http" - "encoding/binary" - "github.com/Shopify/sarama" + "encoding/binary" + "github.com/Shopify/sarama" ) var ( - RequestChannel = make(chan *RequestObject) + ProtobufRequestChannel = make(chan *RequestObject) + JsonRequestChannel = make(chan *RequestObject) ) type RequestObject struct { - Body []byte - Header http.Header + Body []byte + Header http.Header } -func ProcessRequestChannel(topic string) { +func ProcessProtobufRequestChannel(topic string) { for { - request := <- RequestChannel + request := <- ProtobufRequestChannel ClickstreamProtobufEventHandler(*request, topic) } } +func ProcessJsonRequestChannel(topic string) { + for { + request := <- JsonRequestChannel + ClickstreamJsonEventHandler(*request, topic) + } +} + + func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageBytes := request.Body // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} + recordValue := []byte{} // add [magicByte] recordValue = append(recordValue, byte(0)) @@ -48,11 +57,36 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { // Now write the bytes from the actual message... recordValue = append(recordValue, messageBytes...) - message := &sarama.ProducerMessage{ + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + + producer_module.WriteMessageToKafkaAsync(message) +} + +func ClickstreamJsonEventHandler(request RequestObject, topic string) { + + messageBytes := request.Body + + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} + + // add [magicByte] + recordValue = append(recordValue, byte(0)) + + // add schemaID] + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + recordValue = append(recordValue, schemaIDBytes...) + + // Now write the bytes from the actual message... + recordValue = append(recordValue, messageBytes...) + + message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } producer_module.WriteMessageToKafkaAsync(message) - } diff --git a/main.go b/main.go index 1c6ff61..e175477 100644 --- a/main.go +++ b/main.go @@ -40,7 +40,8 @@ func init() { configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint) configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics) - configuration.Server.Kafka_Topic = viper.GetString(configuration.Server.Kafka_Topic) + configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json) + configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf) port = configuration.Server.Port log.Printf("PORT IS: ", port) @@ -54,7 +55,11 @@ func init() { producer_module.InitializeProducers(configuration.Kafka) for i := 0; i < configuration.Server.Goroutines; i++ { - go lib.ProcessRequestChannel(configuration.Server.Kafka_Topic) + go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf) + } + + for i := 0; i < configuration.Server.Goroutines; i++ { + go lib.ProcessJsonRequestChannel(configuration.Kafka.Kafka_Topic_Json) } } diff --git a/server/handlers.go b/server/handlers.go index 48517ae..1052018 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -7,26 +7,83 @@ import ( "io/ioutil" "log" "net/http" + "compress/gzip" ) var ( healthyBool bool = true ) + func eventsHandler(w http.ResponseWriter, r *http.Request) { var reader io.Reader - reader = r.Body - body, err := ioutil.ReadAll(reader) - if err == nil { - log.Printf(string(body)) + // check if body is gzip compressed + if r.Header.Get("Content-Encoding") == "gzip" { + var err error + reader, err = gzip.NewReader(r.Body) + if err != nil { + // log.Printf(err) + w.Header().Set("Content-Type", "application/json") + http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) + return + } + } else { + reader = r.Body + } + + body, err := ioutil.ReadAll(reader) + if err != nil { + // log.Printf(err) + w.Header().Set("Content-Type", "application/json") + http.Error(w, "Request body invalid", http.StatusBadRequest) + return } w.Header().Set("Content-Type", "application/json") - lib.RequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} + lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} io.WriteString(w, "ok") } + +func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { + var reader io.Reader + + for name, values := range r.Header { + // Loop over all values for the name. + for _, value := range values { + log.Println(name, value) + } + } + + // check if body is gzip compressed + if r.Header.Get("Content-Encoding") == "gzip" { + var err error + reader, err = gzip.NewReader(r.Body) + if err != nil { + // log.Printf(err) + w.Header().Set("Content-Type", "application/json") + http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) + return + } + } else { + reader = r.Body + } + + body, err := ioutil.ReadAll(reader) + if err != nil { + // log.Printf(err) + w.Header().Set("Content-Type", "application/json") + http.Error(w, "Request body invalid", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/json") + lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} + io.WriteString(w, "ok") +} + + func healthHandler(w http.ResponseWriter, r *http.Request) { if healthyBool { w.Header().Set("Content-Type", "application/json") @@ -37,6 +94,7 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { } } + func healthToggleHandler(w http.ResponseWriter, r *http.Request) { healthyBool = !healthyBool io.WriteString(w, "toggled") diff --git a/server/server.go b/server/server.go index 1a4f052..ab700c3 100644 --- a/server/server.go +++ b/server/server.go @@ -31,8 +31,10 @@ func NewServer(port string) (*Server, error) { router := mux.NewRouter() router.HandleFunc("/events", eventsHandler).Methods("POST") + router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") + // router.HandleFunc("/test", testHandler).Methods("GET") // router.HandleFunc("/schema/refresh", schemaRefreshHandler).Methods("POST") // router.HandleFunc("/stop", stopHandler).Methods("POST") @@ -41,4 +43,3 @@ func NewServer(port string) (*Server, error) { return newServer, nil } -