diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go new file mode 100644 index 0000000..8d10d08 --- /dev/null +++ b/producer/kafka_writer.go @@ -0,0 +1,49 @@ +package producer + +import ( + // "fmt" + "log" + // "io/ioutil" + "encoding/binary" + + // data "com.navi.medici.janus/data" + + "github.com/Shopify/sarama" +) + +func KafkaWriter(producer sarama.SyncProducer, topic string, messageToSendBytes []byte) { + + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] + recordValue := []byte{} + + // add [magicByte] + recordValue = append(recordValue, byte(0)) + + // add schemaID] + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaVersionMap[topic])) + recordValue = append(recordValue, schemaIDBytes...) + + // add [messageIndex] + messageIndexBytes := []byte{byte(2), byte(0)} + recordValue = append(recordValue, messageIndexBytes...) + + // Now write the bytes from the actual value... + // valueBytes, _ := proto.Marshal(&sensorReading) + recordValue = append(recordValue, messageToSendBytes...) + + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(recordValue), + } + + partition, offset, error := producer.SendMessage(msg) + if error != nil { + log.Fatalln("Failed to write to kafka:", error) + } + log.Printf("Partition: ", partition) + log.Printf("Offset: ", offset) + // log.Printf(error) + // log.Printf("Pixel sent: %s", messageToSend) + +}