From 2d5cdfaee7bf5f7d9680afa4ff99d0bf34456e39 Mon Sep 17 00:00:00 2001 From: puru Date: Thu, 16 Jan 2025 21:04:39 +0530 Subject: [PATCH] DE-3551 | segregating app events to different topic --- config/config.go | 1 + config/config.yml | 1 + lib/RequestHandler.go | 41 +++++++++++++++++++++++++------------ main.go | 1 + producer/producer_client.go | 8 +++++++- 5 files changed, 38 insertions(+), 14 deletions(-) diff --git a/config/config.go b/config/config.go index b025c67..78356d0 100644 --- a/config/config.go +++ b/config/config.go @@ -20,5 +20,6 @@ type KafkaConfigurations struct { Sasl_Password string Kafka_Topic_Json string Kafka_Topic_Litmus string + Kafka_Topic_App string Kafka_Topic_Protobuf string } diff --git a/config/config.yml b/config/config.yml index 0705bce..2064c1a 100644 --- a/config/config.yml +++ b/config/config.yml @@ -13,3 +13,4 @@ kafka: kafka_topic_json: KAFKA_TOPIC_JSON kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF kafka_topic_litmus: KAFKA_TOPIC_LITMUS + kafka_topic_app: KAFKA_TOPIC_APP \ No newline at end of file diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 8c492f4..d121e9f 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -78,26 +78,41 @@ func (wp *WorkerPool) processRequest(request RequestObject) { source := getSource(result) recordValue := messageBytes - topicFromSource := getTopicFromSource(source) + topics := getTopicsFromSource(source) - message := &sarama.ProducerMessage{ - Topic: topicFromSource, - Value: sarama.ByteEncoder(recordValue), - } + // Send to all designated topics + for _, topic := range topics { + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } - metrics.RequestCounter.WithLabelValues(source).Inc() - metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) + metrics.RequestCounter.WithLabelValues(source).Inc() + metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) - if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil { - wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source)) + if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil { + wp.logger.Error("Failed to write message to Kafka", + zap.Error(err), + zap.String("source", source), + zap.String("topic", topic)) + } } } -func getTopicFromSource(source string) string { - if strings.ToUpper(source) == "LITMUS" { - return producer_module.GetLitmusClickstreamTopic() +func getTopicsFromSource(source string) []string { + switch strings.ToUpper(source) { + case "LITMUS": + return []string{producer_module.GetLitmusClickstreamTopic()} + case "SYNCTIMER", "SYNCEVENTTASK", "IOSPULSETIMER": + // TODO : remove default topic for events from app source once users + // have migrated their pipelines to new topic + return []string{ + producer_module.GetAppClickstreamTopic(), + producer_module.GetDefaultClickstreamTopic(), + } + default: + return []string{producer_module.GetDefaultClickstreamTopic()} } - return producer_module.GetDefaultClickstreamTopic() } func getSource(event map[string]interface{}) string { diff --git a/main.go b/main.go index 6c1bd05..91f555e 100644 --- a/main.go +++ b/main.go @@ -57,6 +57,7 @@ func getConfigs() config.Configurations { configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON") configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF") configuration.Kafka.Kafka_Topic_Litmus = viper.GetString("KAFKA_TOPIC_LITMUS") + configuration.Kafka.Kafka_Topic_App = viper.GetString("KAFKA_TOPIC_APP") configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER") configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD") cors = viper.GetString("CORS_LIST") diff --git a/producer/producer_client.go b/producer/producer_client.go index 0569ca5..8d82845 100644 --- a/producer/producer_client.go +++ b/producer/producer_client.go @@ -19,6 +19,7 @@ var ( cb *gobreaker.CircuitBreaker defaultClickstreamTopic string litmusClickstreamTopic string + appClickstreamTopic string ) func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { @@ -26,6 +27,7 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri asyncProducer = GetAsyncProducer(kafkaConfiguration, env) defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus + appClickstreamTopic = kafkaConfiguration.Kafka_Topic_App cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "kafka-producer", @@ -53,6 +55,10 @@ func GetLitmusClickstreamTopic() string { return litmusClickstreamTopic } +func GetAppClickstreamTopic() string { + return appClickstreamTopic +} + func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config { config := sarama.NewConfig() @@ -94,7 +100,7 @@ func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) er asyncProducer.Input() <- message metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() - logger.Debug("Successfully written to Kafka for source " + source) + logger.Debug("Successfully written to Kafka topic : " + message.Topic + " for source : " + source) return nil, nil }) return err