// Package lib receives clickstream HTTP request payloads over channels and
// publishes them to Kafka, framing protobuf payloads in the Confluent
// Schema Registry wire format.
package lib

import (
	"encoding/binary"
	"encoding/json"
	"net/http"

	"github.com/Shopify/sarama"

	metrics "com.navi.medici.janus/instrumentation"
	producer_module "com.navi.medici.janus/producer"
	"com.navi.medici.janus/schema"
)

// Unbuffered hand-off channels: the HTTP layer sends a request and blocks
// until one of the Process* goroutines picks it up.
var (
	ProtobufRequestChannel = make(chan *RequestObject)
	JsonRequestChannel     = make(chan *RequestObject)
)

// RequestObject carries the raw body and headers of one incoming event request.
type RequestObject struct {
	Body   []byte
	Header http.Header
}

// ProcessProtobufRequestChannel drains ProtobufRequestChannel, handing each
// request to ClickstreamProtobufEventHandler for the given topic. It runs
// until the channel is closed; start it in its own goroutine.
func ProcessProtobufRequestChannel(topic string) {
	for request := range ProtobufRequestChannel {
		ClickstreamProtobufEventHandler(*request, topic)
	}
}

// ProcessJsonRequestChannel drains JsonRequestChannel, handing each request
// to ClickstreamJsonEventHandler for the given topic. It runs until the
// channel is closed; start it in its own goroutine.
func ProcessJsonRequestChannel(topic string) {
	for request := range JsonRequestChannel {
		ClickstreamJsonEventHandler(*request, topic)
	}
}

// ClickstreamProtobufEventHandler wraps the request body in the Confluent
// Schema Registry wire format —
// [magicByte] + [schemaID] + [messageIndex] + [value] —
// and publishes it asynchronously to the given Kafka topic, incrementing the
// per-source request counter.
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
	messageBytes := request.Body

	// Best-effort extraction of the originating client from the payload.
	// The error is deliberately ignored: a body that doesn't decode as JSON
	// simply yields source "UNKNOWN".
	var result map[string]interface{}
	_ = json.Unmarshal(messageBytes, &result)
	source := getSource(result)

	// 1 magic byte + 4 schema-ID bytes + 2 message-index bytes + payload.
	recordValue := make([]byte, 0, 7+len(messageBytes))

	// Magic byte (always 0 for the Confluent wire format).
	recordValue = append(recordValue, 0)

	// Schema ID for this topic, big-endian uint32.
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic]))
	recordValue = append(recordValue, schemaIDBytes...)

	// Message-index list (identifies the protobuf message within the schema).
	recordValue = append(recordValue, 2, 0)

	// Finally the serialized message itself.
	recordValue = append(recordValue, messageBytes...)

	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(recordValue),
	}
	metrics.IncrementCounter("request", source)
	producer_module.WriteMessageToKafkaAsync(message, source)
}

// ClickstreamJsonEventHandler publishes the request body as-is (no
// schema-registry framing) asynchronously to the given Kafka topic,
// incrementing the per-source request counter.
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
	messageBytes := request.Body

	// Best-effort extraction of the originating client; errors are ignored
	// and leave the source as "UNKNOWN".
	var result map[string]interface{}
	_ = json.Unmarshal(messageBytes, &result)
	source := getSource(result)

	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(messageBytes),
	}
	metrics.IncrementCounter("request", source)
	producer_module.WriteMessageToKafkaAsync(message, source)
}

// getSource returns the event's "source" field, or "UNKNOWN" when the field
// is absent, nil, or not a string. The comma-ok assertion avoids the panic
// the previous unchecked assertion caused on non-string values.
func getSource(event map[string]interface{}) string {
	if source, ok := event["source"].(string); ok {
		return source
	}
	return "UNKNOWN"
}