From 0fbd842e379b949fc3509d21e7695fbf131a41fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Thu, 1 Apr 2021 12:56:04 +0530 Subject: [PATCH] cleanup --- config/config.go | 3 --- config/config.yml | 3 --- lib/RequestHandler.go | 8 ++------ main.go | 25 ++++++------------------- server/handlers.go | 4 ++-- server/server.go | 10 ++-------- 6 files changed, 12 insertions(+), 41 deletions(-) diff --git a/config/config.go b/config/config.go index d198f7f..cbbd58c 100644 --- a/config/config.go +++ b/config/config.go @@ -21,9 +21,6 @@ type KafkaConfigurations struct { Retry_Backoff_MS int Sasl_User string Sasl_Password string - // SSL_Endpoint_Algorithm string - // SASL_Mechanism string - // Security_Protocol string } type SchemaRegistryConfigurations struct { diff --git a/config/config.yml b/config/config.yml index 4b202a0..ced4400 100644 --- a/config/config.yml +++ b/config/config.yml @@ -5,9 +5,6 @@ server: kafka: bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable - ssl_endpoint_algorithm: https - sasl_mechanism: PLAIN - security_protocol: SASL_SSL request_timeout_ms: 20000 retry_backoff_ms: 500 sasl_user: KAFKA_SASL_USER # read from environment variable diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index dabcbd7..f4ec99b 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -30,6 +30,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageBytes := request.Body + // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} // add [magicByte] @@ -44,8 +45,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { messageIndexBytes := []byte{byte(2), byte(0)} recordValue = append(recordValue, messageIndexBytes...) - // Now write the bytes from the actual value... - // valueBytes, _ := proto.Marshal(&sensorReading) + // Now write the bytes from the actual message... recordValue = append(recordValue, messageBytes...) message := &sarama.ProducerMessage{ @@ -53,10 +53,6 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { Value: sarama.ByteEncoder(recordValue), } - if message != nil { - log.Printf("WRITING TO KAFKA") - } - producer_module.WriteMessageToKafkaAsync(message) } diff --git a/main.go b/main.go index 8d0004d..1c6ff61 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( server "com.navi.medici.janus/server" producer_module "com.navi.medici.janus/producer" - "fmt" "log" "strings" "github.com/spf13/viper" @@ -27,14 +26,15 @@ func init() { var configuration config.Configurations if err := viper.ReadInConfig(); err != nil { - fmt.Printf("Error reading config file, %s", err) + log.Fatalln("Error reading config file, %s", err) } err := viper.Unmarshal(&configuration) if err != nil { - fmt.Printf("Unable to decode into struct, %v", err) + log.Fatalln("Unable to decode into struct, %v", err) } + // Following coinfigurations read from environment variables configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) @@ -45,31 +45,18 @@ func init() { port = configuration.Server.Port log.Printf("PORT IS: ", port) log.Printf(configuration.Kafka.Bootstrap_Servers) - log.Printf(configuration.Kafka.Sasl_User) - log.Printf(configuration.Kafka.Sasl_Password) log.Printf(configuration.SchemaRegistry.Endpoint) + // initialize schema version map which contains latest schema version for topic(s) producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ",")) + + // initialize producers producer_module.InitializeProducers(configuration.Kafka) - // TO DO: read number of goroutines from config for i := 0; i < configuration.Server.Goroutines; i++ { go lib.ProcessRequestChannel(configuration.Server.Kafka_Topic) } - // sync producer using sarama - // sync_producer := producer_module.GetSyncProducer(configuration.Kafka) - // log.Println(sync_producer) - // producer_module.KafkaWriter(sync_producer, topic, messageToSendBytes) - // sync_producer.Close() - - // async producer using sarama - // async_producer := producer_module.GetAsyncProducer(configuration.Kafka) - // log.Println(async_producer) - // producer_module.KafkaWriter(async_producer) - // async_producer.Close() - - } func main() { diff --git a/server/handlers.go b/server/handlers.go index 4e06d04..48517ae 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -27,7 +27,7 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "ok") } -func healthHandler(w http.ResponseWriter, r *http.Request){ +func healthHandler(w http.ResponseWriter, r *http.Request) { if healthyBool { w.Header().Set("Content-Type", "application/json") io.WriteString(w, "true") @@ -37,7 +37,7 @@ func healthHandler(w http.ResponseWriter, r *http.Request){ } } -func healthToggleHandler(w http.ResponseWriter, r *http.Request){ +func healthToggleHandler(w http.ResponseWriter, r *http.Request) { healthyBool = !healthyBool io.WriteString(w, "toggled") } diff --git a/server/server.go b/server/server.go index 041231c..1a4f052 100644 --- a/server/server.go +++ b/server/server.go @@ -29,18 +29,12 @@ func NewServer(port string) (*Server, error) { return nil, errors.Wrap(err, "failed to create listener") } - // mux := http.NewServeMux() - // mux.HandleFunc("/events", eventsHandler) - // mux.HandleFunc("/events", eventshandler).Methods("POST") - // mux.HandleFunc("/health", healthHandler).Methods("GET") - // mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST") - // mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST") - // mux.HandleFunc("/stop", stopHandler).Methods("POST") - router := mux.NewRouter() router.HandleFunc("/events", eventsHandler).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET") + // router.HandleFunc("/schema/refresh", schemaRefreshHandler).Methods("POST") + // router.HandleFunc("/stop", stopHandler).Methods("POST") httpServer := &http.Server{Addr: ":" + port, Handler: router} newServer := &Server{HttpServer: httpServer, Listener: listener}