DE-4747 instrumentation
This commit is contained in:
@@ -1,16 +1,19 @@
|
||||
package producer
|
||||
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"log"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
|
||||
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage) {
|
||||
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
|
||||
select {
|
||||
case asyncProducer.Input() <- message:
|
||||
metrics.IncrementCounter("success", source)
|
||||
log.Printf("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE %v", source)
|
||||
case err := <- asyncProducer.Errors():
|
||||
log.Printf("FAILED TO WRITE TO KAFKA", err)
|
||||
metrics.IncrementCounter("fail", source)
|
||||
log.Printf("FAILED TO WRITE TO KAFKA FOR SOURCE %v %v", source, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env st
|
||||
config := sarama.NewConfig()
|
||||
// security configs
|
||||
//if env == "PROD" {
|
||||
log.Print(kafkaConfiguration.Sasl_User)
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
@@ -52,6 +53,7 @@ func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env s
|
||||
config := sarama.NewConfig()
|
||||
// security configs
|
||||
//if env == "PROD" {
|
||||
log.Print(kafkaConfiguration.Sasl_User)
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.TLS.Config = createTLSConfiguration()
|
||||
config.Net.SASL.Enable = true
|
||||
|
||||
Reference in New Issue
Block a user