26 lines
853 B
Go
26 lines
853 B
Go
package producer
|
|
|
|
import (
|
|
metrics "com.navi.medici.janus/instrumentation"
|
|
"com.navi.medici.janus/utils"
|
|
"github.com/Shopify/sarama"
|
|
"go.uber.org/zap"
|
|
"time"
|
|
)
|
|
|
|
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
|
|
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
|
|
asyncProducer.Input() <- message
|
|
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
|
|
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
|
|
logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
|
|
}
|
|
|
|
func processProducerErrors() {
|
|
for {
|
|
err := <-asyncProducer.Errors()
|
|
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
|
|
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
|
|
}
|
|
}
|