resolved merge conflict

This commit is contained in:
aishwarya-raimule
2023-01-17 14:41:00 +05:30

View File

@@ -44,7 +44,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
source := result["source"].(string)
source := getSource(result)
//log.Print(source)
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
@@ -79,7 +79,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
source := result["source"].(string)
source := getSource(result)
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
@@ -105,3 +105,10 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
producer_module.WriteMessageToKafkaAsync(message, source)
}
func getSource(event map[string]interface{}) string {
if event["source"] == nil {
return "UNKNOWN"
}
return event["source"].(string)
}