DE-121 - added scram

This commit is contained in:
Nitin
2021-07-27 18:39:42 +05:30
parent 43516e4bb4
commit 514f2b3e00

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"strings" "strings"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"crypto/tls"
// "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" // "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
) )
@@ -22,6 +23,8 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env st
// security configs // security configs
// config.Net.TLS.Enable = true // config.Net.TLS.Enable = true
if env == "PROD" { if env == "PROD" {
config.Net.TLS.Enable = true
config.Net.TLS.Config = createTLSConfiguration()
config.Net.SASL.Enable = true config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
@@ -53,6 +56,8 @@ func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env s
// security configs // security configs
// config.Net.TLS.Enable = true // config.Net.TLS.Enable = true
if env == "PROD" { if env == "PROD" {
config.Net.TLS.Enable = true
config.Net.TLS.Config = createTLSConfiguration()
config.Net.SASL.Enable = true config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
@@ -105,6 +110,14 @@ func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env stri
asyncProducer = GetAsyncProducer(kafkaConfiguration, env) asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
} }
func createTLSConfiguration() (t *tls.Config) {
t = &tls.Config{
InsecureSkipVerify: false,
}
return t
}
// using confluent-kafka-go // using confluent-kafka-go
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap { // func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
// var config = kafka.ConfigMap { // var config = kafka.ConfigMap {