Merge pull request #3 from medici/sasl_scram

DE-121 - added env variable
This commit is contained in:
nitin kulkarni
2021-07-27 11:54:07 +05:30
committed by GitHub Enterprise
4 changed files with 32 additions and 21 deletions

View File

@@ -2,6 +2,7 @@ package config
type Configurations struct {
Env string
Server ServerConfigurations
Kafka KafkaConfigurations
SchemaRegistry SchemaRegistryConfigurations

View File

@@ -1,3 +1,4 @@
env: ENVIRONMENT # read from environment variable
server:
port: 8000
goroutines: 1000

11
main.go
View File

@@ -35,14 +35,19 @@ func init() {
}
// Following coinfigurations read from environment variables
configuration.Env = viper.GetString(configuration.Env)
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint)
configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics)
configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json)
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf)
log.Printf("Env is: ", configuration.Env)
if configuration.Env == "PROD" {
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
}
port = configuration.Server.Port
log.Printf("PORT IS: ", port)
log.Printf(configuration.Kafka.Bootstrap_Servers)
@@ -55,7 +60,7 @@ func init() {
producer_module.GetSchemaVersions()
// initialize producers
producer_module.InitializeProducers(configuration.Kafka)
producer_module.InitializeProducers(configuration)
for i := 0; i < configuration.Server.Goroutines; i++ {
go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf)

View File

@@ -16,16 +16,19 @@ var (
asyncProducer sarama.AsyncProducer
)
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
func GetSyncProducerConfig(configuration config) *sarama.Config {
config := sarama.NewConfig()
kafkaConfiguration configuration.KafkaConfigurations
// security configs
// config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
if config.Env == "PROD" {
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
}
// producer configs
// to be changed: read from config file
@@ -41,17 +44,18 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *saram
}
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
func GetAsyncProducerConfig(configuration config) *sarama.Config {
config := sarama.NewConfig()
kafkaConfiguration configuration.KafkaConfigurations
// security configs
// config.Net.TLS.Enable = true
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
if config.Env != "PROD" {
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
}
// producer configs
// to be changed: read from config file
config.Producer.Retry.Max = 3
@@ -89,10 +93,10 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn
}
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) {
func InitializeProducers(configuration config) {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
syncProducer = GetSyncProducer(kafkaConfiguration)
asyncProducer = GetAsyncProducer(kafkaConfiguration)
syncProducer = GetSyncProducer(configuration)
asyncProducer = GetAsyncProducer(configuration)
}
// using confluent-kafka-go