From aa88bf13f379c1dce290858152af01c0f2528751 Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Fri, 11 Oct 2024 15:16:45 +0530 Subject: [PATCH] TP-87883 | cybertron log enricher pipeline --- configs/application.yml | 1 + configs/kafka.go | 2 ++ pkg/kafka/consumer/consume.go | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/configs/application.yml b/configs/application.yml index 71f6f56..edc83d9 100644 --- a/configs/application.yml +++ b/configs/application.yml @@ -53,6 +53,7 @@ kafka: enabled: false sasl: enabled: true + mechanism: KAFKA_SASL_MECHANISM consumer: batch.size: 1 goroutines.max.pool.size: 20 diff --git a/configs/kafka.go b/configs/kafka.go index 09a637e..5b488f2 100644 --- a/configs/kafka.go +++ b/configs/kafka.go @@ -17,6 +17,7 @@ type KafkaConfig struct { ConsumerBatchSize int GoroutinesMaxPoolSize int ConsumerMaxTimeoutMs string + Mechanism string } func NewKafkaConfig() *KafkaConfig { @@ -32,6 +33,7 @@ func NewKafkaConfig() *KafkaConfig { ConsumerBatchSize: viper.GetInt("kafka.consumer.batch.size"), GoroutinesMaxPoolSize: viper.GetInt("kafka.consumer.goroutines.max.pool.size"), ConsumerMaxTimeoutMs: viper.GetString("kafka.consumer.max.timeout.ms"), + Mechanism: viper.GetString("kafka.sasl.mechanism"), } } diff --git a/pkg/kafka/consumer/consume.go b/pkg/kafka/consumer/consume.go index c27fa06..a52846b 100644 --- a/pkg/kafka/consumer/consume.go +++ b/pkg/kafka/consumer/consume.go @@ -22,7 +22,7 @@ func NewKafkaConsumers(baseConfig *configs.KafkaConfig, errorProcessor *service. "session.timeout.ms": 6000, "auto.offset.reset": "earliest", "security.protocol": "SASL_SSL", - "sasl.mechanisms": "PLAIN", + "sasl.mechanisms": baseConfig.Mechanism, "sasl.username": baseConfig.Username, "sasl.password": baseConfig.Password, })