diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index bd6ac90..68b494d 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,10 +3,7 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" - // "log" - "encoding/binary" "encoding/json" "net/http" @@ -23,13 +20,6 @@ type RequestObject struct { Header http.Header } -func ProcessProtobufRequestChannel(topic string) { - for { - request := <-ProtobufRequestChannel - ClickstreamProtobufEventHandler(*request, topic) - } -} - func ProcessJsonRequestChannel(topic string) { for { request := <-JsonRequestChannel @@ -37,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) { } } -func ClickstreamProtobufEventHandler(request RequestObject, topic string) { - - messageBytes := request.Body - //getting the client which has sent this event - var result map[string]interface{} - json.Unmarshal(messageBytes, &result) - source := getSource(result) - //log.Print(source) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) - - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - metrics.IncrementCounter("request", source) - producer_module.WriteMessageToKafkaAsync(message, source) -} - func ClickstreamJsonEventHandler(request RequestObject, topic string) { messageBytes := request.Body diff --git a/main.go b/main.go index 1bef910..d788af9 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,6 @@ func init() { schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - for i := 0; i < 2; i++ { - go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf) - } - for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) }