Merge pull request #48 from navi-medici/DE-3551

DE-3551 | segregating app events to different topic
This commit is contained in:
Purushotham Pururava Pushpavanth
2025-01-17 13:53:18 +05:30
committed by GitHub
5 changed files with 38 additions and 14 deletions

View File

@@ -20,5 +20,6 @@ type KafkaConfigurations struct {
Sasl_Password string
Kafka_Topic_Json string
Kafka_Topic_Litmus string
Kafka_Topic_App string
Kafka_Topic_Protobuf string
}

View File

@@ -13,3 +13,4 @@ kafka:
kafka_topic_json: KAFKA_TOPIC_JSON
kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF
kafka_topic_litmus: KAFKA_TOPIC_LITMUS
kafka_topic_app: KAFKA_TOPIC_APP

View File

@@ -78,26 +78,41 @@ func (wp *WorkerPool) processRequest(request RequestObject) {
source := getSource(result)
recordValue := messageBytes
topicFromSource := getTopicFromSource(source)
topics := getTopicsFromSource(source)
// Send to all designated topics
for _, topic := range topics {
message := &sarama.ProducerMessage{
Topic: topicFromSource,
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
metrics.RequestCounter.WithLabelValues(source).Inc()
metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil {
wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source))
wp.logger.Error("Failed to write message to Kafka",
zap.Error(err),
zap.String("source", source),
zap.String("topic", topic))
}
}
}
func getTopicFromSource(source string) string {
if strings.ToUpper(source) == "LITMUS" {
return producer_module.GetLitmusClickstreamTopic()
func getTopicsFromSource(source string) []string {
switch strings.ToUpper(source) {
case "LITMUS":
return []string{producer_module.GetLitmusClickstreamTopic()}
case "SYNCTIMER", "SYNCEVENTTASK", "IOSPULSETIMER":
// TODO : remove default topic for events from app source once users
// have migrated their pipelines to new topic
return []string{
producer_module.GetAppClickstreamTopic(),
producer_module.GetDefaultClickstreamTopic(),
}
default:
return []string{producer_module.GetDefaultClickstreamTopic()}
}
return producer_module.GetDefaultClickstreamTopic()
}
func getSource(event map[string]interface{}) string {

View File

@@ -57,6 +57,7 @@ func getConfigs() config.Configurations {
configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON")
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF")
configuration.Kafka.Kafka_Topic_Litmus = viper.GetString("KAFKA_TOPIC_LITMUS")
configuration.Kafka.Kafka_Topic_App = viper.GetString("KAFKA_TOPIC_APP")
configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER")
configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD")
cors = viper.GetString("CORS_LIST")

View File

@@ -19,6 +19,7 @@ var (
cb *gobreaker.CircuitBreaker
defaultClickstreamTopic string
litmusClickstreamTopic string
appClickstreamTopic string
)
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
@@ -26,6 +27,7 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json
litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus
appClickstreamTopic = kafkaConfiguration.Kafka_Topic_App
cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "kafka-producer",
@@ -53,6 +55,10 @@ func GetLitmusClickstreamTopic() string {
return litmusClickstreamTopic
}
func GetAppClickstreamTopic() string {
return appClickstreamTopic
}
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
config := sarama.NewConfig()
@@ -94,7 +100,7 @@ func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) er
asyncProducer.Input() <- message
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
logger.Debug("Successfully written to Kafka for source " + source)
logger.Debug("Successfully written to Kafka topic : " + message.Topic + " for source : " + source)
return nil, nil
})
return err