Files
janus/lib/RequestHandler.go

106 lines
2.9 KiB
Go
Raw Normal View History

2021-03-30 16:05:40 +05:30
package lib
import (
2022-03-31 16:24:32 +05:30
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
2021-03-30 16:05:40 +05:30
2022-03-31 16:24:32 +05:30
// "log"
"encoding/binary"
"encoding/json"
"net/http"
"github.com/Shopify/sarama"
2021-03-30 16:05:40 +05:30
)
var (
2021-04-27 16:40:13 +05:30
ProtobufRequestChannel = make(chan *RequestObject)
JsonRequestChannel = make(chan *RequestObject)
2021-03-30 16:05:40 +05:30
)
type RequestObject struct {
2021-04-27 16:40:13 +05:30
Body []byte
Header http.Header
2021-03-30 16:05:40 +05:30
}
2021-04-27 16:40:13 +05:30
func ProcessProtobufRequestChannel(topic string) {
2021-03-30 16:05:40 +05:30
for {
2021-04-27 16:40:13 +05:30
request := <- ProtobufRequestChannel
2021-03-30 16:32:05 +05:30
ClickstreamProtobufEventHandler(*request, topic)
2021-03-30 16:05:40 +05:30
}
}
2021-04-27 16:40:13 +05:30
func ProcessJsonRequestChannel(topic string) {
for {
request := <- JsonRequestChannel
ClickstreamJsonEventHandler(*request, topic)
}
}
2021-03-30 16:05:40 +05:30
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
messageBytes := request.Body
2022-03-31 16:24:32 +05:30
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
source := result["source"].(string)
//log.Print(source)
2021-04-01 12:56:04 +05:30
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
2021-04-27 16:40:13 +05:30
recordValue := []byte{}
2021-03-30 16:05:40 +05:30
// add [magicByte]
recordValue = append(recordValue, byte(0))
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// add [messageIndex]
messageIndexBytes := []byte{byte(2), byte(0)}
recordValue = append(recordValue, messageIndexBytes...)
2021-04-01 12:56:04 +05:30
// Now write the bytes from the actual message...
2021-03-30 16:05:40 +05:30
recordValue = append(recordValue, messageBytes...)
2021-04-27 16:40:13 +05:30
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
2022-03-31 16:24:32 +05:30
metrics.IncrementCounter("request", source)
producer_module.WriteMessageToKafkaAsync(message, source)
2021-04-27 16:40:13 +05:30
}
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
messageBytes := request.Body
2022-03-31 16:24:32 +05:30
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
source := result["source"].(string)
2021-04-27 16:40:13 +05:30
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
// add [magicByte]
recordValue = append(recordValue, byte(0))
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// Now write the bytes from the actual message...
recordValue = append(recordValue, messageBytes...)
message := &sarama.ProducerMessage{
2021-03-30 16:05:40 +05:30
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
2022-03-31 16:24:32 +05:30
metrics.IncrementCounter("request", source)
producer_module.WriteMessageToKafkaAsync(message, source)
2021-03-30 16:05:40 +05:30
}