package lib import ( producer_module "com.navi.medici.janus/producer" "log" "net/http" "encoding/binary" "github.com/Shopify/sarama" ) var ( RequestChannel = make(chan *RequestObject) ) type RequestObject struct { Body []byte Header http.Header } func ProcessRequestChannel(topic string) { for { request := <- RequestChannel ClickstreamProtobufEventHandler(*request, topic) } } func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageBytes := request.Body recordValue := []byte{} // add [magicByte] recordValue = append(recordValue, byte(0)) // add schemaID] schemaIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) recordValue = append(recordValue, schemaIDBytes...) // add [messageIndex] messageIndexBytes := []byte{byte(2), byte(0)} recordValue = append(recordValue, messageIndexBytes...) // Now write the bytes from the actual value... // valueBytes, _ := proto.Marshal(&sensorReading) recordValue = append(recordValue, messageBytes...) message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } if message != nil { log.Printf("WRITING TO KAFKA") } producer_module.WriteMessageToKafkaAsync(message) }