skeleton
This commit is contained in:
24
config/config.go
Normal file
24
config/config.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package config
|
||||
|
||||
|
||||
type Configurations struct {
|
||||
Server ServerConfigurations
|
||||
Kafka KafkaConfigurations
|
||||
}
|
||||
|
||||
|
||||
type ServerConfigurations struct {
|
||||
Port string
|
||||
}
|
||||
|
||||
|
||||
type KafkaConfigurations struct {
|
||||
Bootstrap_Servers string
|
||||
Request_Timeout_Ms int
|
||||
Retry_Backoff_MS int
|
||||
Sasl_User string
|
||||
Sasl_Password string
|
||||
// SSL_Endpoint_Algorithm string
|
||||
// SASL_Mechanism string
|
||||
// Security_Protocol string
|
||||
}
|
||||
12
config/config.yml
Normal file
12
config/config.yml
Normal file
@@ -0,0 +1,12 @@
|
||||
server:
|
||||
port: 8000
|
||||
|
||||
kafka:
|
||||
bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable
|
||||
ssl_endpoint_algorithm: https
|
||||
sasl_mechanism: PLAIN
|
||||
security_protocol: SASL_SSL
|
||||
request_timeout_ms: 20000
|
||||
retry_backoff_ms: 500
|
||||
sasl_user: SASL_USER # read from environment variable
|
||||
sasl_password: SASL_PASSWORD # read from environment variable
|
||||
68
main.go
Normal file
68
main.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package main
|
||||
|
||||
|
||||
import (
|
||||
config "com.navi.medici.janus/config"
|
||||
// producer_module "com.navi.medici.janus/producer"
|
||||
server "com.navi.medici.janus/server"
|
||||
|
||||
"fmt"
|
||||
"log"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var (
|
||||
port string
|
||||
)
|
||||
|
||||
|
||||
func init() {
|
||||
|
||||
viper.SetConfigName("config")
|
||||
viper.AddConfigPath("./config")
|
||||
viper.SetConfigType("yml")
|
||||
viper.AutomaticEnv()
|
||||
|
||||
var configuration config.Configurations
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
fmt.Printf("Error reading config file, %s", err)
|
||||
}
|
||||
|
||||
err := viper.Unmarshal(&configuration)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode into struct, %v", err)
|
||||
}
|
||||
|
||||
|
||||
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
|
||||
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
|
||||
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
|
||||
|
||||
port = configuration.Server.Port
|
||||
log.Printf("PORT IS: ", port)
|
||||
|
||||
// sync producer using sarama
|
||||
// sync_producer := producer_module.GetSyncProducer(configuration.Kafka)
|
||||
// log.Println(sync_producer)
|
||||
// producer_module.KafkaWriter(sync_producer)
|
||||
// sync_producer.Close()
|
||||
|
||||
// async producer using sarama
|
||||
// async_producer := producer_module.GetAsyncProducer(configuration.Kafka)
|
||||
// log.Println(async_producer)
|
||||
// producer_module.KafkaWriter(async_producer)
|
||||
// async_producer.Close()
|
||||
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
log.Printf("Serving on http://0.0.0.0:", port)
|
||||
httpServer, err := server.NewServer(port)
|
||||
if err != nil {
|
||||
log.Fatalln("Unable to start server, ", err)
|
||||
}
|
||||
httpServer.HttpServer.Serve(httpServer.Listener)
|
||||
|
||||
}
|
||||
106
producer/producer_config.go
Normal file
106
producer/producer_config.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package producer
|
||||
|
||||
import (
|
||||
config "com.navi.medici.janus/config"
|
||||
|
||||
"time"
|
||||
"github.com/Shopify/sarama"
|
||||
// "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
)
|
||||
|
||||
|
||||
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
|
||||
// security configs
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Return.Successes = true
|
||||
|
||||
// metadata configs
|
||||
config.Metadata.RefreshFrequency = 1 * time.Minute
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations) *sarama.Config {
|
||||
config := sarama.NewConfig()
|
||||
|
||||
// security configs
|
||||
config.Net.TLS.Enable = true
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Handshake = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
|
||||
config.Net.SASL.User = kafkaConfiguration.Sasl_User
|
||||
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
|
||||
|
||||
// producer configs
|
||||
// to be changed: read from config file
|
||||
config.Producer.Retry.Max = 3
|
||||
config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
config.Producer.Compression = sarama.CompressionGZIP
|
||||
config.Producer.Flush.Bytes = 31000
|
||||
config.Producer.Flush.Frequency = 100 * time.Millisecond
|
||||
|
||||
// metadata configs
|
||||
config.Metadata.RefreshFrequency = 1 * time.Minute
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
|
||||
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.SyncProducer {
|
||||
config := GetSyncProducerConfig(kafkaConfiguration)
|
||||
brokerList := []string {kafkaConfiguration.Bootstrap_Servers}
|
||||
producer, err := sarama.NewSyncProducer(brokerList, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return producer
|
||||
}
|
||||
|
||||
|
||||
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations) sarama.AsyncProducer {
|
||||
config := GetAsyncProducerConfig(kafkaConfiguration)
|
||||
brokerList := []string {kafkaConfiguration.Bootstrap_Servers}
|
||||
producer, err := sarama.NewAsyncProducer(brokerList, config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return producer
|
||||
}
|
||||
|
||||
// using confluent-kafka-go
|
||||
// func GetProducerConfig(kafkaConfiguration config.KafkaConfigurations) *kafka.ConfigMap {
|
||||
// var config = kafka.ConfigMap {
|
||||
// "bootstrap.servers": kafkaConfiguration.Bootstrap_Servers,
|
||||
// // "ssl.endpoint.identification.algorithm" : kafkaConfiguration.SSL_Endpoint_Algorithm,
|
||||
// // "sasl.mechanism": kafkaConfiguration.SASL_Mechanism,
|
||||
// // "request.timeout.ms": kafkaConfiguration.Request_Timeout_Ms,
|
||||
// // "security.protocol": kafkaConfiguration.Security_Protocol,
|
||||
// // "retry.backoff.ms": kafkaConfiguration.Retry_Backoff_MS,
|
||||
// // "sasl.jaas.config": kafkaConfiguration.Sasl_JAAS_Config
|
||||
// }
|
||||
// return &config
|
||||
// }
|
||||
|
||||
|
||||
// func GetProducer(kafkaConfiguration config.KafkaConfigurations) *kafka.Producer {
|
||||
// var config = GetProducerConfig(kafkaConfiguration)
|
||||
// producer, err := kafka.NewProducer(config)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// return producer
|
||||
// }
|
||||
21
server/handlers.go
Normal file
21
server/handlers.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func eventsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var reader io.Reader
|
||||
reader = r.Body
|
||||
body, err := ioutil.ReadAll(reader)
|
||||
|
||||
if err == nil {
|
||||
log.Printf(string(body))
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
io.WriteString(w, "ok")
|
||||
}
|
||||
45
server/server.go
Normal file
45
server/server.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
HttpServer *http.Server
|
||||
Listener net.Listener
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
var (
|
||||
networkTCP = "tcp"
|
||||
)
|
||||
|
||||
|
||||
func NewServer(port string) (*Server, error) {
|
||||
|
||||
network := networkTCP
|
||||
|
||||
listener, err := net.Listen(network, ":" + port)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create listener")
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.HandleFunc("/events", eventsHandler)
|
||||
// mux.HandleFunc("/events", eventshandler).Methods("POST")
|
||||
// mux.HandleFunc("/health", healthHandler).Methods("GET")
|
||||
// mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST")
|
||||
// mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST")
|
||||
// mux.HandleFunc("/stop", stopHandler).Methods("POST")
|
||||
|
||||
httpServer := &http.Server{Addr: ":" + port, Handler: mux}
|
||||
newServer := &Server{HttpServer: httpServer, Listener: listener}
|
||||
return newServer, nil
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user