TP-5555 | kafka fix
This commit is contained in:
@@ -53,6 +53,7 @@ kafka:
|
||||
enabled: false
|
||||
sasl:
|
||||
enabled: true
|
||||
mechanism: KAFKA_SASL_MECHANISM
|
||||
consumer:
|
||||
batch.size: 1
|
||||
goroutines.max.pool.size: 20
|
||||
|
||||
@@ -17,6 +17,7 @@ type KafkaConfig struct {
|
||||
ConsumerBatchSize int
|
||||
GoroutinesMaxPoolSize int
|
||||
ConsumerMaxTimeoutMs string
|
||||
Mechanism string
|
||||
}
|
||||
|
||||
func NewKafkaConfig() *KafkaConfig {
|
||||
@@ -32,6 +33,7 @@ func NewKafkaConfig() *KafkaConfig {
|
||||
ConsumerBatchSize: viper.GetInt("kafka.consumer.batch.size"),
|
||||
GoroutinesMaxPoolSize: viper.GetInt("kafka.consumer.goroutines.max.pool.size"),
|
||||
ConsumerMaxTimeoutMs: viper.GetString("kafka.consumer.max.timeout.ms"),
|
||||
Mechanism: viper.GetString("kafka.sasl.mechanism"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ func ConfluentProducer(baseConfig *configs.KafkaConfig) (*kafka.Producer, error)
|
||||
return kafka.NewProducer(&kafka.ConfigMap{
|
||||
"bootstrap.servers": baseConfig.Brokers[0],
|
||||
"security.protocol": "SASL_SSL",
|
||||
"sasl.mechanisms": "PLAIN",
|
||||
"sasl.mechanisms": baseConfig.Mechanism,
|
||||
"sasl.username": baseConfig.Username,
|
||||
"sasl.password": baseConfig.Password,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user