From 5acd0d552e76fb11fa3d39268fbb98c8cbef2fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Tue, 30 Mar 2021 16:03:09 +0530 Subject: [PATCH] write to kafka using request handler channel --- main.go | 11 +++++++-- producer/get_schema.go | 6 ++--- producer/kafka_writer.go | 45 +++++-------------------------------- producer/producer_config.go | 10 +++++++++ server/handlers.go | 3 +++ server/server.go | 11 +++++---- 6 files changed, 38 insertions(+), 48 deletions(-) diff --git a/main.go b/main.go index 02d9d3a..db44155 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,10 @@ package main import ( + lib "com.navi.medici.janus/lib" config "com.navi.medici.janus/config" - producer_module "com.navi.medici.janus/producer" server "com.navi.medici.janus/server" + producer_module "com.navi.medici.janus/producer" "fmt" "log" @@ -48,11 +49,17 @@ func init() { log.Printf(configuration.SchemaRegistry.Endpoint) producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ",")) + producer_module.InitializeProducers(configuration.Kafka) + + // TO DO: read number of goroutines from config + for i := 0; i < 10; i++ { + go lib.ProcessRequestChannel() + } // sync producer using sarama // sync_producer := producer_module.GetSyncProducer(configuration.Kafka) // log.Println(sync_producer) - // producer_module.KafkaWriter(sync_producer) + // producer_module.KafkaWriter(sync_producer, topic, messageToSendBytes) // sync_producer.Close() // async producer using sarama diff --git a/producer/get_schema.go b/producer/get_schema.go index f15eddb..cbe9b2e 100644 --- a/producer/get_schema.go +++ b/producer/get_schema.go @@ -8,7 +8,7 @@ import ( var ( - schemaVersionMap = make(map[string]int) + SchemaVersionMap = make(map[string]int) ) @@ -22,10 +22,10 @@ func GetSchemaVersions(schemaRegistryEndpoint string, topicList []string) { log.Println(err) } else { schemaId := schema.ID() - schemaVersionMap[topic] = schemaId + SchemaVersionMap[topic] = schemaId } } - log.Println(schemaVersionMap) + log.Println(SchemaVersionMap) } \ No newline at end of file diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 8d10d08..36a5842 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -1,49 +1,16 @@ package producer import ( - // "fmt" "log" - // "io/ioutil" - "encoding/binary" - - // data "com.navi.medici.janus/data" "github.com/Shopify/sarama" ) -func KafkaWriter(producer sarama.SyncProducer, topic string, messageToSendBytes []byte) { - - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - recordValue = append(recordValue, byte(0)) - - // add schemaID] - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaVersionMap[topic])) - recordValue = append(recordValue, schemaIDBytes...) - - // add [messageIndex] - messageIndexBytes := []byte{byte(2), byte(0)} - recordValue = append(recordValue, messageIndexBytes...) - - // Now write the bytes from the actual value... - // valueBytes, _ := proto.Marshal(&sensorReading) - recordValue = append(recordValue, messageToSendBytes...) - - msg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(recordValue), - } - - partition, offset, error := producer.SendMessage(msg) - if error != nil { - log.Fatalln("Failed to write to kafka:", error) - } - log.Printf("Partition: ", partition) - log.Printf("Offset: ", offset) - // log.Printf(error) - // log.Printf("Pixel sent: %s", messageToSend) +func WriteMessageToKafkaAsync(message *sarama.ProducerMessage) { + select { + case asyncProducer.Input() <- message: + case err := <- asyncProducer.Errors(): + log.Printf("FAILED TO WRITE TO KAFKA", err) + } } diff --git a/producer/producer_config.go b/producer/producer_config.go index 95f30e0..76d25a1 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -8,6 +8,10 @@ import ( // "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) +var ( + syncProducer sarama.SyncProducer + asyncProducer sarama.AsyncProducer +) func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { config := sarama.NewConfig() @@ -81,6 +85,12 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.Asyn return producer } + +func InitializeProducers(kafkaConfiguration config.KafkaConfigurations) { + syncProducer = GetSyncProducer(kafkaConfiguration) + asyncProducer = GetAsyncProducer(kafkaConfiguration) +} + // using confluent-kafka-go // func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap { // var config = kafka.ConfigMap { diff --git a/server/handlers.go b/server/handlers.go index 9e9fd56..f0d89c2 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -1,6 +1,8 @@ package server import ( + lib "com.navi.medici.janus/lib" + "io" "io/ioutil" "log" @@ -17,5 +19,6 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") + lib.RequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} io.WriteString(w, "ok") } diff --git a/server/server.go b/server/server.go index 815cbf1..f00e577 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/gorilla/mux" ) type Server struct { @@ -28,16 +29,18 @@ func NewServer(port string) (*Server, error) { return nil, errors.Wrap(err, "failed to create listener") } - mux := http.NewServeMux() - - mux.HandleFunc("/events", eventsHandler) + // mux := http.NewServeMux() + // mux.HandleFunc("/events", eventsHandler) // mux.HandleFunc("/events", eventshandler).Methods("POST") // mux.HandleFunc("/health", healthHandler).Methods("GET") // mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST") // mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST") // mux.HandleFunc("/stop", stopHandler).Methods("POST") - httpServer := &http.Server{Addr: ":" + port, Handler: mux} + router := mux.NewRouter() + router.HandleFunc("/events", eventsHandler).Methods("POST") + + httpServer := &http.Server{Addr: ":" + port, Handler: router} newServer := &Server{HttpServer: httpServer, Listener: listener} return newServer, nil