From bbdde6ae6871a98d013d589cf6fe9d544c76819e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Tue, 30 Mar 2021 16:05:40 +0530 Subject: [PATCH] lib --- lib/RequestHandler.go | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 lib/RequestHandler.go diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go new file mode 100644 index 0000000..d42826c --- /dev/null +++ b/lib/RequestHandler.go @@ -0,0 +1,62 @@ +package lib + +import ( + producer_module "com.navi.medici.janus/producer" + + "log" + "net/http" + "encoding/binary" + "github.com/Shopify/sarama" + +) + +var ( + RequestChannel = make(chan *RequestObject) +) + +type RequestObject struct { + Body []byte + Header http.Header +} + +func ProcessRequestChannel() { + for { + request := <- RequestChannel + ClickstreamProtobufEventHandler(*request, "testgo") + } +} + +func ClickstreamProtobufEventHandler(request RequestObject, topic string) { + + messageBytes := request.Body + + recordValue := []byte{} + + // add [magicByte] + recordValue = append(recordValue, byte(0)) + + // add schemaID] + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + recordValue = append(recordValue, schemaIDBytes...) + + // add [messageIndex] + messageIndexBytes := []byte{byte(2), byte(0)} + recordValue = append(recordValue, messageIndexBytes...) + + // Now write the bytes from the actual value... + // valueBytes, _ := proto.Marshal(&sensorReading) + recordValue = append(recordValue, messageBytes...) + + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + + if message != nil { + log.Printf("WRITTEN TO KAFKA") + } + + producer_module.WriteMessageToKafkaAsync(message) + +}