// Package producer wires up shared Sarama sync and async Kafka producers
// for the service, configured from the project's KafkaConfigurations.
package producer

import (
	"log"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	// "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

	config "com.navi.medici.janus/config"
)

// Package-level producers, initialized once via InitializeProducers.
var (
	syncProducer  sarama.SyncProducer
	asyncProducer sarama.AsyncProducer
)

// newBaseProducerConfig builds the settings shared by both the sync and
// async producers: SASL/SCRAM auth, retry, acks, compression, and metadata
// refresh. Callers layer their mode-specific options on top.
func newBaseProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
	// NOTE: "cfg" avoids shadowing the imported "config" package alias.
	cfg := sarama.NewConfig()

	// security configs
	// config.Net.TLS.Enable = true
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Handshake = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	cfg.Net.SASL.User = kafkaConfiguration.Sasl_User
	cfg.Net.SASL.Password = kafkaConfiguration.Sasl_Password
	// NOTE(review): sarama's SCRAM mechanisms also require
	// Net.SASL.SCRAMClientGeneratorFunc to be set; it is not set anywhere
	// in this file — confirm the broker handshake actually succeeds.

	// producer configs
	// to be changed: read from config file
	cfg.Producer.Retry.Max = 3
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Compression = sarama.CompressionGZIP

	// metadata configs
	cfg.Metadata.RefreshFrequency = 1 * time.Minute

	return cfg
}

// GetSyncProducerConfig returns the Sarama configuration for the synchronous
// producer. Return.Successes must be true for SyncProducer to work.
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
	cfg := newBaseProducerConfig(kafkaConfiguration)
	cfg.Producer.Return.Successes = true
	return cfg
}

// GetAsyncProducerConfig returns the Sarama configuration for the
// asynchronous producer, adding batching (flush) thresholds on top of the
// shared base config.
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
	cfg := newBaseProducerConfig(kafkaConfiguration)
	cfg.Producer.Flush.Bytes = 31000
	cfg.Producer.Flush.Frequency = 100 * time.Millisecond
	return cfg
}

// GetSyncProducer creates a synchronous producer against the comma-separated
// broker list in Bootstrap_Servers. It panics if the producer cannot be
// created (treated as a fatal startup error).
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncProducer {
	cfg := GetSyncProducerConfig(kafkaConfiguration)
	brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
	producer, err := sarama.NewSyncProducer(brokerList, cfg)
	if err != nil {
		panic(err)
	}
	return producer
}

// GetAsyncProducer creates an asynchronous producer against the
// comma-separated broker list in Bootstrap_Servers. It panics if the
// producer cannot be created (treated as a fatal startup error).
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.AsyncProducer {
	cfg := GetAsyncProducerConfig(kafkaConfiguration)
	brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
	producer, err := sarama.NewAsyncProducer(brokerList, cfg)
	if err != nil {
		panic(err)
	}
	return producer
}

// InitializeProducers sets up Sarama's logger and constructs the shared
// package-level sync and async producers. Call once at startup.
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) {
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	syncProducer = GetSyncProducer(kafkaConfiguration)
	asyncProducer = GetAsyncProducer(kafkaConfiguration)
}

// using confluent-kafka-go
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
// 	var config = kafka.ConfigMap {
// 		"bootstrap.servers": kafkaConfiguration.Bootstrap_Servers,
// 		// "ssl.endpoint.identification.algorithm" : kafkaConfiguration.SSL_Endpoint_Algorithm,
// 		// "sasl.mechanism": kafkaConfiguration.SASL_Mechanism,
// 		// "request.timeout.ms": kafkaConfiguration.Request_Timeout_Ms,
// 		// "security.protocol": kafkaConfiguration.Security_Protocol,
// 		// "retry.backoff.ms": kafkaConfiguration.Retry_Backoff_MS,
// 		// "sasl.jaas.config": kafkaConfiguration.Sasl_JAAS_Config
// 	}
// 	return &config
// }
// func GetProducer(kafkaConfiguration config.KafkaConfigurations) *kafka.Producer {
// 	var config = GetProducerConfig(kafkaConfiguration)
// 	producer, err := kafka.NewProducer(config)
// 	if err != nil {
// 		panic(err)
// 	}
// 	return producer
// }