write to kafka using request handler channel

This commit is contained in:
“nishant-sharma”
2021-03-30 16:03:09 +05:30
parent 503dfdee98
commit 5acd0d552e
6 changed files with 38 additions and 48 deletions

11
main.go
View File

@@ -2,9 +2,10 @@ package main
import ( import (
lib "com.navi.medici.janus/lib"
config "com.navi.medici.janus/config" config "com.navi.medici.janus/config"
producer_module "com.navi.medici.janus/producer"
server "com.navi.medici.janus/server" server "com.navi.medici.janus/server"
producer_module "com.navi.medici.janus/producer"
"fmt" "fmt"
"log" "log"
@@ -48,11 +49,17 @@ func init() {
log.Printf(configuration.SchemaRegistry.Endpoint) log.Printf(configuration.SchemaRegistry.Endpoint)
producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ",")) producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ","))
producer_module.InitializeProducers(configuration.Kafka)
// TO DO: read number of goroutines from config
for i := 0; i < 10; i++ {
go lib.ProcessRequestChannel()
}
// sync producer using sarama // sync producer using sarama
// sync_producer := producer_module.GetSyncProducer(configuration.Kafka) // sync_producer := producer_module.GetSyncProducer(configuration.Kafka)
// log.Println(sync_producer) // log.Println(sync_producer)
// producer_module.KafkaWriter(sync_producer) // producer_module.KafkaWriter(sync_producer, topic, messageToSendBytes)
// sync_producer.Close() // sync_producer.Close()
// async producer using sarama // async producer using sarama

View File

@@ -8,7 +8,7 @@ import (
var ( var (
schemaVersionMap = make(map[string]int) SchemaVersionMap = make(map[string]int)
) )
@@ -22,10 +22,10 @@ func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) {
log.Println(err) log.Println(err)
} else { } else {
schemaId := schema.ID() schemaId := schema.ID()
schemaVersionMap[topic] = schemaId SchemaVersionMap[topic] = schemaId
} }
} }
log.Println(schemaVersionMap) log.Println(SchemaVersionMap)
} }

View File

@@ -1,49 +1,16 @@
package producer package producer
import ( import (
// "fmt"
"log" "log"
// "io/ioutil"
"encoding/binary"
// data "com.navi.medici.janus/data"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
func KafkaWriter(producer sarama.SyncProducer, topic string, messageToSendBytes []byte) {
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] func WriteMessageToKafkaAsync(message *sarama.ProducerMessage) {
recordValue := []byte{} select {
case asyncProducer.Input() <- message:
// add [magicByte] case err := <- asyncProducer.Errors():
recordValue = append(recordValue, byte(0)) log.Printf("FAILED TO WRITE TO KAFKA", err)
// add schemaID]
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaVersionMap[topic]))
recordValue = append(recordValue, schemaIDBytes...)
// add [messageIndex]
messageIndexBytes := []byte{byte(2), byte(0)}
recordValue = append(recordValue, messageIndexBytes...)
// Now write the bytes from the actual value...
// valueBytes, _ := proto.Marshal(&sensorReading)
recordValue = append(recordValue, messageToSendBytes...)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(recordValue),
} }
partition, offset, error := producer.SendMessage(msg)
if error != nil {
log.Fatalln("Failed to write to kafka:", error)
}
log.Printf("Partition: ", partition)
log.Printf("Offset: ", offset)
// log.Printf(error)
// log.Printf("Pixel sent: %s", messageToSend)
} }

View File

@@ -8,6 +8,10 @@ import (
// "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" // "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
) )
var (
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
)
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
config := sarama.NewConfig() config := sarama.NewConfig()
@@ -81,6 +85,12 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn
return producer return producer
} }
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) {
syncProducer = GetSyncProducer(kafkaConfiguration)
asyncProducer = GetAsyncProducer(kafkaConfiguration)
}
// using confluent-kafka-go // using confluent-kafka-go
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap { // func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
// var config = kafka.ConfigMap { // var config = kafka.ConfigMap {

View File

@@ -1,6 +1,8 @@
package server package server
import ( import (
lib "com.navi.medici.janus/lib"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
@@ -17,5 +19,6 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) {
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
lib.RequestChannel <- &lib.RequestObject{Body: body, Header: r.Header}
io.WriteString(w, "ok") io.WriteString(w, "ok")
} }

View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/gorilla/mux"
) )
type Server struct { type Server struct {
@@ -28,16 +29,18 @@ func NewServer(port string) (*Server, error) {
return nil, errors.Wrap(err, "failed to create listener") return nil, errors.Wrap(err, "failed to create listener")
} }
mux := http.NewServeMux() // mux := http.NewServeMux()
// mux.HandleFunc("/events", eventsHandler)
mux.HandleFunc("/events", eventsHandler)
// mux.HandleFunc("/events", eventshandler).Methods("POST") // mux.HandleFunc("/events", eventshandler).Methods("POST")
// mux.HandleFunc("/health", healthHandler).Methods("GET") // mux.HandleFunc("/health", healthHandler).Methods("GET")
// mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST") // mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST")
// mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST") // mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST")
// mux.HandleFunc("/stop", stopHandler).Methods("POST") // mux.HandleFunc("/stop", stopHandler).Methods("POST")
httpServer := &http.Server{Addr: ":" + port, Handler: mux} router := mux.NewRouter()
router.HandleFunc("/events", eventsHandler).Methods("POST")
httpServer := &http.Server{Addr: ":" + port, Handler: router}
newServer := &Server{HttpServer: httpServer, Listener: listener} newServer := &Server{HttpServer: httpServer, Listener: listener}
return newServer, nil return newServer, nil