Files
janus/lib/RequestHandler.go
“nishant-sharma” bbdde6ae68 lib
2021-03-30 16:05:40 +05:30

63 lines
1.3 KiB
Go

package lib
import (
producer_module "com.navi.medici.janus/producer"
"log"
"net/http"
"encoding/binary"
"github.com/Shopify/sarama"
)
var (
RequestChannel = make(chan *RequestObject)
)
type RequestObject struct {
Body []byte
Header http.Header
}
func ProcessRequestChannel() {
for {
request := <- RequestChannel
ClickstreamProtobufEventHandler(*request, "testgo")
}
}
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
messageBytes := request.Body
recordValue := []byte{}
// add [magicByte]
recordValue = append(recordValue, byte(0))
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// add [messageIndex]
messageIndexBytes := []byte{byte(2), byte(0)}
recordValue = append(recordValue, messageIndexBytes...)
// Now write the bytes from the actual value...
// valueBytes, _ := proto.Marshal(&sensorReading)
recordValue = append(recordValue, messageBytes...)
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
if message != nil {
log.Printf("WRITTEN TO KAFKA")
}
producer_module.WriteMessageToKafkaAsync(message)
}