check if source from the event is unknown
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
"com.navi.medici.janus/schema"
|
||||
"com.navi.medici.janus/utils"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
@@ -42,7 +43,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
//getting the client which has sent this event
|
||||
var result map[string]interface{}
|
||||
json.Unmarshal(messageBytes, &result)
|
||||
source := result["source"].(string)
|
||||
source := getSource(result)
|
||||
//log.Print(source)
|
||||
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
|
||||
recordValue := []byte{}
|
||||
@@ -77,7 +78,9 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
//getting the client which has sent this event
|
||||
var result map[string]interface{}
|
||||
json.Unmarshal(messageBytes, &result)
|
||||
source := result["source"].(string)
|
||||
source := getSource(result)
|
||||
logger := utils.GetLogger()
|
||||
logger.Debug("Source: " + source)
|
||||
|
||||
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
|
||||
recordValue := []byte{}
|
||||
@@ -101,3 +104,10 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
metrics.IncrementCounter("request", source)
|
||||
producer_module.WriteMessageToKafkaAsync(message, source)
|
||||
}
|
||||
|
||||
func getSource(event map[string]interface{}) string {
|
||||
if event["source"] == nil {
|
||||
event["source"] = "UNKNOWN"
|
||||
}
|
||||
return event["source"].(string)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user