DE-3386 | Redirecting litmus events to new topic
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
package config
|
||||
|
||||
type Configurations struct {
|
||||
Env string
|
||||
Server ServerConfigurations
|
||||
Kafka KafkaConfigurations
|
||||
Env string
|
||||
Server ServerConfigurations
|
||||
Kafka KafkaConfigurations
|
||||
}
|
||||
|
||||
type ServerConfigurations struct {
|
||||
@@ -19,5 +19,6 @@ type KafkaConfigurations struct {
|
||||
Sasl_User string
|
||||
Sasl_Password string
|
||||
Kafka_Topic_Json string
|
||||
Kafka_Topic_Litmus string
|
||||
Kafka_Topic_Protobuf string
|
||||
}
|
||||
|
||||
@@ -12,3 +12,4 @@ kafka:
|
||||
sasl_password: KAFKA_SASL_PASSWORD # read from environment variable
|
||||
kafka_topic_json: KAFKA_TOPIC_JSON
|
||||
kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF
|
||||
kafka_topic_litmus: KAFKA_TOPIC_LITMUS
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
@@ -77,20 +78,28 @@ func (wp *WorkerPool) processRequest(request RequestObject) {
|
||||
source := getSource(result)
|
||||
|
||||
recordValue := messageBytes
|
||||
topicFromSource := getTopicFromSource(source)
|
||||
|
||||
message := &sarama.ProducerMessage{
|
||||
Topic: producer_module.GetKafkaTopic(),
|
||||
Topic: getTopicFromSource(source),
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
|
||||
metrics.RequestCounter.WithLabelValues(source).Inc()
|
||||
metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
|
||||
metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
|
||||
|
||||
if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil {
|
||||
wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source))
|
||||
}
|
||||
}
|
||||
|
||||
func getTopicFromSource(source string) string {
|
||||
if strings.ToUpper(source) == "LITMUS" {
|
||||
return producer_module.GetLitmusClickstreamTopic()
|
||||
}
|
||||
return producer_module.GetDefaultClickstreamTopic()
|
||||
}
|
||||
|
||||
func getSource(event map[string]interface{}) string {
|
||||
if event["source"] == nil {
|
||||
return "UNKNOWN"
|
||||
|
||||
1
main.go
1
main.go
@@ -56,6 +56,7 @@ func getConfigs() config.Configurations {
|
||||
configuration.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS")
|
||||
configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON")
|
||||
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF")
|
||||
configuration.Kafka.Kafka_Topic_Litmus = viper.GetString("KAFKA_TOPIC_LITMUS")
|
||||
configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER")
|
||||
configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD")
|
||||
cors = viper.GetString("CORS_LIST")
|
||||
|
||||
@@ -14,16 +14,18 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
asyncProducer sarama.AsyncProducer
|
||||
logger *zap.Logger
|
||||
cb *gobreaker.CircuitBreaker
|
||||
kafkaTopic string
|
||||
asyncProducer sarama.AsyncProducer
|
||||
logger *zap.Logger
|
||||
cb *gobreaker.CircuitBreaker
|
||||
defaultClickstreamTopic string
|
||||
litmusClickstreamTopic string
|
||||
)
|
||||
|
||||
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
|
||||
logger = utils.GetLogger()
|
||||
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
|
||||
kafkaTopic = kafkaConfiguration.Kafka_Topic_Json
|
||||
defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json
|
||||
litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus
|
||||
|
||||
cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
|
||||
Name: "kafka-producer",
|
||||
@@ -43,8 +45,12 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri
|
||||
go processProducerErrors()
|
||||
}
|
||||
|
||||
func GetKafkaTopic() string {
|
||||
return kafkaTopic
|
||||
func GetDefaultClickstreamTopic() string {
|
||||
return defaultClickstreamTopic
|
||||
}
|
||||
|
||||
func GetLitmusClickstreamTopic() string {
|
||||
return litmusClickstreamTopic
|
||||
}
|
||||
|
||||
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
||||
|
||||
Reference in New Issue
Block a user