Files
janus/producer/kafka_writer.go
2023-02-02 17:47:11 +05:30

26 lines
853 B
Go

package producer
import (
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/utils"
"github.com/Shopify/sarama"
"go.uber.org/zap"
"time"
)
// WriteMessageToKafkaAsync hands message to the async producer's input channel
// and records push metrics labelled with the message topic and source.
//
// The histogram observation covers only the time spent on the channel send,
// in milliseconds; it does not include delivery to the broker. Delivery
// failures surface on the producer's Errors channel, which
// processProducerErrors reads.
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
	start := time.Now()
	asyncProducer.Input() <- message

	// time.Since uses the monotonic clock reading carried by start, so the
	// measurement cannot go negative or jump if the wall clock is adjusted
	// while the send is in flight. Converting the elapsed nanoseconds once
	// also avoids truncating two separate timestamps before subtracting.
	elapsedMillis := utils.NanosToMillis(time.Since(start).Nanoseconds())

	metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(elapsedMillis))
	metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()

	// NOTE(review): at this point the message has only been accepted by the
	// input channel; the wording below predates that distinction and is kept
	// unchanged so existing log searches continue to match.
	logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
}
// processProducerErrors drains the async producer's Errors channel. For each
// failed delivery it logs the underlying error and increments
// ProducerFailCounter for the topic of the failed message. It returns once
// the Errors channel has been closed.
func processProducerErrors() {
	// Ranging over the channel ends the loop when it is closed. A bare
	// receive inside an endless loop would instead yield a nil error value
	// after close and panic on the err.Err dereference.
	for err := range asyncProducer.Errors() {
		logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
		metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
	}
}