Files
cybertron/pkg/kafka/producer/produce.go
2024-12-19 15:39:07 +05:30

75 lines
2.0 KiB
Go

package producer
import (
"cybertron/configs"
"cybertron/pkg/encoder"
"cybertron/pkg/kafka"
"cybertron/pkg/log"
ConfluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.uber.org/zap"
"os"
)
type KProducer interface {
PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error
}
type KProducerImpl struct {
AsyncProducer *ConfluentKafka.Producer
}
type ConfluentKafkaImplementation struct {
AsyncProducer *ConfluentKafka.Producer
}
func NewKProducer(env string, baseConfig *configs.KafkaConfig) *KProducerImpl {
producer, err := kafka.ConfluentProducer(baseConfig)
if err != nil {
log.Log.Error(" kafka producer failed", zap.Error(err))
os.Exit(1)
}
kProducer := &KProducerImpl{
AsyncProducer: producer,
}
return kProducer
}
func (kp *KProducerImpl) PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error {
msg, err := marshaller.Encode(msgPayload)
if err != nil {
log.Log.Error("error while marshalling msg payload", zap.Error(err))
return err
}
deliveryChan := make(chan ConfluentKafka.Event)
producerError := kp.AsyncProducer.Produce(&ConfluentKafka.Message{
TopicPartition: ConfluentKafka.TopicPartition{
Topic: &topic,
Partition: ConfluentKafka.PartitionAny,
},
Value: msg,
}, deliveryChan)
if producerError != nil {
log.Log.Error("producer error: %v", zap.Error(producerError))
}
e := <-deliveryChan
message := e.(*ConfluentKafka.Message)
if message.TopicPartition.Error != nil {
log.Log.Info("failed to deliver message:",
zap.Int32("topic partition", message.TopicPartition.Partition))
} else {
//log.Log.Info("delivered to topic at offset",
// zap.String("topic", *message.TopicPartition.Topic),
// zap.Int32("partition", message.TopicPartition.Partition),
// zap.String("offset", message.TopicPartition.Offset.String()))
}
close(deliveryChan)
kp.AsyncProducer.Flush(30)
return nil
}