cleanup
This commit is contained in:
@@ -21,9 +21,6 @@ type KafkaConfigurations struct {
|
||||
Retry_Backoff_MS int
|
||||
Sasl_User string
|
||||
Sasl_Password string
|
||||
// SSL_Endpoint_Algorithm string
|
||||
// SASL_Mechanism string
|
||||
// Security_Protocol string
|
||||
}
|
||||
|
||||
type SchemaRegistryConfigurations struct {
|
||||
|
||||
@@ -5,9 +5,6 @@ server:
|
||||
|
||||
kafka:
|
||||
bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable
|
||||
ssl_endpoint_algorithm: https
|
||||
sasl_mechanism: PLAIN
|
||||
security_protocol: SASL_SSL
|
||||
request_timeout_ms: 20000
|
||||
retry_backoff_ms: 500
|
||||
sasl_user: KAFKA_SASL_USER # read from environment variable
|
||||
|
||||
@@ -30,6 +30,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
|
||||
messageBytes := request.Body
|
||||
|
||||
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
|
||||
recordValue := []byte{}
|
||||
|
||||
// add [magicByte]
|
||||
@@ -44,8 +45,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
messageIndexBytes := []byte{byte(2), byte(0)}
|
||||
recordValue = append(recordValue, messageIndexBytes...)
|
||||
|
||||
// Now write the bytes from the actual value...
|
||||
// valueBytes, _ := proto.Marshal(&sensorReading)
|
||||
// Now write the bytes from the actual message...
|
||||
recordValue = append(recordValue, messageBytes...)
|
||||
|
||||
message := &sarama.ProducerMessage{
|
||||
@@ -53,10 +53,6 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
|
||||
if message != nil {
|
||||
log.Printf("WRITING TO KAFKA")
|
||||
}
|
||||
|
||||
producer_module.WriteMessageToKafkaAsync(message)
|
||||
|
||||
}
|
||||
|
||||
25
main.go
25
main.go
@@ -7,7 +7,6 @@ import (
|
||||
server "com.navi.medici.janus/server"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"github.com/spf13/viper"
|
||||
@@ -27,14 +26,15 @@ func init() {
|
||||
|
||||
var configuration config.Configurations
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
fmt.Printf("Error reading config file, %s", err)
|
||||
log.Fatalln("Error reading config file, %s", err)
|
||||
}
|
||||
|
||||
err := viper.Unmarshal(&configuration)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode into struct, %v", err)
|
||||
log.Fatalln("Unable to decode into struct, %v", err)
|
||||
}
|
||||
|
||||
// Following coinfigurations read from environment variables
|
||||
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
|
||||
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
|
||||
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
|
||||
@@ -45,31 +45,18 @@ func init() {
|
||||
port = configuration.Server.Port
|
||||
log.Printf("PORT IS: ", port)
|
||||
log.Printf(configuration.Kafka.Bootstrap_Servers)
|
||||
log.Printf(configuration.Kafka.Sasl_User)
|
||||
log.Printf(configuration.Kafka.Sasl_Password)
|
||||
log.Printf(configuration.SchemaRegistry.Endpoint)
|
||||
|
||||
// initialize schema version map which contains latest schema version for topic(s)
|
||||
producer_module.GetSchemaVersions(configuration.SchemaRegistry.Endpoint, strings.Split(configuration.SchemaRegistry.Topics, ","))
|
||||
|
||||
// initialize producers
|
||||
producer_module.InitializeProducers(configuration.Kafka)
|
||||
|
||||
// TO DO: read number of goroutines from config
|
||||
for i := 0; i < configuration.Server.Goroutines; i++ {
|
||||
go lib.ProcessRequestChannel(configuration.Server.Kafka_Topic)
|
||||
}
|
||||
|
||||
// sync producer using sarama
|
||||
// sync_producer := producer_module.GetSyncProducer(configuration.Kafka)
|
||||
// log.Println(sync_producer)
|
||||
// producer_module.KafkaWriter(sync_producer, topic, messageToSendBytes)
|
||||
// sync_producer.Close()
|
||||
|
||||
// async producer using sarama
|
||||
// async_producer := producer_module.GetAsyncProducer(configuration.Kafka)
|
||||
// log.Println(async_producer)
|
||||
// producer_module.KafkaWriter(async_producer)
|
||||
// async_producer.Close()
|
||||
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
@@ -27,7 +27,7 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
io.WriteString(w, "ok")
|
||||
}
|
||||
|
||||
func healthHandler(w http.ResponseWriter, r *http.Request){
|
||||
func healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if healthyBool {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
io.WriteString(w, "true")
|
||||
@@ -37,7 +37,7 @@ func healthHandler(w http.ResponseWriter, r *http.Request){
|
||||
}
|
||||
}
|
||||
|
||||
func healthToggleHandler(w http.ResponseWriter, r *http.Request){
|
||||
func healthToggleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
healthyBool = !healthyBool
|
||||
io.WriteString(w, "toggled")
|
||||
}
|
||||
|
||||
@@ -29,18 +29,12 @@ func NewServer(port string) (*Server, error) {
|
||||
return nil, errors.Wrap(err, "failed to create listener")
|
||||
}
|
||||
|
||||
// mux := http.NewServeMux()
|
||||
// mux.HandleFunc("/events", eventsHandler)
|
||||
// mux.HandleFunc("/events", eventshandler).Methods("POST")
|
||||
// mux.HandleFunc("/health", healthHandler).Methods("GET")
|
||||
// mux.HandleFunc("/health/toggle", healthToggleHandler).Methods("POST")
|
||||
// mux.HandleFunc("/refresh/schema", refreshSchemaHandler).Methods("POST")
|
||||
// mux.HandleFunc("/stop", stopHandler).Methods("POST")
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.HandleFunc("/events", eventsHandler).Methods("POST")
|
||||
router.HandleFunc("/health", healthHandler).Methods("GET")
|
||||
router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET")
|
||||
// router.HandleFunc("/schema/refresh", schemaRefreshHandler).Methods("POST")
|
||||
// router.HandleFunc("/stop", stopHandler).Methods("POST")
|
||||
|
||||
httpServer := &http.Server{Addr: ":" + port, Handler: router}
|
||||
newServer := &Server{HttpServer: httpServer, Listener: listener}
|
||||
|
||||
Reference in New Issue
Block a user