Files
houston-be/pkg/kafka/produce/produce.go

64 lines
1.6 KiB
Go
Raw Normal View History

package kafka
import (
"blaze/pkg/kafka"
"os"
"github.com/Shopify/sarama"
"go.uber.org/zap"
)
// KProducer bundles a sarama.AsyncProducer with the zap logger that this
// package uses to report the outcome of produced messages.
type KProducer struct {
// Embedded producer: its methods are promoted onto KProducer, except where
// KProducer declares a method with the same name.
sarama.AsyncProducer
// logger is the destination for the delivery failure/success log entries.
logger *zap.Logger
}
// NewKProducer builds a KProducer around the producer returned by
// kafka.SaramaSyncProducer and kicks off the Errors and Successes handlers
// before handing the wrapper back to the caller.
//
// If the underlying producer cannot be created, the failure is logged and the
// process terminates with exit status 1; the function never returns nil.
func NewKProducer(logger *zap.Logger) *KProducer {
	saramaProducer, err := kafka.SaramaSyncProducer()
	if err != nil {
		logger.Error("sarama kafka producer failed", zap.Error(err))
		os.Exit(1)
	}

	wrapped := new(KProducer)
	wrapped.AsyncProducer = saramaProducer
	wrapped.logger = logger

	// Begin handling the error channel first, then the success channel.
	wrapped.Errors()
	wrapped.Successes()

	return wrapped
}
// Errors starts a background goroutine that drains the embedded producer's
// error channel and logs every message that could not be delivered to Kafka,
// including its topic, key, value and the delivery error.
//
// The goroutine runs until the producer's error channel is closed. Note that
// this method shadows the Errors method promoted from the embedded
// sarama.AsyncProducer; the raw channel is still reachable through
// kp.AsyncProducer.Errors().
func (kp *KProducer) Errors() {
	go func() {
		for perr := range kp.AsyncProducer.Errors() {
			msg := perr.Msg

			// Key and Value are optional on a ProducerMessage; calling Encode
			// on a nil Encoder would panic and kill this goroutine.
			var keyBytes, valueBytes []byte
			if msg.Key != nil {
				encoded, encErr := msg.Key.Encode()
				if encErr != nil {
					kp.logger.Error("key encoding failed for failed message",
						zap.String("topic", msg.Topic), zap.Error(encErr))
				}
				keyBytes = encoded
			}
			if msg.Value != nil {
				encoded, encErr := msg.Value.Encode()
				if encErr != nil {
					kp.logger.Error("value encoding failed for failed message",
						zap.String("topic", msg.Topic), zap.Error(encErr))
				}
				valueBytes = encoded
			}

			// metrics.KafkaEventIngestionEventFailureCounter.WithLabelValues(msg.Topic).Inc()
			kp.logger.Error("failed to emit event to kafka", zap.String("topic", msg.Topic),
				zap.String("key", string(keyBytes)), zap.String("value", string(valueBytes)),
				zap.String("error", perr.Error()))
		}
	}()
}
// Successes starts a background goroutine that drains the embedded producer's
// success channel and logs the topic and key of every message that was
// delivered to Kafka.
//
// The goroutine runs until the producer's success channel is closed. Note that
// this method shadows the Successes method promoted from the embedded
// sarama.AsyncProducer; the raw channel is still reachable through
// kp.AsyncProducer.Successes().
// NOTE(review): sarama only populates this channel when
// Producer.Return.Successes is enabled in the producer config — confirm that
// kafka.SaramaSyncProducer sets it.
func (kp *KProducer) Successes() {
	go func() {
		for msg := range kp.AsyncProducer.Successes() {
			// Key is optional on a ProducerMessage; calling Encode on a nil
			// Encoder would panic and kill this goroutine.
			var keyBytes []byte
			if msg.Key != nil {
				encoded, encErr := msg.Key.Encode()
				if encErr != nil {
					kp.logger.Error("key encoding failed for delivered message",
						zap.String("topic", msg.Topic), zap.Error(encErr))
				}
				keyBytes = encoded
			}

			// metrics.KafkaEventIngestionEventSuccessCounter.WithLabelValues(msg.Topic).Inc()
			kp.logger.Info("successfully ingested event to kafka", zap.String("topic", msg.Topic), zap.String("key", string(keyBytes)))
		}
	}()
}