Merge pull request #24 from medici/unknown-source-fix

Check if source is nil in the event, set it to UNKNOWN in that case
This commit is contained in:
nitin kulkarni
2023-01-17 12:47:51 +05:30
committed by GitHub Enterprise

View File

@@ -1,7 +1,6 @@
package lib
import (
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
@@ -14,92 +13,98 @@ import (
)
var (
ProtobufRequestChannel = make(chan *RequestObject)
JsonRequestChannel = make(chan *RequestObject)
ProtobufRequestChannel = make(chan *RequestObject)
JsonRequestChannel = make(chan *RequestObject)
)
type RequestObject struct {
Body []byte
Header http.Header
Body []byte
Header http.Header
}
// ProcessProtobufRequestChannel drains ProtobufRequestChannel forever,
// dispatching each queued request to the protobuf event handler for the
// given Kafka topic. Intended to run as a dedicated goroutine.
func ProcessProtobufRequestChannel(topic string) {
	for {
		request := <-ProtobufRequestChannel
		ClickstreamProtobufEventHandler(*request, topic)
	}
}
// ProcessJsonRequestChannel drains JsonRequestChannel forever, dispatching
// each queued request to the JSON event handler for the given Kafka topic.
// Intended to run as a dedicated goroutine.
func ProcessJsonRequestChannel(topic string) {
	for {
		request := <-JsonRequestChannel
		ClickstreamJsonEventHandler(*request, topic)
	}
}
// ClickstreamProtobufEventHandler frames a protobuf clickstream event in the
// Confluent Schema Registry wire format —
// [magicByte] + [schemaID] + [messageIndex] + [value] —
// and publishes it to the given Kafka topic asynchronously, tagging metrics
// and the producer call with the event's originating client ("source").
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
	messageBytes := request.Body

	// Identify the client that sent this event.
	// NOTE(review): the body is decoded as JSON even though this is the
	// protobuf path — confirm payloads really carry a JSON "source" field.
	var result map[string]interface{}
	// Unmarshal errors are deliberately ignored: on failure result stays nil
	// and getSource falls back to "UNKNOWN".
	json.Unmarshal(messageBytes, &result)
	source := getSource(result)

	// Assemble the record value in the wire format described above.
	recordValue := []byte{}

	// [magicByte]: always 0 for the Confluent wire format.
	recordValue = append(recordValue, byte(0))

	// [schemaID]: 4-byte big-endian schema version registered for this topic.
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
	recordValue = append(recordValue, schemaIDBytes...)

	// [messageIndex]: selects the message type within the protobuf schema.
	messageIndexBytes := []byte{byte(2), byte(0)}
	recordValue = append(recordValue, messageIndexBytes...)

	// [value]: the raw protobuf payload bytes.
	recordValue = append(recordValue, messageBytes...)

	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(recordValue),
	}
	metrics.IncrementCounter("request", source)
	producer_module.WriteMessageToKafkaAsync(message, source)
}
// ClickstreamJsonEventHandler publishes a JSON clickstream event to the given
// Kafka topic asynchronously. Unlike the protobuf path, the payload is sent
// as-is: the schema-registry framing ([magicByte]+[schemaID]+[messageIndex])
// was intentionally disabled for JSON events. Metrics and the producer call
// are tagged with the event's originating client ("source").
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
	messageBytes := request.Body

	// Identify the client that sent this event. Unmarshal errors are
	// deliberately ignored: on failure result stays nil and getSource
	// falls back to "UNKNOWN".
	var result map[string]interface{}
	json.Unmarshal(messageBytes, &result)
	source := getSource(result)

	// Copy the raw JSON payload into a fresh record value (no framing).
	recordValue := []byte{}
	recordValue = append(recordValue, messageBytes...)

	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(recordValue),
	}
	metrics.IncrementCounter("request", source)
	producer_module.WriteMessageToKafkaAsync(message, source)
}
// getSource returns the event's "source" field, or "UNKNOWN" when the field
// is absent, nil, or not a string. The comma-ok assertion also covers the
// non-string case (JSON may carry a number/bool/object here), which the
// previous unchecked assertion would have panicked on. Safe on a nil map.
func getSource(event map[string]interface{}) string {
	if source, ok := event["source"].(string); ok {
		return source
	}
	return "UNKNOWN"
}