Merge pull request #41 from navi-medici/DE-3386

DE-3386 | Redirecting litmus events to new topic
This commit is contained in:
Purushotham Pururava Pushpavanth
2024-10-10 16:21:29 +05:30
committed by GitHub
5 changed files with 30 additions and 12 deletions

View File

@@ -1,9 +1,9 @@
package config
type Configurations struct {
Env string
Server ServerConfigurations
Kafka KafkaConfigurations
Env string
Server ServerConfigurations
Kafka KafkaConfigurations
}
type ServerConfigurations struct {
@@ -19,5 +19,6 @@ type KafkaConfigurations struct {
Sasl_User string
Sasl_Password string
Kafka_Topic_Json string
Kafka_Topic_Litmus string
Kafka_Topic_Protobuf string
}

View File

@@ -12,3 +12,4 @@ kafka:
sasl_password: KAFKA_SASL_PASSWORD # read from environment variable
kafka_topic_json: KAFKA_TOPIC_JSON
kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF
kafka_topic_litmus: KAFKA_TOPIC_LITMUS

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net/http"
"strings"
"time"
metrics "com.navi.medici.janus/instrumentation"
@@ -77,20 +78,28 @@ func (wp *WorkerPool) processRequest(request RequestObject) {
source := getSource(result)
recordValue := messageBytes
topicFromSource := getTopicFromSource(source)
message := &sarama.ProducerMessage{
Topic: producer_module.GetKafkaTopic(),
Topic: topicFromSource,
Value: sarama.ByteEncoder(recordValue),
}
metrics.RequestCounter.WithLabelValues(source).Inc()
metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil {
wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source))
}
}
func getTopicFromSource(source string) string {
if strings.ToUpper(source) == "LITMUS" {
return producer_module.GetLitmusClickstreamTopic()
}
return producer_module.GetDefaultClickstreamTopic()
}
func getSource(event map[string]interface{}) string {
if event["source"] == nil {
return "UNKNOWN"

View File

@@ -56,6 +56,7 @@ func getConfigs() config.Configurations {
configuration.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS")
configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON")
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF")
configuration.Kafka.Kafka_Topic_Litmus = viper.GetString("KAFKA_TOPIC_LITMUS")
configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER")
configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD")
cors = viper.GetString("CORS_LIST")

View File

@@ -14,16 +14,18 @@ import (
)
var (
asyncProducer sarama.AsyncProducer
logger *zap.Logger
cb *gobreaker.CircuitBreaker
kafkaTopic string
asyncProducer sarama.AsyncProducer
logger *zap.Logger
cb *gobreaker.CircuitBreaker
defaultClickstreamTopic string
litmusClickstreamTopic string
)
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
logger = utils.GetLogger()
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
kafkaTopic = kafkaConfiguration.Kafka_Topic_Json
defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json
litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus
cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "kafka-producer",
@@ -43,8 +45,12 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri
go processProducerErrors()
}
func GetKafkaTopic() string {
return kafkaTopic
func GetDefaultClickstreamTopic() string {
return defaultClickstreamTopic
}
func GetLitmusClickstreamTopic() string {
return litmusClickstreamTopic
}
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {