From e940f69528fcdaaae16d8f4641317a0fe74dc8fc Mon Sep 17 00:00:00 2001 From: aishwarya-raimule Date: Thu, 12 Jan 2023 12:59:22 +0530 Subject: [PATCH] added producer metrics, modified kafka write logic --- go.mod | 1 + go.sum | 2 + instrumentation/metrics.go | 49 +++++++++++++------ lib/RequestHandler.go | 10 ++-- producer/kafka_writer.go | 38 +++++++++----- ...{producer_config.go => producer_client.go} | 3 ++ server/handlers.go | 15 +++++- utils/time.go | 7 +++ 8 files changed, 95 insertions(+), 30 deletions(-) rename producer/{producer_config.go => producer_client.go} (98%) create mode 100644 utils/time.go diff --git a/go.mod b/go.mod index ce1fd00..adef47d 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/protobuf v1.4.1 // indirect github.com/gorilla/mux v1.8.0 + github.com/magiconair/properties v1.8.7 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v0.9.3 github.com/riferrei/srclient v0.2.1 diff --git a/go.sum b/go.sum index 0fe7a3d..cdb09a7 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,8 @@ github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehO github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= diff --git a/instrumentation/metrics.go b/instrumentation/metrics.go index 1c57b2a..6cdeb52 100644 --- a/instrumentation/metrics.go +++ b/instrumentation/metrics.go @@ -5,23 +5,31 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var producerSuccessCounter = promauto.NewCounterVec( +var KafkaPushCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "janus_kafka_push_requests", + Help: "Number of Kafka Push requests", + }, + []string{"topic", "source"}, +) + +var ProducerSuccessCounter = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "janus_producer_send_success", Help: "Number of Success requests.", }, - []string{"source"}, + []string{"topic"}, ) -var producerFailCounter = promauto.NewCounterVec( +var ProducerFailCounter = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "janus_producer_send_fail", Help: "Number of failed requests.", }, - []string{"source"}, + []string{"topic"}, ) -var requestCounter = promauto.NewCounterVec( +var RequestCounter = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "janus_total_requests", Help: "Number of get requests.", @@ -29,13 +37,26 @@ var requestCounter = promauto.NewCounterVec( []string{"source"}, ) +var EventProcessingTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_event_processing_time_ms", + Help: "Time taken to process event", + }, + []string{"topic", "source"}, +) -func IncrementCounter(metricName string, path string) { - if metricName == "success" { - producerSuccessCounter.WithLabelValues(path).Inc() - } else if metricName == "fail" { - producerFailCounter.WithLabelValues(path).Inc() - } else if metricName == "request" { - requestCounter.WithLabelValues(path).Inc() - } -} +var EventHandlerTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_event_handler_time_ms", + Help: "Time taken to receive request, send it to processing and return response", + }, + []string{"event_type", "status"}, +) + +var KafkaPushTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_kafka_push_time_ms", + Help: "Time taken to write event to Kafka", + }, + []string{"topic", "source"}, +) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index a1c732f..939d8c0 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -4,6 +4,8 @@ import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" "com.navi.medici.janus/schema" + "com.navi.medici.janus/utils" + "time" "encoding/binary" "encoding/json" @@ -66,12 +68,12 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { Topic: topic, Value: sarama.ByteEncoder(recordValue), } - metrics.IncrementCounter("request", source) + metrics.RequestCounter.WithLabelValues(source).Inc() producer_module.WriteMessageToKafkaAsync(message, source) } func ClickstreamJsonEventHandler(request RequestObject, topic string) { - + eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body //getting the client which has sent this event @@ -98,6 +100,8 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { Value: sarama.ByteEncoder(recordValue), } - metrics.IncrementCounter("request", source) + metrics.RequestCounter.WithLabelValues(source).Inc() + // processing complete, record duration metrics + metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) producer_module.WriteMessageToKafkaAsync(message, source) } diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 2d8e07b..0d523ba 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -1,19 +1,33 @@ package producer import ( - metrics "com.navi.medici.janus/instrumentation" - "log" - "github.com/Shopify/sarama" + metrics "com.navi.medici.janus/instrumentation" + "com.navi.medici.janus/utils" + "github.com/Shopify/sarama" + "go.uber.org/zap" + "time" ) - func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { - select { - case asyncProducer.Input() <- message: - metrics.IncrementCounter("success", source) -// log.Printf("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE %v", source) - case err := <- asyncProducer.Errors(): - metrics.IncrementCounter("fail", source) - log.Printf("FAILED TO WRITE TO KAFKA FOR SOURCE %v %v", source, err) - } + beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano()) + asyncProducer.Input() <- message + metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) + metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() + logger.Info("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) +} + +func processProducerSuccesses() { + for { + msg := <-asyncProducer.Successes() + metrics.ProducerSuccessCounter.WithLabelValues(msg.Topic).Inc() + } +} + +func processProducerErrors() { + for { + err := <-asyncProducer.Errors() + logger.Error("Failed to write message to Kafka", zap.Error(err.Err)) + metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() + } + } diff --git a/producer/producer_config.go b/producer/producer_client.go similarity index 98% rename from producer/producer_config.go rename to producer/producer_client.go index ab3b7c8..a7ad854 100644 --- a/producer/producer_config.go +++ b/producer/producer_client.go @@ -94,6 +94,9 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) if err != nil { panic(err) } + + go processProducerSuccesses() + go processProducerErrors() return producer } diff --git a/server/handlers.go b/server/handlers.go index 223ce32..d0627c9 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -1,8 +1,11 @@ package server import ( + metrics "com.navi.medici.janus/instrumentation" "com.navi.medici.janus/lib" "com.navi.medici.janus/schema" + "com.navi.medici.janus/utils" + "time" "compress/gzip" "encoding/json" @@ -16,6 +19,13 @@ var ( healthyBool bool = true ) +const ( + JSON = "json" + PROTO = "proto" + SUCCESS = "success" + ERROR = "error" +) + type NewSchemaRequest struct { Topic string `json:"topic"` Schema string `json:"schema"` @@ -65,6 +75,7 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) { } func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { + eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano()) var reader io.Reader // check if body is gzip compressed if r.Header.Get("Content-Encoding") == "gzip" { @@ -74,6 +85,7 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { // log.Printf(err) w.Header().Set("Content-Type", "application/json") http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) return } } else { @@ -85,14 +97,15 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { // log.Printf(err) w.Header().Set("Content-Type", "application/json") http.Error(w, "Request body invalid", http.StatusBadRequest) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) return } w.Header().Set("Content-Type", "application/json") lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - //io.WriteString(w, "ok") var rsp = CustomResponse{Code: 200, Message: "OK"} json.NewEncoder(w).Encode(rsp) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) } func healthHandler(w http.ResponseWriter, r *http.Request) { diff --git a/utils/time.go b/utils/time.go new file mode 100644 index 0000000..80f348f --- /dev/null +++ b/utils/time.go @@ -0,0 +1,7 @@ +package utils + +import "time" + +func NanosToMillis(timestamp int64) int64 { + return timestamp * (int64(time.Nanosecond) / int64(time.Millisecond)) +}