DE-121 - added env variable
This commit is contained in:
2
main.go
2
main.go
@@ -60,7 +60,7 @@ func init() {
|
||||
producer_module.GetSchemaVersions()
|
||||
|
||||
// initialize producers
|
||||
producer_module.InitializeProducers(configuration)
|
||||
producer_module.InitializeProducers(configuration.Kafka, configuration.Env)
|
||||
|
||||
for i := 0; i < configuration.Server.Goroutines; i++ {
|
||||
go lib.ProcessProtobufRequestChannel(configuration.Kafka.Kafka_Topic_Protobuf)
|
||||
|
||||
@@ -16,12 +16,12 @@ var (
|
||||
asyncProducer sarama.AsyncProducer
|
||||
)
|
||||
|
||||
func GetSyncProducerConfig(configuration config.Configurations) *sarama.Config {
|
||||
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
kafkaConfiguration configuration.KafkaConfigurations
|
||||
//kafkaConfiguration configuration.KafkaConfigurations
|
||||
// security configs
|
||||
// config.Net.TLS.Enable = true
|
||||
if config.Env == "PROD" {
|
||||
if env == "PROD" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
@@ -44,12 +44,12 @@ func GetSyncProducerConfig(configuration config.Configurations) *sarama.Config {
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducerConfig(configuration config.Configurations) *sarama.Config {
|
||||
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
kafkaConfiguration configuration.KafkaConfigurations
|
||||
//kafkaConfiguration configuration.KafkaConfigurations
|
||||
// security configs
|
||||
// config.Net.TLS.Enable = true
|
||||
if config.Env != "PROD" {
|
||||
if env != "PROD" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
@@ -71,8 +71,8 @@ func GetAsyncProducerConfig(configuration config.Configurations) *sarama.Config
|
||||
}
|
||||
|
||||
|
||||
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncProducer {
|
||||
config := GetSyncProducerConfig(kafkaConfiguration)
|
||||
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.SyncProducer {
|
||||
config := GetSyncProducerConfig(kafkaConfiguration, env)
|
||||
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
|
||||
producer, err := sarama.NewSyncProducer(brokerList, config)
|
||||
if err != nil {
|
||||
@@ -82,8 +82,8 @@ func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncP
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.AsyncProducer {
|
||||
config := GetAsyncProducerConfig(kafkaConfiguration)
|
||||
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer {
|
||||
config := GetAsyncProducerConfig(kafkaConfiguration, env)
|
||||
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
|
||||
producer, err := sarama.NewAsyncProducer(brokerList, config)
|
||||
if err != nil {
|
||||
@@ -93,10 +93,10 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn
|
||||
}
|
||||
|
||||
|
||||
func InitializeProducers(configuration config) {
|
||||
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
|
||||
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
||||
syncProducer = GetSyncProducer(configuration)
|
||||
asyncProducer = GetAsyncProducer(configuration)
|
||||
syncProducer = GetSyncProducer(kafkaConfiguration, env)
|
||||
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
|
||||
}
|
||||
|
||||
// using confluent-kafka-go
|
||||
|
||||
Reference in New Issue
Block a user