diff --git a/go.mod b/go.mod index ce1fd00..adef47d 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/protobuf v1.4.1 // indirect github.com/gorilla/mux v1.8.0 + github.com/magiconair/properties v1.8.7 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v0.9.3 github.com/riferrei/srclient v0.2.1 diff --git a/go.sum b/go.sum index 0fe7a3d..cdb09a7 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,8 @@ github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehO github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= diff --git a/instrumentation/metrics.go b/instrumentation/metrics.go index 1c57b2a..6688316 100644 --- a/instrumentation/metrics.go +++ b/instrumentation/metrics.go @@ -5,23 +5,23 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var producerSuccessCounter = promauto.NewCounterVec( +var KafkaPushCounter = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "janus_producer_send_success", - Help: "Number of Success requests.", + Name: "janus_kafka_push_requests", + Help: "Number of Kafka Push requests", }, - []string{"source"}, + []string{"topic", "source"}, ) -var producerFailCounter = promauto.NewCounterVec( +var ProducerFailCounter = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "janus_producer_send_fail", Help: "Number of failed requests.", }, - []string{"source"}, + []string{"topic"}, ) -var requestCounter = promauto.NewCounterVec( +var RequestCounter = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "janus_total_requests", Help: "Number of get requests.", @@ -29,13 +29,26 @@ var requestCounter = promauto.NewCounterVec( []string{"source"}, ) +var EventProcessingTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_event_processing_time_ms", + Help: "Time taken to process event", + }, + []string{"topic", "source"}, +) -func IncrementCounter(metricName string, path string) { - if metricName == "success" { - producerSuccessCounter.WithLabelValues(path).Inc() - } else if metricName == "fail" { - producerFailCounter.WithLabelValues(path).Inc() - } else if metricName == "request" { - requestCounter.WithLabelValues(path).Inc() - } -} +var EventHandlerTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_event_handler_time_ms", + Help: "Time taken to receive request, send it to processing and return response", + }, + []string{"event_type", "status"}, +) + +var KafkaPushTimeHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "janus_kafka_push_time_ms", + Help: "Time taken to write event to Kafka", + }, + []string{"topic", "source"}, +) diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index 68b494d..41df64e 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -3,9 +3,10 @@ package lib import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" - + "com.navi.medici.janus/utils" "encoding/json" "net/http" + "time" "github.com/Shopify/sarama" ) @@ -28,7 +29,7 @@ func ProcessJsonRequestChannel(topic string) { } func ClickstreamJsonEventHandler(request RequestObject, topic string) { - + eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body //getting the client which has sent this event @@ -55,7 +56,9 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { Value: sarama.ByteEncoder(recordValue), } - metrics.IncrementCounter("request", source) + metrics.RequestCounter.WithLabelValues(source).Inc() + // processing complete, record duration metrics + metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) producer_module.WriteMessageToKafkaAsync(message, source) } diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index a6cfa86..9bb3b81 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -2,17 +2,24 @@ package producer import ( metrics "com.navi.medici.janus/instrumentation" + "com.navi.medici.janus/utils" "github.com/Shopify/sarama" "go.uber.org/zap" + "time" ) func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { - select { - case asyncProducer.Input() <- message: - metrics.IncrementCounter("success", source) - logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) - case err := <-asyncProducer.Errors(): - metrics.IncrementCounter("fail", source) - logger.Error("FAILED TO WRITE TO KAFKA FOR SOURCE "+source, zap.Error(err)) + beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano()) + asyncProducer.Input() <- message + metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime)) + metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc() + logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) +} + +func processProducerErrors() { + for { + err := <-asyncProducer.Errors() + logger.Error("Failed to write message to Kafka", zap.Error(err.Err)) + metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc() } } diff --git a/producer/producer_config.go b/producer/producer_client.go similarity index 99% rename from producer/producer_config.go rename to producer/producer_client.go index 9ae7808..8c4b9ad 100644 --- a/producer/producer_config.go +++ b/producer/producer_client.go @@ -91,6 +91,8 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string) if err != nil { panic(err) } + + go processProducerErrors() return producer } diff --git a/server/handlers.go b/server/handlers.go index dcb9f33..c68e5cb 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -1,20 +1,27 @@ package server import ( + metrics "com.navi.medici.janus/instrumentation" "com.navi.medici.janus/lib" - "io" - - // "log" + "com.navi.medici.janus/utils" "compress/gzip" "encoding/json" + "io" "io/ioutil" "net/http" + "time" ) var ( healthyBool bool = true ) +const ( + JSON = "json" + SUCCESS = "success" + ERROR = "error" +) + type NewSchemaRequest struct { Topic string `json:"topic"` Schema string `json:"schema"` @@ -27,6 +34,7 @@ type CustomResponse struct { } func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { + eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano()) var reader io.Reader // check if body is gzip compressed if r.Header.Get("Content-Encoding") == "gzip" { @@ -36,6 +44,7 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { // log.Printf(err) w.Header().Set("Content-Type", "application/json") http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) return } } else { @@ -47,14 +56,15 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) { // log.Printf(err) w.Header().Set("Content-Type", "application/json") http.Error(w, "Request body invalid", http.StatusBadRequest) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) return } w.Header().Set("Content-Type", "application/json") lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header} - //io.WriteString(w, "ok") var rsp = CustomResponse{Code: 200, Message: "OK"} json.NewEncoder(w).Encode(rsp) + metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime)) } func healthHandler(w http.ResponseWriter, r *http.Request) { diff --git a/utils/time.go b/utils/time.go new file mode 100644 index 0000000..80f348f --- /dev/null +++ b/utils/time.go @@ -0,0 +1,7 @@ +package utils + +import "time" + +func NanosToMillis(timestamp int64) int64 { + return timestamp * (int64(time.Nanosecond) / int64(time.Millisecond)) +}