2021-03-30 13:36:55 +05:30
|
|
|
package producer
|
|
|
|
|
|
|
|
|
|
import (
|
2023-01-12 12:59:22 +05:30
|
|
|
metrics "com.navi.medici.janus/instrumentation"
|
|
|
|
|
"com.navi.medici.janus/utils"
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
"time"
|
2021-03-30 13:36:55 +05:30
|
|
|
)
|
|
|
|
|
|
2022-03-31 16:24:32 +05:30
|
|
|
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
|
2023-01-12 12:59:22 +05:30
|
|
|
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
|
|
|
|
|
asyncProducer.Input() <- message
|
|
|
|
|
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
|
|
|
|
|
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
|
|
|
|
|
logger.Info("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func processProducerSuccesses() {
|
|
|
|
|
for {
|
|
|
|
|
msg := <-asyncProducer.Successes()
|
|
|
|
|
metrics.ProducerSuccessCounter.WithLabelValues(msg.Topic).Inc()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func processProducerErrors() {
|
|
|
|
|
for {
|
|
|
|
|
err := <-asyncProducer.Errors()
|
|
|
|
|
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
|
|
|
|
|
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-30 13:36:55 +05:30
|
|
|
}
|