From 3d3200d381b085e165036fe28482ff3552751e4c Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 27 Jul 2021 13:21:26 +0530 Subject: [PATCH] DE-121 - added env variable --- main.go | 2 +- producer/producer_config.go | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/main.go b/main.go index fb90f59..14affb1 100644 --- a/main.go +++ b/main.go @@ -60,7 +60,7 @@ func init() { producer_module.GetSchemaVersions() // initialize producers - producer_module.InitializeProducers(configuration) + producer_module.InitializeProducers(configuration.Kafka, configuration.Env) for i := 0; i < configuration.Server.Goroutines; i++ { go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf) diff --git a/producer/producer_config.go b/producer/producer_config.go index 7d8a894..e66602d 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -16,12 +16,12 @@ var ( asyncProducer sarama.AsyncProducer ) -func GetSyncProducerConfig(configuration config.Configurations) *sarama.Config { +func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config { config := sarama.NewConfig() - kafkaConfiguration configuration.KafkaConfigurations + //kafkaConfiguration configuration.KafkaConfigurations // security configs // config.Net.TLS.Enable = true - if config.Env == "PROD" { + if env == "PROD" { config.Net.SASL.Enable = true config.Net.SASL.Handshake = true config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 @@ -44,12 +44,12 @@ func GetSyncProducerConfig(configuration config.Configurations) *sarama.Config { } -func GetAsyncProducerConfig(configuration config.Configurations) *sarama.Config { +func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config { config := sarama.NewConfig() - kafkaConfiguration configuration.KafkaConfigurations + //kafkaConfiguration configuration.KafkaConfigurations // security configs // config.Net.TLS.Enable = true - if config.Env != "PROD" { + if env != "PROD" { config.Net.SASL.Enable = true config.Net.SASL.Handshake = true config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 @@ -71,8 +71,8 @@ func GetAsyncProducerConfig(configuration config.Configurations) *sarama.Config } -func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncProducer { - config := GetSyncProducerConfig(kafkaConfiguration) +func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.SyncProducer { + config := GetSyncProducerConfig(kafkaConfiguration, env) brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",") producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { @@ -82,8 +82,8 @@ func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncP } -func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.AsyncProducer { - config := GetAsyncProducerConfig(kafkaConfiguration) +func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer { + config := GetAsyncProducerConfig(kafkaConfiguration, env) brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",") producer, err := sarama.NewAsyncProducer(brokerList, config) if err != nil { @@ -93,10 +93,10 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn } -func InitializeProducers(configuration config) { +func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) - syncProducer = GetSyncProducer(configuration) - asyncProducer = GetAsyncProducer(configuration) + syncProducer = GetSyncProducer(kafkaConfiguration, env) + asyncProducer = GetAsyncProducer(kafkaConfiguration, env) } // using confluent-kafka-go