DE-3247 | refactored code and added logs

This commit is contained in:
puru
2024-08-23 15:24:00 +05:30
parent 90ddba18b0
commit 7981dedde4
8 changed files with 311 additions and 205 deletions

4
go.mod
View File

@@ -8,9 +8,11 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/riferrei/srclient v0.2.1
github.com/rs/cors v1.8.2
github.com/sony/gobreaker v1.0.0
github.com/spf13/viper v1.7.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.uber.org/zap v1.10.0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v3 v3.0.1
)

23
go.sum
View File

@@ -89,7 +89,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -177,8 +176,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM=
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
@@ -239,8 +236,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/riferrei/srclient v0.2.1 h1:uIJhzPXW+suDsEDOZKf4oTZZXTyxtw98cFC70rFzvgU=
github.com/riferrei/srclient v0.2.1/go.mod h1:SmCz0lrYQ1pLqXlYq0yPnRccHLGh+llDA0i6hecPeW8=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U=
@@ -255,6 +250,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
@@ -268,12 +265,20 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -346,8 +351,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -381,6 +384,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -450,8 +454,9 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@@ -1,19 +1,17 @@
package lib
import (
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
"com.navi.medici.janus/utils"
"context"
"encoding/json"
"net/http"
"time"
"github.com/Shopify/sarama"
)
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
"com.navi.medici.janus/utils"
var (
ProtobufRequestChannel = make(chan *RequestObject)
JsonRequestChannel = make(chan *RequestObject)
"github.com/Shopify/sarama"
"go.uber.org/zap"
)
type RequestObject struct {
@@ -21,45 +19,72 @@ type RequestObject struct {
Header http.Header
}
func ProcessJsonRequestChannel(topic string) {
for {
request := <-JsonRequestChannel
ClickstreamJsonEventHandler(*request, topic)
type WorkerPool struct {
workers int
jobQueue chan RequestObject
logger *zap.Logger
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
workers: workers,
jobQueue: make(chan RequestObject, workers),
logger: utils.GetLogger(),
}
}
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
func (wp *WorkerPool) Start(ctx context.Context) {
for i := 0; i < wp.workers; i++ {
go wp.worker(ctx)
}
}
func (wp *WorkerPool) worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case job := <-wp.jobQueue:
wp.processRequest(job)
}
}
}
func (wp *WorkerPool) AddJob(job RequestObject) {
wp.jobQueue <- job
}
func (wp *WorkerPool) processRequest(request RequestObject) {
defer func() {
if r := recover(); r != nil {
wp.logger.Error("Recovered from panic in processRequest", zap.Any("panic", r))
}
}()
eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano())
messageBytes := request.Body
//getting the client which has sent this event
var result map[string]interface{}
json.Unmarshal(messageBytes, &result)
if err := json.Unmarshal(messageBytes, &result); err != nil {
wp.logger.Error("Failed to unmarshal JSON", zap.Error(err))
return
}
source := getSource(result)
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
recordValue := []byte{}
// add [magicByte]
// recordValue = append(recordValue, byte(0))
// // add schemaID]
// schemaIDBytes := make([]byte, 4)
// binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic]))
// recordValue = append(recordValue, schemaIDBytes...)
// Now write the bytes from the actual message...
recordValue = append(recordValue, messageBytes...)
recordValue := messageBytes
message := &sarama.ProducerMessage{
Topic: topic,
Topic: producer_module.GetKafkaTopic(),
Value: sarama.ByteEncoder(recordValue),
}
metrics.RequestCounter.WithLabelValues(source).Inc()
// processing complete, record duration metrics
metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
producer_module.WriteMessageToKafkaAsync(message, source)
metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil {
wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source))
}
}
func getSource(event map[string]interface{}) string {

91
main.go
View File

@@ -1,6 +1,12 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"com.navi.medici.janus/config"
"com.navi.medici.janus/lib"
producer_client "com.navi.medici.janus/producer"
@@ -8,7 +14,6 @@ import (
"com.navi.medici.janus/utils"
"github.com/spf13/viper"
"go.uber.org/zap"
"log"
)
var (
@@ -24,13 +29,10 @@ func init() {
utils.InitializeLogger(configs.Env)
logger = utils.GetLogger()
port = configs.Server.Port
logger.Debug("Service started on PORT: " + port)
logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers)
logger.Info("Service started on PORT: " + port)
logger.Info("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers)
producer_client.InitializeProducers(configs.Kafka, configs.Env)
logger.Info("Producer Initialized, starting goroutines for event processing")
for i := 0; i < configs.Server.Goroutines; i++ {
go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json)
}
logger.Info("Producer Initialized")
}
func getConfigs() config.Configurations {
@@ -41,40 +43,67 @@ func getConfigs() config.Configurations {
var configuration config.Configurations
if err := viper.ReadInConfig(); err != nil {
log.Fatalln("Error reading config file, %s", err)
log.Fatalf("Error reading config file: %s", err)
}
err := viper.Unmarshal(&configuration)
if err != nil {
log.Fatalln("Unable to decode into struct, %v", err)
log.Fatalf("Unable to decode into struct: %v", err)
}
// Following coinfigurations read from environment variables
configuration.Env = viper.GetString(configuration.Env)
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json)
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf)
// Read configurations from environment variables
configuration.Env = viper.GetString("ENVIRONMENT")
configuration.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS")
configuration.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON")
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF")
configuration.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER")
configuration.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD")
cors = viper.GetString("CORS_LIST")
//if configuration.Env == "PROD" {
configuration.Kafka.Sasl_User = viper.GetString(configuration.Kafka.Sasl_User)
configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password)
//}
cors = viper.GetString(configuration.Server.Cors)
return configuration
}
func main() {
logger.Debug("Serving on http://0.0.0.0:" + port)
httpServer, err1 := server.NewServer(port, cors)
metricsServer, err2 := server.MetricServer(METRICS_PORT)
if err1 != nil {
logger.Fatal("Unable to start server, %v", zap.Error(err1))
}
if err2 != nil {
logger.Fatal("Unable to start Metric server, %v", zap.Error(err2))
}
go httpServer.HttpServer.Serve(httpServer.Listener)
go metricsServer.HttpServer.Serve(metricsServer.Listener)
defer logger.Sync()
select {}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
configs := getConfigs()
// Initialize worker pool
workerPool := lib.NewWorkerPool(configs.Server.Goroutines)
workerPool.Start(ctx)
// Initialize HTTP server
httpServer, err := server.NewServer(port, cors, workerPool, logger)
if err != nil {
logger.Fatal("Unable to create HTTP server", zap.Error(err))
}
// Initialize metrics server
metricsServer, err := server.MetricServer(METRICS_PORT, logger)
if err != nil {
logger.Fatal("Unable to create Metric server", zap.Error(err))
}
// Start servers
httpServer.StartServer()
metricsServer.StartServer()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
logger.Info("Shutting down servers...")
// Stop servers
httpServer.StopServer()
metricsServer.StopServer()
// Stop worker pool
cancel()
logger.Info("Servers shut down gracefully")
}

