diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index a921bc7..bd6ac90 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -1,104 +1,111 @@ package lib import ( - metrics "com.navi.medici.janus/instrumentation" - producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" + metrics "com.navi.medici.janus/instrumentation" + producer_module "com.navi.medici.janus/producer" + "com.navi.medici.janus/schema" - // "log" - "encoding/binary" - "encoding/json" - "net/http" + // "log" + "encoding/binary" + "encoding/json" + "net/http" - "github.com/Shopify/sarama" + "github.com/Shopify/sarama" ) var ( - ProtobufRequestChannel = make(chan *RequestObject) - JsonRequestChannel = make(chan *RequestObject) + ProtobufRequestChannel = make(chan *RequestObject) + JsonRequestChannel = make(chan *RequestObject) ) type RequestObject struct { - Body []byte - Header http.Header + Body []byte + Header http.Header } func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } + for { + request := <-ProtobufRequestChannel + ClickstreamProtobufEventHandler(*request, topic) + } } func ProcessJsonRequestChannel(topic string) { - for { - request := <-JsonRequestChannel - ClickstreamJsonEventHandler(*request, topic) - } + for { + request := <-JsonRequestChannel + ClickstreamJsonEventHandler(*request, topic) + } } func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := result["source"].(string) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} + messageBytes := request.Body + //getting the client which has sent this event + var result map[string]interface{} + json.Unmarshal(messageBytes, &result) + source := getSource(result) + //log.Print(source) + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} - // add [magicByte] - recordValue = append(recordValue, byte(0)) + // add [magicByte] + recordValue = append(recordValue, byte(0)) - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) + // add schemaID] + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) + recordValue = append(recordValue, schemaIDBytes...) - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) + // add [messageIndex] + messageIndexBytes := []byte{byte(2), byte(0)} + recordValue = append(recordValue, messageIndexBytes...) - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) + // Now write the bytes from the actual message... + recordValue = append(recordValue, messageBytes...) - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + metrics.IncrementCounter("request", source) + producer_module.WriteMessageToKafkaAsync(message, source) } func ClickstreamJsonEventHandler(request RequestObject, topic string) { - messageBytes := request.Body + messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + //getting the client which has sent this event + var result map[string]interface{} + json.Unmarshal(messageBytes, &result) + source := getSource(result) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} - // add [magicByte] - // recordValue = append(recordValue, byte(0)) + // add [magicByte] + // recordValue = append(recordValue, byte(0)) - // // add schemaID] - // schemaIDBytes := make([]byte, 4) - // binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - // recordValue = append(recordValue, schemaIDBytes...) + // // add schemaID] + // schemaIDBytes := make([]byte, 4) + // binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) + // recordValue = append(recordValue, schemaIDBytes...) - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) + // Now write the bytes from the actual message... + recordValue = append(recordValue, messageBytes...) - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) + metrics.IncrementCounter("request", source) + producer_module.WriteMessageToKafkaAsync(message, source) +} + +func getSource(event map[string]interface{}) string { + if event["source"] == nil { + return "UNKNOWN" + } + return event["source"].(string) }