diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 939d8c0..aae18f9 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -44,7 +44,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + source := getSource(result) //log.Print(source) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} @@ -79,7 +79,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + source := getSource(result) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} @@ -105,3 +105,10 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) producer_module.WriteMessageToKafkaAsync(message, source) } + +func getSource(event map[string]interface{}) string { + if event["source"] == nil { + return "UNKNOWN" + } + return event["source"].(string) +}