View File

@@ -1,25 +1,25 @@
package producer
import (
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/utils"
"github.com/Shopify/sarama"
"go.uber.org/zap"
"time"
)
// import (
// metrics "com.navi.medici.janus/instrumentation"
// "com.navi.medici.janus/utils"
// "github.com/Shopify/sarama"
// "go.uber.org/zap"
// "time"
// )
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
asyncProducer.Input() <- message
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
}
// func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
// beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
// asyncProducer.Input() <- message
// metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
// metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
// logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
// }
func processProducerErrors() {
for {
err := <-asyncProducer.Errors()
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
}
}
// func processProducerErrors() {
// for {
// err := <-asyncProducer.Errors()
// logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
// metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
// }
// }

View File

@@ -1,54 +1,55 @@
package producer
import (
"com.navi.medici.janus/config"
"com.navi.medici.janus/utils"
"crypto/tls"
"github.com/Shopify/sarama"
"go.uber.org/zap"
"strings"
"time"
"com.navi.medici.janus/config"
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/utils"
"github.com/Shopify/sarama"
"github.com/sony/gobreaker"
"go.uber.org/zap"
)
var (
syncProducer sarama.SyncProducer
asyncProducer sarama.AsyncProducer
logger *zap.Logger
cb *gobreaker.CircuitBreaker
kafkaTopic string
)
func GetSyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
config := sarama.NewConfig()
// security configs
//if env == "PROD" {
config.Net.TLS.Enable = true
config.Net.TLS.Config = createTLSConfiguration()
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
//}
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
logger = utils.GetLogger()
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
kafkaTopic = kafkaConfiguration.Kafka_Topic_Json
// producer configs
// to be changed: read from config file
config.Producer.Retry.Max = 3
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionGZIP
config.Producer.Return.Successes = true
cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "kafka-producer",
Timeout: 10 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= 3 && failureRatio >= 0.6
},
OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
logger.Info("Circuit breaker state changed",
zap.String("name", name),
zap.String("from", from.String()),
zap.String("to", to.String()))
},
})
// metadata configs
config.Metadata.RefreshFrequency = 1 * time.Minute
go processProducerErrors()
}
return config
func GetKafkaTopic() string {
return kafkaTopic
}
func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env string) *sarama.Config {
config := sarama.NewConfig()
// security configs
//if env == "PROD" {
config.Net.TLS.Enable = true
config.Net.TLS.Config = createTLSConfiguration()
config.Net.SASL.Enable = true
@@ -59,47 +60,45 @@ func GetAsyncProducerConfig(kafkaConfiguration config.KafkaConfigurations, env s
}
config.Net.SASL.User = kafkaConfiguration.Sasl_User
config.Net.SASL.Password = kafkaConfiguration.Sasl_Password
//}
// producer configs
// to be changed: read from config file
config.Producer.Retry.Max = 3
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionGZIP
config.Producer.Flush.Bytes = 31000
config.Producer.Flush.Frequency = 100 * time.Millisecond
// metadata configs
config.Metadata.RefreshFrequency = 1 * time.Minute
return config
}
func GetSyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.SyncProducer {
config := GetSyncProducerConfig(kafkaConfiguration, env)
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
panic(err)
}
return producer
}
func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) sarama.AsyncProducer {
config := GetAsyncProducerConfig(kafkaConfiguration, env)
brokerList := strings.Split(kafkaConfiguration.Bootstrap_Servers, ",")
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
panic(err)
logger.Fatal("Failed to create async producer", zap.Error(err))
}
go processProducerErrors()
return producer
}
func InitializeProducers(kafkaConfiguration config.KafkaConfigurations, env string) {
logger = utils.GetLogger()
syncProducer = GetSyncProducer(kafkaConfiguration, env)
asyncProducer = GetAsyncProducer(kafkaConfiguration, env)
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) error {
_, err := cb.Execute(func() (interface{}, error) {
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
asyncProducer.Input() <- message
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
logger.Debug("Successfully written to Kafka for source " + source)
return nil, nil
})
return err
}
func processProducerErrors() {
for err := range asyncProducer.Errors() {
logger.Error("Failed to write message to Kafka", zap.Error(err))
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
}
}
func createTLSConfiguration() (t *tls.Config) {

View File

@@ -1,20 +1,22 @@
package server
import (
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/lib"
"com.navi.medici.janus/utils"
"compress/gzip"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"time"
"fmt"
"strings"
"time"
metrics "com.navi.medici.janus/instrumentation"
"com.navi.medici.janus/lib"
"com.navi.medici.janus/utils"
"go.uber.org/zap"
)
var (
logger *zap.Logger
healthyBool bool = true
)
@@ -29,60 +31,68 @@ type CustomResponse struct {
Message string `json:"message"`
}
func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano())
headerName := "X-Correlation-Id"
substring := "abcdabcd-1234"
headerValue, ok := r.Header[headerName]
if ok {
if strings.HasPrefix(headerValue[0], substring) {
func init() {
logger = utils.GetLogger()
}
func eventsHandlerJson(workerPool *lib.WorkerPool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano())
headerName := "X-Correlation-Id"
substring := "abcdabcd-1234"
headerValue := r.Header.Get(headerName)
if strings.HasPrefix(headerValue, substring) {
w.Header().Set("Content-Type", "application/json")
var rsp = CustomResponse{Code: 200, Message: "OK"}
rsp := CustomResponse{Code: 200, Message: "OK"}
json.NewEncoder(w).Encode(rsp)
fmt.Println("Dropped blacklisted request")
logger.Info("Dropped blacklisted request", zap.String("correlationID", headerValue))
return
}
}
var reader io.Reader
// check if body is gzip compressed
if r.Header.Get("Content-Encoding") == "gzip" {
var err error
reader, err = gzip.NewReader(r.Body)
}
var reader io.Reader
switch r.Header.Get("Content-Encoding") {
case "gzip":
var err error
reader, err = gzip.NewReader(r.Body)
if err != nil {
logger.Error("Error decompressing GZIP payload", zap.Error(err))
w.Header().Set("Content-Type", "application/json")
http.Error(w, "Error decompressing GZIP payload", http.StatusBadRequest)
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
return
}
defer reader.(*gzip.Reader).Close()
default:
reader = r.Body
}
body, err := ioutil.ReadAll(reader) // todo Limit to 1MB
if err != nil {
// log.Printf(err)
w.Header().Set("Content-Type", "application/json")
http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest)
logger.Error("Error reading request body", zap.Error(err))
http.Error(w, "Request body invalid", http.StatusBadRequest)
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
return
}
} else {
reader = r.Body
}
body, err := ioutil.ReadAll(reader)
if err != nil {
// log.Printf(err)
w.Header().Set("Content-Type", "application/json")
http.Error(w, "Request body invalid", http.StatusBadRequest)
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
return
workerPool.AddJob(lib.RequestObject{Body: body, Header: r.Header})
rsp := CustomResponse{Code: 200, Message: "OK"}
if err := json.NewEncoder(w).Encode(rsp); err != nil {
logger.Error("Error encoding response", zap.Error(err))
}
metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
}
w.Header().Set("Content-Type", "application/json")
lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header}
var rsp = CustomResponse{Code: 200, Message: "OK"}
json.NewEncoder(w).Encode(rsp)
metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
}
func healthHandler(w http.ResponseWriter, r *http.Request) {
if healthyBool {
w.Header().Set("Content-Type", "application/json")
io.WriteString(w, "true")
io.WriteString(w, `{"status":"healthy"}`)
} else {
http.Error(w, "server unhealthy", http.StatusServiceUnavailable)
return
http.Error(w, `{"status":"unhealthy"}`, http.StatusServiceUnavailable)
}
}

