DE-121 - added env variable
This commit is contained in:
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
|
||||
type Configurations struct {
|
||||
Env string
|
||||
Server ServerConfigurations
|
||||
Kafka KafkaConfigurations
|
||||
SchemaRegistry SchemaRegistryConfigurations
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
env: ENVIRONMENT # read from environment variable
|
||||
server:
|
||||
port: 8000
|
||||
goroutines: 1000
|
||||
|
||||
11
main.go
11
main.go
@@ -35,14 +35,19 @@ func init() {
|
||||
}
|
||||
|
||||
// Following coinfigurations read from environment variables
|
||||
configuration.Env = viper.GetString(configuration.Env)
|
||||
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
|
||||
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
|
||||
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
|
||||
configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint)
|
||||
configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics)
|
||||
configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json)
|
||||
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf)
|
||||
|
||||
log.Printf("Env is: ", configuration.Env)
|
||||
if configuration.Env == "PROD" {
|
||||
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
|
||||
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
|
||||
}
|
||||
|
||||
port = configuration.Server.Port
|
||||
log.Printf("PORT IS: ", port)
|
||||
log.Printf(configuration.Kafka.Bootstrap_Servers)
|
||||
@@ -55,7 +60,7 @@ func init() {
|
||||
producer_module.GetSchemaVersions()
|
||||
|
||||
// initialize producers
|
||||
producer_module.InitializeProducers(configuration.Kafka)
|
||||
producer_module.InitializeProducers(configuration)
|
||||
|
||||
for i := 0; i < configuration.Server.Goroutines; i++ {
|
||||
go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf)
|
||||
|
||||
@@ -16,16 +16,19 @@ var (
|
||||
asyncProducer sarama.AsyncProducer
|
||||
)
|
||||
|
||||
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
|
||||
func GetSyncProducerConfig(configuration config) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
|
||||
kafkaConfiguration configuration.KafkaConfigurations
|
||||
// security configs
|
||||
// config.Net.TLS.Enable = true
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
if config.Env == "PROD" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
}
|
||||
|
||||
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
@@ -41,17 +44,18 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *saram
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
|
||||
func GetAsyncProducerConfig(configuration config) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
|
||||
kafkaConfiguration configuration.KafkaConfigurations
|
||||
// security configs
|
||||
// config.Net.TLS.Enable = true
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
|
||||
if config.Env != "PROD" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
}
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
config.Producer.Retry.Max = 3
|
||||
@@ -89,10 +93,10 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn
|
||||
}
|
||||
|
||||
|
||||
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) {
|
||||
func InitializeProducers(configuration config) {
|
||||
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
||||
syncProducer = GetSyncProducer(kafkaConfiguration)
|
||||
asyncProducer = GetAsyncProducer(kafkaConfiguration)
|
||||
syncProducer = GetSyncProducer(configuration)
|
||||
asyncProducer = GetAsyncProducer(configuration)
|
||||
}
|
||||
|
||||
// using confluent-kafka-go
|
||||
|
||||
Reference in New Issue
Block a user