From 7981dedde434336ed360a7d386a005c4071bc6b3 Mon Sep 17 00:00:00 2001 From: puru Date: Fri, 23 Aug 2024 15:24:00 +0530 Subject: [PATCH] DE-3247 | refactored code and added logs --- go.mod | 4 +- go.sum | 23 ++++---- lib/RequestHandler.go | 89 ++++++++++++++++++++----------- main.go | 91 ++++++++++++++++++++----------- producer/kafka_writer.go | 42 +++++++-------- producer/producer_client.go | 103 ++++++++++++++++++------------------ server/handlers.go | 98 +++++++++++++++++++--------------- server/server.go | 66 +++++++++++++++++------ 8 files changed, 311 insertions(+), 205 deletions(-) diff --git a/go.mod b/go.mod index d302d29..1c5dd66 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,11 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.1 - github.com/riferrei/srclient v0.2.1 github.com/rs/cors v1.8.2 + github.com/sony/gobreaker v1.0.0 github.com/spf13/viper v1.7.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.uber.org/zap v1.10.0 + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 + gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index e13dde9..6e470ba 100644 --- a/go.sum +++ b/go.sum @@ -89,7 +89,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw= github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -177,8 +176,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM= -github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= @@ -239,8 +236,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/riferrei/srclient v0.2.1 h1:uIJhzPXW+suDsEDOZKf4oTZZXTyxtw98cFC70rFzvgU= -github.com/riferrei/srclient v0.2.1/go.mod h1:SmCz0lrYQ1pLqXlYq0yPnRccHLGh+llDA0i6hecPeW8= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= @@ -255,6 +250,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= @@ -268,12 +265,20 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -346,8 +351,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -381,6 +384,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -450,8 +454,9 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 41df64e..5a50ae4 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -1,19 +1,17 @@ package lib import ( - metrics "com.navi.medici.janus/instrumentation" - producer_module "com.navi.medici.janus/producer" - "com.navi.medici.janus/utils" + "context" "encoding/json" "net/http" "time" - "github.com/Shopify/sarama" -) + metrics "com.navi.medici.janus/instrumentation" + producer_module "com.navi.medici.janus/producer" + "com.navi.medici.janus/utils" -var ( - ProtobufRequestChannel = make(chan *RequestObject) - JsonRequestChannel = make(chan *RequestObject) + "github.com/Shopify/sarama" + "go.uber.org/zap" ) type RequestObject struct { @@ -21,45 +19,72 @@ type RequestObject struct { Header http.Header } -func ProcessJsonRequestChannel(topic string) { - for { - request := <-JsonRequestChannel - ClickstreamJsonEventHandler(*request, topic) +type WorkerPool struct { + workers int + jobQueue chan RequestObject + logger *zap.Logger +} + +func NewWorkerPool(workers int) *WorkerPool { + return &WorkerPool{ + workers: workers, + jobQueue: make(chan RequestObject, workers), + logger: utils.GetLogger(), } } -func ClickstreamJsonEventHandler(request RequestObject, topic string) { +func (wp *WorkerPool) Start(ctx context.Context) { + for i := 0; i < wp.workers; i++ { + go wp.worker(ctx) + } +} + +func (wp *WorkerPool) worker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case job := <-wp.jobQueue: + wp.processRequest(job) + } + } +} + +func (wp *WorkerPool) AddJob(job RequestObject) { + wp.jobQueue <- job +} + +func (wp *WorkerPool) processRequest(request RequestObject) { + defer func() { + if r := recover(); r != nil { + wp.logger.Error("Recovered from panic in processRequest", zap.Any("panic", r)) + } + }() + eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body - //getting the client which has sent this event var result map[string]interface{} - json.Unmarshal(messageBytes, &result) + if err := json.Unmarshal(messageBytes, &result); err != nil { + wp.logger.Error("Failed to unmarshal JSON", zap.Error(err)) + return + } + source := getSource(result) - // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] - recordValue := []byte{} - - // add [magicByte] - // recordValue = append(recordValue, byte(0)) - - // // add schemaID] - // schemaIDBytes := make([]byte, 4) - // binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic])) - // recordValue = append(recordValue, schemaIDBytes...) - - // Now write the bytes from the actual message... - recordValue = append(recordValue, messageBytes...) + recordValue := messageBytes message := &sarama.ProducerMessage{ - Topic: topic, + Topic: producer_module.GetKafkaTopic(), Value: sarama.ByteEncoder(recordValue), } metrics.RequestCounter.WithLabelValues(source).Inc() - // processing complete, record duration metrics - metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) - producer_module.WriteMessageToKafkaAsync(message, source) + metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) + + if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil { + wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source)) + } } func getSource(event map[string]interface{}) string { diff --git a/main.go b/main.go index 6813780..351c284 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,12 @@ package main import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "com.navi.medici.janus/config" "com.navi.medici.janus/lib" producer_client "com.navi.medici.janus/producer" @@ -8,7 +14,6 @@ import ( "com.navi.medici.janus/utils" "github.com/spf13/viper" "go.uber.org/zap" - "log" ) var ( @@ -24,13 +29,10 @@ func init() { utils.InitializeLogger(configs.Env) logger = utils.GetLogger() port = configs.Server.Port - logger.Debug("Service started on PORT: " + port) - logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) + logger.Info("Service started on PORT: " + port) + logger.Info("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) producer_client.InitializeProducers(configs.Kafka, configs.Env) - logger.Info("Producer Initialized, starting goroutines for event processing") - for i := 0; i < configs.Server.Goroutines; i++ { - go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) - } + logger.Info("Producer Initialized") } func getConfigs() config.Configurations { @@ -41,40 +43,67 @@ func getConfigs() config.Configurations { var configuration config.Configurations if err := viper.ReadInConfig(); err != nil { - log.Fatalln("Error reading config file, %s", err) + log.Fatalf("Error reading config file: %s", err) } err := viper.Unmarshal(&configuration) if err != nil { - log.Fatalln("Unable to decode into struct, %v", err) + log.Fatalf("Unable to decode into struct: %v", err) } - // Following coinfigurations read from environment variables - configuration.Env = viper.GetString(configuration.Env) - configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) - configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json) - configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf) + // Read configurations from environment variables + configuration.Env = viper.GetString("ENVIRONMENT") + configuration.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS") + configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON") + configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF") + configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER") + configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD") + cors = viper.GetString("CORS_LIST") - //if configuration.Env == "PROD" { - configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User) - configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) - //} - cors = viper.GetString(configuration.Server.Cors) return configuration } func main() { - logger.Debug("Serving on http://0.0.0.0:" + port) - httpServer, err1 := server.NewServer(port, cors) - metricsServer, err2 := server.MetricServer(METRICS_PORT) - if err1 != nil { - logger.Fatal("Unable to start server, %v", zap.Error(err1)) - } - if err2 != nil { - logger.Fatal("Unable to start Metric server, %v", zap.Error(err2)) - } - go httpServer.HttpServer.Serve(httpServer.Listener) - go metricsServer.HttpServer.Serve(metricsServer.Listener) + defer logger.Sync() - select {} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + configs := getConfigs() + + // Initialize worker pool + workerPool := lib.NewWorkerPool(configs.Server.Goroutines) + workerPool.Start(ctx) + + // Initialize HTTP server + httpServer, err := server.NewServer(port, cors, workerPool, logger) + if err != nil { + logger.Fatal("Unable to create HTTP server", zap.Error(err)) + } + + // Initialize metrics server + metricsServer, err := server.MetricServer(METRICS_PORT, logger) + if err != nil { + logger.Fatal("Unable to create Metric server", zap.Error(err)) + } + + // Start servers + httpServer.StartServer() + metricsServer.StartServer() + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + <-sigChan + logger.Info("Shutting down servers...") + + // Stop servers + httpServer.StopServer() + metricsServer.StopServer() + + // Stop worker pool + cancel() + + logger.Info("Servers shut down gracefully") } diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 9bb3b81..75823dc 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -1,25 +1,25 @@ package producer -import ( - metrics "com.navi.medici.janus/instrumentation" - "com.navi.medici.janus/utils" - "github.com/Shopify/sarama" - "go.uber.org/zap" - "time" -) +// import ( +// metrics "com.navi.medici.janus/instrumentation" +// "com.navi.medici.janus/utils" +// "github.com/Shopify/sarama" +// "go.uber.org/zap" +// "time" +// ) -func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { - beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano()) - asyncProducer.Input() <- message - metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) - metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() - logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) -} +// func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { +// beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano()) +// asyncProducer.Input() <- message +// metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) +// metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() +// logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) +// } -func processProducerErrors() { - for { - err := <-asyncProducer.Errors() - logger.Error("Failed to write message to Kafka", zap.Error(err.Err)) - metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() - } -} +// func processProducerErrors() { +// for { +// err := <-asyncProducer.Errors() +// logger.Error("Failed to write message to Kafka", zap.Error(err.Err)) +// metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() +// } +// } diff --git a/producer/producer_client.go b/producer/producer_client.go index 31b070c..4fe7479 100644 --- a/producer/producer_client.go +++ b/producer/producer_client.go @@ -1,54 +1,55 @@ package producer import ( - "com.navi.medici.janus/config" - "com.navi.medici.janus/utils" "crypto/tls" - "github.com/Shopify/sarama" - "go.uber.org/zap" "strings" "time" + + "com.navi.medici.janus/config" + metrics "com.navi.medici.janus/instrumentation" + "com.navi.medici.janus/utils" + "github.com/Shopify/sarama" + "github.com/sony/gobreaker" + "go.uber.org/zap" ) var ( - syncProducer sarama.SyncProducer asyncProducer sarama.AsyncProducer logger *zap.Logger + cb *gobreaker.CircuitBreaker + kafkaTopic string ) -func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config { - config := sarama.NewConfig() - // security configs - //if env == "PROD" { - config.Net.TLS.Enable = true - config.Net.TLS.Config = createTLSConfiguration() - config.Net.SASL.Enable = true - config.Net.SASL.Handshake = true - config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: SHA512} - } - config.Net.SASL.User = kafkaConfiguration.Sasl_User - config.Net.SASL.Password = kafkaConfiguration.Sasl_Password - //} +func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { + logger = utils.GetLogger() + asyncProducer = GetAsyncProducer(kafkaConfiguration, env) + kafkaTopic = kafkaConfiguration.Kafka_Topic_Json - // producer configs - // to be changed: read from config file - config.Producer.Retry.Max = 3 - config.Producer.RequiredAcks = sarama.WaitForLocal - config.Producer.Compression = sarama.CompressionGZIP - config.Producer.Return.Successes = true + cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "kafka-producer", + Timeout: 10 * time.Second, + ReadyToTrip: func(counts gobreaker.Counts) bool { + failureRatio := float64(counts.TotalFailures) / float64(counts.Requests) + return counts.Requests >= 3 && failureRatio >= 0.6 + }, + OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) { + logger.Info("Circuit breaker state changed", + zap.String("name", name), + zap.String("from", from.String()), + zap.String("to", to.String())) + }, + }) - // metadata configs - config.Metadata.RefreshFrequency = 1 * time.Minute + go processProducerErrors() +} - return config +func GetKafkaTopic() string { + return kafkaTopic } func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config { config := sarama.NewConfig() - // security configs - //if env == "PROD" { + config.Net.TLS.Enable = true config.Net.TLS.Config = createTLSConfiguration() config.Net.SASL.Enable = true @@ -59,47 +60,45 @@ func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env s } config.Net.SASL.User = kafkaConfiguration.Sasl_User config.Net.SASL.Password = kafkaConfiguration.Sasl_Password - //} - // producer configs - // to be changed: read from config file + config.Producer.Retry.Max = 3 config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Compression = sarama.CompressionGZIP config.Producer.Flush.Bytes = 31000 config.Producer.Flush.Frequency = 100 * time.Millisecond - // metadata configs config.Metadata.RefreshFrequency = 1 * time.Minute return config } -func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.SyncProducer { - config := GetSyncProducerConfig(kafkaConfiguration, env) - brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",") - producer, err := sarama.NewSyncProducer(brokerList, config) - if err != nil { - panic(err) - } - return producer -} - func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer { config := GetAsyncProducerConfig(kafkaConfiguration, env) brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",") producer, err := sarama.NewAsyncProducer(brokerList, config) if err != nil { - panic(err) + logger.Fatal("Failed to create async producer", zap.Error(err)) } - - go processProducerErrors() return producer } -func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) { - logger = utils.GetLogger() - syncProducer = GetSyncProducer(kafkaConfiguration, env) - asyncProducer = GetAsyncProducer(kafkaConfiguration, env) +func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) error { + _, err := cb.Execute(func() (interface{}, error) { + beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano()) + asyncProducer.Input() <- message + metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) + metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() + logger.Debug("Successfully written to Kafka for source " + source) + return nil, nil + }) + return err +} + +func processProducerErrors() { + for err := range asyncProducer.Errors() { + logger.Error("Failed to write message to Kafka", zap.Error(err)) + metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() + } } func createTLSConfiguration() (t *tls.Config) { diff --git a/server/handlers.go b/server/handlers.go index be0d1da..4231c43 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -1,20 +1,22 @@ package server import ( - metrics "com.navi.medici.janus/instrumentation" - "com.navi.medici.janus/lib" - "com.navi.medici.janus/utils" "compress/gzip" "encoding/json" "io" "io/ioutil" "net/http" - "time" - "fmt" "strings" + "time" + + metrics "com.navi.medici.janus/instrumentation" + "com.navi.medici.janus/lib" + "com.navi.medici.janus/utils" + "go.uber.org/zap" ) var ( + logger *zap.Logger healthyBool bool = true ) @@ -29,60 +31,68 @@ type CustomResponse struct { Message string `json:"message"` } -func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { - eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano()) - - headerName := "X-Correlation-Id" - substring := "abcdabcd-1234" - headerValue, ok := r.Header[headerName] - if ok { - if strings.HasPrefix(headerValue[0], substring) { +func init() { + logger = utils.GetLogger() +} + +func eventsHandlerJson(workerPool *lib.WorkerPool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano()) + + headerName := "X-Correlation-Id" + substring := "abcdabcd-1234" + headerValue := r.Header.Get(headerName) + if strings.HasPrefix(headerValue, substring) { w.Header().Set("Content-Type", "application/json") - var rsp = CustomResponse{Code: 200, Message: "OK"} + rsp := CustomResponse{Code: 200, Message: "OK"} json.NewEncoder(w).Encode(rsp) - fmt.Println("Dropped blacklisted request") + logger.Info("Dropped blacklisted request", zap.String("correlationID", headerValue)) return - } - } - - var reader io.Reader - // check if body is gzip compressed - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - reader, err = gzip.NewReader(r.Body) + } + + var reader io.Reader + switch r.Header.Get("Content-Encoding") { + case "gzip": + var err error + reader, err = gzip.NewReader(r.Body) + if err != nil { + logger.Error("Error decompressing GZIP payload", zap.Error(err)) + w.Header().Set("Content-Type", "application/json") + http.Error(w, "Error decompressing GZIP payload", http.StatusBadRequest) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) + return + } + defer reader.(*gzip.Reader).Close() + default: + reader = r.Body + } + + body, err := ioutil.ReadAll(reader) // todo Limit to 1MB if err != nil { - // log.Printf(err) - w.Header().Set("Content-Type", "application/json") - http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) + logger.Error("Error reading request body", zap.Error(err)) + http.Error(w, "Request body invalid", http.StatusBadRequest) metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) return } - } else { - reader = r.Body - } - body, err := ioutil.ReadAll(reader) - if err != nil { - // log.Printf(err) w.Header().Set("Content-Type", "application/json") - http.Error(w, "Request body invalid", http.StatusBadRequest) - metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) - return + + workerPool.AddJob(lib.RequestObject{Body: body, Header: r.Header}) + + rsp := CustomResponse{Code: 200, Message: "OK"} + if err := json.NewEncoder(w).Encode(rsp); err != nil { + logger.Error("Error encoding response", zap.Error(err)) + } + metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) } - - w.Header().Set("Content-Type", "application/json") - lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - var rsp = CustomResponse{Code: 200, Message: "OK"} - json.NewEncoder(w).Encode(rsp) - metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) } func healthHandler(w http.ResponseWriter, r *http.Request) { if healthyBool { w.Header().Set("Content-Type", "application/json") - io.WriteString(w, "true") + io.WriteString(w, `{"status":"healthy"}`) } else { - http.Error(w, "server unhealthy", http.StatusServiceUnavailable) - return + http.Error(w, `{"status":"unhealthy"}`, http.StatusServiceUnavailable) } } diff --git a/server/server.go b/server/server.go index 6dfbb0a..aa4ce68 100644 --- a/server/server.go +++ b/server/server.go @@ -1,30 +1,31 @@ package server import ( - "fmt" "net" "net/http" "strings" "sync" + "com.navi.medici.janus/lib" "github.com/gorilla/mux" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" + "go.uber.org/zap" ) type Server struct { HttpServer *http.Server Listener net.Listener wg sync.WaitGroup + logger *zap.Logger } var ( networkTCP = "tcp" ) -func NewServer(port string, corsList string) (*Server, error) { - +func NewServer(port string, corsList string, workerPool *lib.WorkerPool, logger *zap.Logger) (*Server, error) { network := networkTCP listener, err := net.Listen(network, ":"+port) @@ -33,39 +34,74 @@ func NewServer(port string, corsList string) (*Server, error) { } router := mux.NewRouter() - router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST") + router.HandleFunc("/events/json", eventsHandlerJson(workerPool)).Methods("POST") router.HandleFunc("/health", healthHandler).Methods("GET") - httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)} - newServer := &Server{HttpServer: httpServer, Listener: listener} - return newServer, nil + httpServer := &http.Server{ + Addr: ":" + port, + Handler: enableCors(router, corsList), + } + newServer := &Server{ + HttpServer: httpServer, + Listener: listener, + logger: logger, + } + + return newServer, nil } func enableCors(handler http.Handler, corsList string) http.Handler { corsArray := strings.Split(corsList, ",") - fmt.Print(corsArray) c := cors.New(cors.Options{ AllowedOrigins: corsArray, AllowCredentials: false, - // Enable Debugging for testing, consider disabling in production - Debug: false, + Debug: false, }) return c.Handler(handler) } -func MetricServer(port string) (*Server, error) { + +func MetricServer(port string, logger *zap.Logger) (*Server, error) { network := networkTCP listener, err := net.Listen(network, ":"+port) if err != nil { - return nil, errors.Wrap(err, "failed to create listener") + return nil, errors.Wrap(err, "failed to create listener for metric server") } router := mux.NewRouter() - router.Path("/metrics").Handler(promhttp.Handler()) - httpServer := &http.Server{Addr: ":" + port, Handler: router} - mServer := &Server{HttpServer: httpServer, Listener: listener} + httpServer := &http.Server{ + Addr: ":" + port, + Handler: router, + } + + mServer := &Server{ + HttpServer: httpServer, + Listener: listener, + logger: logger, + } + return mServer, nil } + +func (s *Server) StartServer() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.logger.Info("Starting server", zap.String("address", s.HttpServer.Addr)) + if err := s.HttpServer.Serve(s.Listener); err != nil && err != http.ErrServerClosed { + s.logger.Fatal("Server error", zap.Error(err)) + } + }() +} + +func (s *Server) StopServer() { + s.logger.Info("Stopping server", zap.String("address", s.HttpServer.Addr)) + if err := s.HttpServer.Close(); err != nil { + s.logger.Error("Error stopping server", zap.Error(err)) + } + s.wg.Wait() + s.logger.Info("Server stopped", zap.String("address", s.HttpServer.Addr)) +}