added level based logging using uber/zap
This commit is contained in:
@@ -1,46 +1,46 @@
|
||||
package producer
|
||||
|
||||
import (
|
||||
config "com.navi.medici.janus/config"
|
||||
|
||||
"os"
|
||||
"com.navi.medici.janus/config"
|
||||
"com.navi.medici.janus/utils"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
"time"
|
||||
"strings"
|
||||
"github.com/Shopify/sarama"
|
||||
|
||||
"crypto/tls"
|
||||
// "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"github.com/Shopify/sarama"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
syncProducer sarama.SyncProducer
|
||||
asyncProducer sarama.AsyncProducer
|
||||
syncProducer sarama.SyncProducer
|
||||
asyncProducer sarama.AsyncProducer
|
||||
logger *zap.Logger
|
||||
)
|
||||
|
||||
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
// security configs
|
||||
// security configs
|
||||
//if env == "PROD" {
|
||||
log.Print(kafkaConfiguration.Sasl_User)
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||
}
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||
}
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
//}
|
||||
|
||||
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Return.Successes = true
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Return.Successes = true
|
||||
|
||||
// metadata configs
|
||||
config.Metadata.RefreshFrequency = 1 * time.Minute
|
||||
@@ -48,38 +48,35 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env st
|
||||
return config
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
// security configs
|
||||
// security configs
|
||||
//if env == "PROD" {
|
||||
log.Print(kafkaConfiguration.Sasl_User)
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||
}
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
//}
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
|
||||
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
||||
}
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
//}
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Flush.Bytes = 31000
|
||||
config.Producer.Flush.Frequency = 100 * time.Millisecond
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Flush.Bytes = 31000
|
||||
config.Producer.Flush.Frequency = 100 * time.Millisecond
|
||||
|
||||
// metadata configs
|
||||
config.Metadata.RefreshFrequency = 1 * time.Minute
|
||||
config.Metadata.RefreshFrequency = 1 * time.Minute
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
|
||||
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.SyncProducer {
|
||||
config := GetSyncProducerConfig(kafkaConfiguration, env)
|
||||
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
|
||||
@@ -90,7 +87,6 @@ func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string)
|
||||
return producer
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer {
|
||||
config := GetAsyncProducerConfig(kafkaConfiguration, env)
|
||||
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
|
||||
@@ -101,9 +97,9 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string)
|
||||
return producer
|
||||
}
|
||||
|
||||
|
||||
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
|
||||
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
||||
logger = utils.GetLogger()
|
||||
syncProducer = GetSyncProducer(kafkaConfiguration, env)
|
||||
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
|
||||
}
|
||||
@@ -115,7 +111,6 @@ func createTLSConfiguration() (t *tls.Config) {
|
||||
return t
|
||||
}
|
||||
|
||||
|
||||
// using confluent-kafka-go
|
||||
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
|
||||
// var config = kafka.ConfigMap {
|
||||
@@ -130,7 +125,6 @@ func createTLSConfiguration() (t *tls.Config) {
|
||||
// return &config
|
||||
// }
|
||||
|
||||
|
||||
// func GetProducer(kafkaConfiguration config.KafkaConfigurations) *kafka.Producer {
|
||||
// var config = GetProducerConfig(kafkaConfiguration)
|
||||
// producer, err := kafka.NewProducer(config)
|
||||
|
||||
Reference in New Issue
Block a user