diff --git a/config/config.go b/config/config.go index 4841d7c..b025c67 100644 --- a/config/config.go +++ b/config/config.go @@ -1,9 +1,9 @@ package config type Configurations struct { - Env string - Server ServerConfigurations - Kafka KafkaConfigurations + Env string + Server ServerConfigurations + Kafka KafkaConfigurations } type ServerConfigurations struct { @@ -19,5 +19,6 @@ type KafkaConfigurations struct { Sasl_User string Sasl_Password string Kafka_Topic_Json string + Kafka_Topic_Litmus string Kafka_Topic_Protobuf string } diff --git a/config/config.yml b/config/config.yml index aabf5b5..0705bce 100644 --- a/config/config.yml +++ b/config/config.yml @@ -12,3 +12,4 @@ kafka: sasl_password: KAFKA_SASL_PASSWORD # read from environment variable kafka_topic_json: KAFKA_TOPIC_JSON kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF + kafka_topic_litmus: KAFKA_TOPIC_LITMUS diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 4396965..05e2079 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "net/http" + "strings" "time" metrics "com.navi.medici.janus/instrumentation" @@ -77,20 +78,28 @@ func (wp *WorkerPool) processRequest(request RequestObject) { source := getSource(result) recordValue := messageBytes + topicFromSource := getTopicFromSource(source) message := &sarama.ProducerMessage{ - Topic: producer_module.GetKafkaTopic(), + Topic: getTopicFromSource(source), Value: sarama.ByteEncoder(recordValue), } metrics.RequestCounter.WithLabelValues(source).Inc() - metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) + metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil { wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source)) } } +func getTopicFromSource(source string) string { + if strings.ToUpper(source) == "LITMUS" { + return producer_module.GetLitmusClickstreamTopic() + } + return producer_module.GetDefaultClickstreamTopic() +} + func getSource(event map[string]interface{}) string { if event["source"] == nil { return "UNKNOWN" diff --git a/main.go b/main.go index 351c284..6c1bd05 100644 --- a/main.go +++ b/main.go @@ -56,6 +56,7 @@ func getConfigs() config.Configurations { configuration.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS") configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON") configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF") + configuration.Kafka.Kafka_Topic_Litmus = viper.GetString("KAFKA_TOPIC_LITMUS") configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER") configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD") cors = viper.GetString("CORS_LIST") diff --git a/producer/producer_client.go b/producer/producer_client.go index 4fe7479..0569ca5 100644 --- a/producer/producer_client.go +++ b/producer/producer_client.go @@ -14,16 +14,18 @@ import ( ) var ( - asyncProducer sarama.AsyncProducer - logger *zap.Logger - cb *gobreaker.CircuitBreaker - kafkaTopic string + asyncProducer sarama.AsyncProducer + logger *zap.Logger + cb *gobreaker.CircuitBreaker + defaultClickstreamTopic string + litmusClickstreamTopic string ) func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { logger = utils.GetLogger() asyncProducer = GetAsyncProducer(kafkaConfiguration, env) - kafkaTopic = kafkaConfiguration.Kafka_Topic_Json + defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json + litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "kafka-producer", @@ -43,8 +45,12 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri go processProducerErrors() } -func GetKafkaTopic() string { - return kafkaTopic +func GetDefaultClickstreamTopic() string { + return defaultClickstreamTopic +} + +func GetLitmusClickstreamTopic() string { + return litmusClickstreamTopic } func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {