removed unused api handlers and protobuf processing logic

This commit is contained in:
aishwarya-raimule
2023-01-23 16:22:44 +05:30
parent 374fe655d3
commit 9ecc13f2d3
2 changed files with 0 additions and 48 deletions

View File

@@ -3,10 +3,7 @@ package lib
import (
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
"com.navi.medici.janus/schema"
// "log"
"encoding/binary"
"encoding/json"
"net/http"
@@ -23,13 +20,6 @@ type RequestObject struct {
Header http.Header
}
func ProcessProtobufRequestChannel(topic string) {
for {
request := <-ProtobufRequestChannel
ClickstreamProtobufEventHandler(*request, topic)
}
}
func ProcessJsonRequestChannel(topic string) {
for {
request := <-JsonRequestChannel
@@ -37,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) {
}
}
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
messageBytes := request.Body
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
source := getSource(result)
//log.Print(source)
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
// add [magicByte]
recordValue = append(recordValue, byte(0))
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// add [messageIndex]
messageIndexBytes := []byte{byte(2), byte(0)}
recordValue = append(recordValue, messageIndexBytes...)
// Now write the bytes from the actual message...
recordValue = append(recordValue, messageBytes...)
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
metrics.IncrementCounter("request", source)
producer_module.WriteMessageToKafkaAsync(message, source)
}
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
messageBytes := request.Body

View File

@@ -23,10 +23,6 @@ func init() {
schema.InitializeSchemaHandler(configs)
producer_client.InitializeProducers(configs.Kafka, configs.Env)
for i := 0; i < 2; i++ {
go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf)
}
for i := 0; i < configs.Server.Goroutines; i++ {
go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json)
}