kafka writer function

This commit is contained in:
“nishant-sharma”
2021-03-30 13:36:55 +05:30
parent 54253b24e8
commit 503dfdee98

49
producer/kafka_writer.go Normal file
View File

@@ -0,0 +1,49 @@
package producer
import (
// "fmt"
"log"
// "io/ioutil"
"encoding/binary"
// data "com.navi.medici.janus/data"
"github.com/Shopify/sarama"
)
func KafkaWriter(producer sarama.SyncProducer, topic string, messageToSendBytes []byte) {
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
// add [magicByte]
recordValue = append(recordValue, byte(0))
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// add [messageIndex]
messageIndexBytes := []byte{byte(2), byte(0)}
recordValue = append(recordValue, messageIndexBytes...)
// Now write the bytes from the actual value...
// valueBytes, _ := proto.Marshal(&sensorReading)
recordValue = append(recordValue, messageToSendBytes...)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
}
partition, offset, error := producer.SendMessage(msg)
if error != nil {
log.Fatalln("Failed to write to kafka:", error)
}
log.Printf("Partition: ", partition)
log.Printf("Offset: ", offset)
// log.Printf(error)
// log.Printf("Pixel sent: %s", messageToSend)
}