146 lines
4.9 KiB
Go
146 lines
4.9 KiB
Go
package producer
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"strings"
|
|
"time"
|
|
|
|
"com.navi.medici.janus/config"
|
|
metrics "com.navi.medici.janus/instrumentation"
|
|
"com.navi.medici.janus/utils"
|
|
"github.com/Shopify/sarama"
|
|
"github.com/sony/gobreaker"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
asyncProducer sarama.AsyncProducer
|
|
logger *zap.Logger
|
|
cb *gobreaker.CircuitBreaker
|
|
defaultClickstreamTopic string
|
|
litmusClickstreamTopic string
|
|
appClickstreamTopic string
|
|
)
|
|
|
|
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
|
|
logger = utils.GetLogger()
|
|
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
|
|
defaultClickstreamTopic = kafkaConfiguration.Kafka_Topic_Json
|
|
litmusClickstreamTopic = kafkaConfiguration.Kafka_Topic_Litmus
|
|
appClickstreamTopic = kafkaConfiguration.Kafka_Topic_App
|
|
|
|
cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
|
|
Name: "kafka-producer",
|
|
Timeout: 10 * time.Second,
|
|
ReadyToTrip: func(counts gobreaker.Counts) bool {
|
|
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
|
|
return counts.Requests >= 3 && failureRatio >= 0.6
|
|
},
|
|
OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
|
|
logger.Info("Circuit breaker state changed",
|
|
zap.String("name", name),
|
|
zap.String("from", from.String()),
|
|
zap.String("to", to.String()))
|
|
},
|
|
})
|
|
|
|
go processProducerErrors()
|
|
}
|
|
|
|
func GetDefaultClickstreamTopic() string {
|
|
return defaultClickstreamTopic
|
|
}
|
|
|
|
func GetLitmusClickstreamTopic() string {
|
|
return litmusClickstreamTopic
|
|
}
|
|
|
|
func GetAppClickstreamTopic() string {
|
|
return appClickstreamTopic
|
|
}
|
|
|
|
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
|
|
config := sarama.NewConfig()
|
|
|
|
config.Net.TLS.Enable = true
|
|
config.Net.TLS.Config = createTLSConfiguration()
|
|
config.Net.SASL.Enable = true
|
|
config.Net.SASL.Handshake = true
|
|
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
|
|
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
|
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
|
|
}
|
|
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
|
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
|
|
|
config.Producer.Retry.Max = 3
|
|
config.Producer.RequiredAcks = sarama.WaitForLocal
|
|
config.Producer.Compression = sarama.CompressionGZIP
|
|
config.Producer.Flush.Bytes = 31000
|
|
config.Producer.Flush.Frequency = 100 * time.Millisecond
|
|
|
|
config.Metadata.RefreshFrequency = 1 * time.Minute
|
|
|
|
return config
|
|
}
|
|
|
|
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer {
|
|
config := GetAsyncProducerConfig(kafkaConfiguration, env)
|
|
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
|
|
producer, err := sarama.NewAsyncProducer(brokerList, config)
|
|
if err != nil {
|
|
logger.Fatal("Failed to create async producer", zap.Error(err))
|
|
}
|
|
return producer
|
|
}
|
|
|
|
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) error {
|
|
_, err := cb.Execute(func() (interface{}, error) {
|
|
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
|
|
asyncProducer.Input() <- message
|
|
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
|
|
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
|
|
logger.Debug("Successfully written to Kafka topic : " + message.Topic + " for source : " + source)
|
|
return nil, nil
|
|
})
|
|
return err
|
|
}
|
|
|
|
func processProducerErrors() {
|
|
for err := range asyncProducer.Errors() {
|
|
logger.Error("Failed to write message to Kafka", zap.Error(err))
|
|
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
|
|
}
|
|
}
|
|
|
|
func createTLSConfiguration() (t *tls.Config) {
|
|
t = &tls.Config{
|
|
InsecureSkipVerify: false,
|
|
MinVersion: tls.VersionTLS12,
|
|
}
|
|
return t
|
|
}
|
|
|
|
// using confluent-kafka-go
|
|
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
|
|
// var config = kafka.ConfigMap {
|
|
// "bootstrap.servers": kafkaConfiguration.Bootstrap_Servers,
|
|
// // "ssl.endpoint.identification.algorithm" : kafkaConfiguration.SSL_Endpoint_Algorithm,
|
|
// // "sasl.mechanism": kafkaConfiguration.SASL_Mechanism,
|
|
// // "request.timeout.ms": kafkaConfiguration.Request_Timeout_Ms,
|
|
// // "security.protocol": kafkaConfiguration.Security_Protocol,
|
|
// // "retry.backoff.ms": kafkaConfiguration.Retry_Backoff_MS,
|
|
// // "sasl.jaas.config": kafkaConfiguration.Sasl_JAAS_Config
|
|
// }
|
|
// return &config
|
|
// }
|
|
|
|
// func GetProducer(kafkaConfiguration config.KafkaConfigurations) *kafka.Producer {
|
|
// var config = GetProducerConfig(kafkaConfiguration)
|
|
// producer, err := kafka.NewProducer(config)
|
|
// if err != nil {
|
|
// panic(err)
|
|
// }
|
|
// return producer
|
|
// }
|