From 73b4455958a1f89b459fc171f20cc9127c329469 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Tue, 17 Jan 2023 12:11:53 +0530 Subject: [PATCH 1/2] check if source is nil, set it to UNKNOWN in that case --- lib/RequestHandler.go | 127 ++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 61 deletions(-) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index d829ceb..528b7e5 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -1,7 +1,6 @@ package lib import ( - metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" @@ -14,92 +13,98 @@ import ( ) var ( - ProtobufRequestChannel = make(chan *RequestObject) - JsonRequestChannel = make(chan *RequestObject) + ProtobufRequestChannel = make(chan *RequestObject) + JsonRequestChannel = make(chan *RequestObject) ) type RequestObject struct { - Body []byte - Header http.Header + Body []byte + Header http.Header } func ProcessProtobufRequestChannel(topic string) { - for { - request := <- ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } + for { + request := <-ProtobufRequestChannel + ClickstreamProtobufEventHandler(*request, topic) + } } func ProcessJsonRequestChannel(topic string) { - for { - request := <- JsonRequestChannel - ClickstreamJsonEventHandler(*request, topic) - } + for { + request := <-JsonRequestChannel + ClickstreamJsonEventHandler(*request, topic) + } } - func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := result["source"].(string) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} + messageBytes := request.Body + //getting the client which has sent this event + var result map[string]interface{} + json.Unmarshal(messageBytes, &result) + source := getSource(result) + //log.Print(source) + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} - // add [magicByte] - recordValue = append(recordValue, byte(0)) + // add [magicByte] + recordValue = append(recordValue, byte(0)) - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) + // add schemaID] + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + recordValue = append(recordValue, schemaIDBytes...) - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) + // add [messageIndex] + messageIndexBytes := []byte{byte(2), byte(0)} + recordValue = append(recordValue, messageIndexBytes...) - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) + // Now write the bytes from the actual message... + recordValue = append(recordValue, messageBytes...) - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + metrics.IncrementCounter("request", source) + producer_module.WriteMessageToKafkaAsync(message, source) } func ClickstreamJsonEventHandler(request RequestObject, topic string) { - messageBytes := request.Body + messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + //getting the client which has sent this event + var result map[string]interface{} + json.Unmarshal(messageBytes, &result) + source := getSource(result) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} - // add [magicByte] - // recordValue = append(recordValue, byte(0)) + // add [magicByte] + // recordValue = append(recordValue, byte(0)) - // // add schemaID] - // schemaIDBytes := make([]byte, 4) - // binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) - // recordValue = append(recordValue, schemaIDBytes...) + // // add schemaID] + // schemaIDBytes := make([]byte, 4) + // binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic])) + // recordValue = append(recordValue, schemaIDBytes...) - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) + // Now write the bytes from the actual message... + recordValue = append(recordValue, messageBytes...) - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + + metrics.IncrementCounter("request", source) + producer_module.WriteMessageToKafkaAsync(message, source) +} + +func getSource(event map[string]interface{}) string { + if event["source"] == nil { + event["source"] = "UNKNOWN" + } + return event["source"].(string) } From 5abdd8289fb051ba87deb18eba098131224260b3 Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Tue, 17 Jan 2023 12:33:47 +0530 Subject: [PATCH 2/2] check if source is nil, return UNKNOWN in that case --- lib/RequestHandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 528b7e5..cc90cb8 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -104,7 +104,7 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { func getSource(event map[string]interface{}) string { if event["source"] == nil { - event["source"] = "UNKNOWN" + return "UNKNOWN" } return event["source"].(string) }