View File

@@ -1,30 +1,31 @@
package server
import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"com.navi.medici.janus/lib"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"go.uber.org/zap"
)
type Server struct {
HttpServer *http.Server
Listener net.Listener
wg sync.WaitGroup
logger *zap.Logger
}
var (
networkTCP = "tcp"
)
func NewServer(port string, corsList string) (*Server, error) {
func NewServer(port string, corsList string, workerPool *lib.WorkerPool, logger *zap.Logger) (*Server, error) {
network := networkTCP
listener, err := net.Listen(network, ":"+port)
@@ -33,39 +34,74 @@ func NewServer(port string, corsList string) (*Server, error) {
}
router := mux.NewRouter()
router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST")
router.HandleFunc("/events/json", eventsHandlerJson(workerPool)).Methods("POST")
router.HandleFunc("/health", healthHandler).Methods("GET")
httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)}
newServer := &Server{HttpServer: httpServer, Listener: listener}
return newServer, nil
httpServer := &http.Server{
Addr: ":" + port,
Handler: enableCors(router, corsList),
}
newServer := &Server{
HttpServer: httpServer,
Listener: listener,
logger: logger,
}
return newServer, nil
}
func enableCors(handler http.Handler, corsList string) http.Handler {
corsArray := strings.Split(corsList, ",")
fmt.Print(corsArray)
c := cors.New(cors.Options{
AllowedOrigins: corsArray,
AllowCredentials: false,
// Enable Debugging for testing, consider disabling in production
Debug: false,
Debug: false,
})
return c.Handler(handler)
}
func MetricServer(port string) (*Server, error) {
func MetricServer(port string, logger *zap.Logger) (*Server, error) {
network := networkTCP
listener, err := net.Listen(network, ":"+port)
if err != nil {
return nil, errors.Wrap(err, "failed to create listener")
return nil, errors.Wrap(err, "failed to create listener for metric server")
}
router := mux.NewRouter()
router.Path("/metrics").Handler(promhttp.Handler())
httpServer := &http.Server{Addr: ":" + port, Handler: router}
mServer := &Server{HttpServer: httpServer, Listener: listener}
httpServer := &http.Server{
Addr: ":" + port,
Handler: router,
}
mServer := &Server{
HttpServer: httpServer,
Listener: listener,
logger: logger,
}
return mServer, nil
}
func (s *Server) StartServer() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.logger.Info("Starting server", zap.String("address", s.HttpServer.Addr))
if err := s.HttpServer.Serve(s.Listener); err != nil && err != http.ErrServerClosed {
s.logger.Fatal("Server error", zap.Error(err))
}
}()
}
func (s *Server) StopServer() {
s.logger.Info("Stopping server", zap.String("address", s.HttpServer.Addr))
if err := s.HttpServer.Close(); err != nil {
s.logger.Error("Error stopping server", zap.Error(err))
}
s.wg.Wait()
s.logger.Info("Server stopped", zap.String("address", s.HttpServer.Addr))
}