diff --git a/config/config.go b/config/config.go index fd88ca6..5d83a87 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config type Configurations struct { + Env string Server ServerConfigurations Kafka KafkaConfigurations SchemaRegistry SchemaRegistryConfigurations diff --git a/config/config.yml b/config/config.yml index 77ad501..d998afb 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,3 +1,4 @@ +env: ENVIRONMENT # read from environment variable server: port: 8000 goroutines: 1000 diff --git a/main.go b/main.go index 3e7c756..fb90f59 100644 --- a/main.go +++ b/main.go @@ -35,14 +35,19 @@ func init() { } // Following coinfigurations read from environment variables + configuration.Env = viper.GetString(configuration.Env) configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) - configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) - configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint) configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics) configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json) configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf) + log.Printf("Env is: ", configuration.Env) + if configuration.Env == "PROD" { + configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) + configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) + } + port = configuration.Server.Port log.Printf("PORT IS: ", port) log.Printf(configuration.Kafka.Bootstrap_Servers) @@ -55,7 +60,7 @@ func init() { producer_module.GetSchemaVersions() // initialize producers - producer_module.InitializeProducers(configuration.Kafka) + producer_module.InitializeProducers(configuration) for i := 0; i < configuration.Server.Goroutines; i++ { go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf) diff --git a/producer/producer_config.go b/producer/producer_config.go index d075a47..c2f75e7 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -16,16 +16,19 @@ var ( asyncProducer sarama.AsyncProducer ) -func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { +func GetSyncProducerConfig(configuration config) *sarama.Config { config := sarama.NewConfig() - + kafkaConfiguration configuration.KafkaConfigurations // security configs // config.Net.TLS.Enable = true - config.Net.SASL.Enable = true - config.Net.SASL.Handshake = true - config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 - config.Net.SASL.User = kafkaConfiguration.Sasl_User - config.Net.SASL.Password = kafkaConfiguration.Sasl_Password + if config.Env == "PROD" { + config.Net.SASL.Enable = true + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + config.Net.SASL.User = kafkaConfiguration.Sasl_User + config.Net.SASL.Password = kafkaConfiguration.Sasl_Password + } + // producer configs // to be changed: read from config file @@ -41,17 +44,18 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *saram } -func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { +func GetAsyncProducerConfig(configuration config) *sarama.Config { config := sarama.NewConfig() - + kafkaConfiguration configuration.KafkaConfigurations // security configs // config.Net.TLS.Enable = true - config.Net.SASL.Enable = true - config.Net.SASL.Handshake = true - config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 - config.Net.SASL.User = kafkaConfiguration.Sasl_User - config.Net.SASL.Password = kafkaConfiguration.Sasl_Password - + if config.Env != "PROD" { + config.Net.SASL.Enable = true + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + config.Net.SASL.User = kafkaConfiguration.Sasl_User + config.Net.SASL.Password = kafkaConfiguration.Sasl_Password + } // producer configs // to be changed: read from config file config.Producer.Retry.Max = 3 @@ -89,10 +93,10 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn } -func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) { +func InitializeProducers(configuration config) { sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) - syncProducer = GetSyncProducer(kafkaConfiguration) - asyncProducer = GetAsyncProducer(kafkaConfiguration) + syncProducer = GetSyncProducer(configuration) + asyncProducer = GetAsyncProducer(configuration) } // using confluent-kafka-go