package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" "com.navi.medici.janus/utils" "encoding/json" "net/http" "time" "github.com/Shopify/sarama" ) var ( ProtobufRequestChannel = make(chan *RequestObject) JsonRequestChannel = make(chan *RequestObject) ) type RequestObject struct { Body []byte Header http.Header } func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel ClickstreamJsonEventHandler(*request, topic) } } func ClickstreamJsonEventHandler(request RequestObject, topic string) { eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) source := getSource(result) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} // add [magicByte] // recordValue = append(recordValue, byte(0)) // // add schemaID] // schemaIDBytes := make([]byte, 4) // binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) // recordValue = append(recordValue, schemaIDBytes...) // Now write the bytes from the actual message... recordValue = append(recordValue, messageBytes...) message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } metrics.RequestCounter.WithLabelValues(source).Inc() // processing complete, record duration metrics metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) producer_module.WriteMessageToKafkaAsync(message, source) } func getSource(event map[string]interface{}) string { if event["source"] == nil { return "UNKNOWN" } return event["source"].(string) }