From 28fb7fc6654c895b8b7258b0d22d869527c0387b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cnishant-sharma=E2=80=9D?= Date: Thu, 25 Mar 2021 13:24:08 +0530 Subject: [PATCH] skeleton --- config/config.go | 24 ++++++++ config/config.yml | 12 ++++ main.go | 68 +++++++++++++++++++++++ producer/producer_config.go | 106 ++++++++++++++++++++++++++++++++++++ server/handlers.go | 21 +++++++ server/server.go | 45 +++++++++++++++ 6 files changed, 276 insertions(+) create mode 100644 config/config.go create mode 100644 config/config.yml create mode 100644 main.go create mode 100644 producer/producer_config.go create mode 100644 server/handlers.go create mode 100644 server/server.go diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..99b8a94 --- /dev/null +++ b/config/config.go @@ -0,0 +1,24 @@ +package config + + +type Configurations struct { + Server ServerConfigurations + Kafka KafkaConfigurations +} + + +type ServerConfigurations struct { + Port string +} + + +type KafkaConfigurations struct { + Bootstrap_Servers string + Request_Timeout_Ms int + Retry_Backoff_MS int + Sasl_User string + Sasl_Password string + // SSL_Endpoint_Algorithm string + // SASL_Mechanism string + // Security_Protocol string +} diff --git a/config/config.yml b/config/config.yml new file mode 100644 index 0000000..b414060 --- /dev/null +++ b/config/config.yml @@ -0,0 +1,12 @@ +server: + port: 8000 + +kafka: + bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable + ssl_endpoint_algorithm: https + sasl_mechanism: PLAIN + security_protocol: SASL_SSL + request_timeout_ms: 20000 + retry_backoff_ms: 500 + sasl_user: SASL_USER # read from environment variable + sasl_password: SASL_PASSWORD # read from environment variable diff --git a/main.go b/main.go new file mode 100644 index 0000000..c08eaa6 --- /dev/null +++ b/main.go @@ -0,0 +1,68 @@ +package main + + +import ( + config "com.navi.medici.janus/config" + // producer_module "com.navi.medici.janus/producer" + server "com.navi.medici.janus/server" + + "fmt" + "log" + "github.com/spf13/viper" +) + +var ( + port string +) + + +func init() { + + viper.SetConfigName("config") + viper.AddConfigPath("./config") + viper.SetConfigType("yml") + viper.AutomaticEnv() + + var configuration config.Configurations + if err := viper.ReadInConfig(); err != nil { + fmt.Printf("Error reading config file, %s", err) + } + + err := viper.Unmarshal(&configuration) + if err != nil { + fmt.Printf("Unable to decode into struct, %v", err) + } + + + configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) + configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) + configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) + + port = configuration.Server.Port + log.Printf("PORT IS: ", port) + + // sync producer using sarama + // sync_producer := producer_module.GetSyncProducer(configuration.Kafka) + // log.Println(sync_producer) + // producer_module.KafkaWriter(sync_producer) + // sync_producer.Close() + + // async producer using sarama + // async_producer := producer_module.GetAsyncProducer(configuration.Kafka) + // log.Println(async_producer) + // producer_module.KafkaWriter(async_producer) + // async_producer.Close() + + +} + +func main() { + + log.Printf("Serving on http://0.0.0.0:", port) + httpServer, err := server.NewServer(port) + if err != nil { + log.Fatalln("Unable to start server, ", err) + } + httpServer.HttpServer.Serve(httpServer.Listener) + +} diff --git a/producer/producer_config.go b/producer/producer_config.go new file mode 100644 index 0000000..95f30e0 --- /dev/null +++ b/producer/producer_config.go @@ -0,0 +1,106 @@ +package producer + +import ( + config "com.navi.medici.janus/config" + + "time" + "github.com/Shopify/sarama" + // "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" +) + + +func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { + config := sarama.NewConfig() + + // security configs + config.Net.TLS.Enable = true + config.Net.SASL.Enable = true + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + config.Net.SASL.User = kafkaConfiguration.Sasl_User + config.Net.SASL.Password = kafkaConfiguration.Sasl_Password + + // producer configs + // to be changed: read from config file + config.Producer.Retry.Max = 3 + config.Producer.RequiredAcks = sarama.WaitForLocal + config.Producer.Compression = sarama.CompressionGZIP + config.Producer.Return.Successes = true + + // metadata configs + config.Metadata.RefreshFrequency = 1 * time.Minute + + return config +} + + +func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config { + config := sarama.NewConfig() + + // security configs + config.Net.TLS.Enable = true + config.Net.SASL.Enable = true + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + config.Net.SASL.User = kafkaConfiguration.Sasl_User + config.Net.SASL.Password = kafkaConfiguration.Sasl_Password + + // producer configs + // to be changed: read from config file + config.Producer.Retry.Max = 3 + config.Producer.RequiredAcks = sarama.WaitForLocal + config.Producer.Compression = sarama.CompressionGZIP + config.Producer.Flush.Bytes = 31000 + config.Producer.Flush.Frequency = 100 * time.Millisecond + + // metadata configs + config.Metadata.RefreshFrequency = 1 * time.Minute + + return config +} + + +func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncProducer { + config := GetSyncProducerConfig(kafkaConfiguration) + brokerList := []string {kafkaConfiguration.Bootstrap_Servers} + producer, err := sarama.NewSyncProducer(brokerList, config) + if err != nil { + panic(err) + } + return producer +} + + +func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.AsyncProducer { + config := GetAsyncProducerConfig(kafkaConfiguration) + brokerList := []string {kafkaConfiguration.Bootstrap_Servers} + producer, err := sarama.NewAsyncProducer(brokerList, config) + if err != nil { + panic(err) + } + return producer +} + +// using confluent-kafka-go +// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap { +// var config = kafka.ConfigMap { +// "bootstrap.servers": kafkaConfiguration.Bootstrap_Servers, +// // "ssl.endpoint.identification.algorithm" : kafkaConfiguration.SSL_Endpoint_Algorithm, +// // "sasl.mechanism": kafkaConfiguration.SASL_Mechanism, +// // "request.timeout.ms": kafkaConfiguration.Request_Timeout_Ms, +// // "security.protocol": kafkaConfiguration.Security_Protocol, +// // "retry.backoff.ms": kafkaConfiguration.Retry_Backoff_MS, +// // "sasl.jaas.config": kafkaConfiguration.Sasl_JAAS_Config +// } +// return &config +// } + + +// func GetProducer(kafkaConfiguration config.KafkaConfigurations) *kafka.Producer { +// var config = GetProducerConfig(kafkaConfiguration) +// producer, err := kafka.NewProducer(config) +// if err != nil { +// panic(err) +// } +// return producer +// } diff --git a/server/handlers.go b/server/handlers.go new file mode 100644 index 0000000..9e9fd56 --- /dev/null +++ b/server/handlers.go @@ -0,0 +1,21 @@ +package server + +import ( + "io" + "io/ioutil" + "log" + "net/http" +) + +func eventsHandler(w http.ResponseWriter, r *http.Request) { + var reader io.Reader + reader = r.Body + body, err := ioutil.ReadAll(reader) + + if err == nil { + log.Printf(string(body)) + } + + w.Header().Set("Content-Type", "application/json") + io.WriteString(w, "ok") +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..815cbf1 --- /dev/null +++ b/server/server.go @@ -0,0 +1,45 @@ +package server + +import ( + "net" + "net/http" + "sync" + + "github.com/pkg/errors" +) + +type Server struct { + HttpServer *http.Server + Listener net.Listener + wg sync.WaitGroup +} + +var ( + networkTCP = "tcp" +) + + +func NewServer(port string) (*Server, error) { + + network := networkTCP + + listener, err := net.Listen(network, ":" + port) + if err != nil { + return nil, errors.Wrap(err, "failed to create listener") + } + + mux := http.NewServeMux() + + mux.HandleFunc("/events", eventsHandler) + // mux.HandleFunc("/events", eventshandler).Methods("POST") + // mux.HandleFunc("/health", healthHandler).Methods("GET") + // mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST") + // mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST") + // mux.HandleFunc("/stop", stopHandler).Methods("POST") + + httpServer := &http.Server{Addr: ":" + port, Handler: mux} + newServer := &Server{HttpServer: httpServer, Listener: listener} + return newServer, nil + +} +