Files
janus/producer/kafka_writer.go

26 lines
853 B
Go
Raw Normal View History

2021-03-30 13:36:55 +05:30
package producer
import (
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/utils"
"github.com/Shopify/sarama"
"go.uber.org/zap"
"time"
2021-03-30 13:36:55 +05:30
)
2022-03-31 16:24:32 +05:30
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
asyncProducer.Input() <- message
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
2023-02-02 14:00:32 +05:30
logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
}
func processProducerErrors() {
for {
err := <-asyncProducer.Errors()
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
}
2021-03-30 13:36:55 +05:30
}