package lib import ( producer_module "com.navi.medici.janus/producer" // "log" "net/http" "encoding/binary" "github.com/Shopify/sarama" ) var ( ProtobufRequestChannel = make(chan *RequestObject) JsonRequestChannel = make(chan *RequestObject) ) type RequestObject struct { Body []byte Header http.Header } func ProcessProtobufRequestChannel(topic string) { for { request := <- ProtobufRequestChannel ClickstreamProtobufEventHandler(*request, topic) } } func ProcessJsonRequestChannel(topic string) { for { request := <- JsonRequestChannel ClickstreamJsonEventHandler(*request, topic) } } func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageBytes := request.Body // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} // add [magicByte] recordValue = append(recordValue, byte(0)) // add schemaID] schemaIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) recordValue = append(recordValue, schemaIDBytes...) // add [messageIndex] messageIndexBytes := []byte{byte(2), byte(0)} recordValue = append(recordValue, messageIndexBytes...) // Now write the bytes from the actual message... recordValue = append(recordValue, messageBytes...) message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } producer_module.WriteMessageToKafkaAsync(message) } func ClickstreamJsonEventHandler(request RequestObject, topic string) { messageBytes := request.Body // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} // add [magicByte] recordValue = append(recordValue, byte(0)) // add schemaID] schemaIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) recordValue = append(recordValue, schemaIDBytes...) // Now write the bytes from the actual message... recordValue = append(recordValue, messageBytes...) message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } producer_module.WriteMessageToKafkaAsync(message) }