Merge pull request #23 from medici/DE-1113-producer-metrics
Added producer metrics, updated kafka push logic
This commit is contained in:
1
go.mod
1
go.mod
@@ -8,6 +8,7 @@ require (
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
|
||||
github.com/golang/protobuf v1.4.1 // indirect
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v0.9.3
|
||||
github.com/riferrei/srclient v0.2.1
|
||||
|
||||
2
go.sum
2
go.sum
@@ -169,6 +169,8 @@ github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehO
|
||||
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
|
||||
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
||||
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
|
||||
@@ -5,23 +5,23 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var producerSuccessCounter = promauto.NewCounterVec(
|
||||
var KafkaPushCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_producer_send_success",
|
||||
Help: "Number of Success requests.",
|
||||
Name: "janus_kafka_push_requests",
|
||||
Help: "Number of Kafka Push requests",
|
||||
},
|
||||
[]string{"source"},
|
||||
[]string{"topic", "source"},
|
||||
)
|
||||
|
||||
var producerFailCounter = promauto.NewCounterVec(
|
||||
var ProducerFailCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_producer_send_fail",
|
||||
Help: "Number of failed requests.",
|
||||
},
|
||||
[]string{"source"},
|
||||
[]string{"topic"},
|
||||
)
|
||||
|
||||
var requestCounter = promauto.NewCounterVec(
|
||||
var RequestCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_total_requests",
|
||||
Help: "Number of get requests.",
|
||||
@@ -29,13 +29,26 @@ var requestCounter = promauto.NewCounterVec(
|
||||
[]string{"source"},
|
||||
)
|
||||
|
||||
var EventProcessingTimeHist = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "janus_event_processing_time_ms",
|
||||
Help: "Time taken to process event",
|
||||
},
|
||||
[]string{"topic", "source"},
|
||||
)
|
||||
|
||||
func IncrementCounter(metricName string, path string) {
|
||||
if metricName == "success" {
|
||||
producerSuccessCounter.WithLabelValues(path).Inc()
|
||||
} else if metricName == "fail" {
|
||||
producerFailCounter.WithLabelValues(path).Inc()
|
||||
} else if metricName == "request" {
|
||||
requestCounter.WithLabelValues(path).Inc()
|
||||
}
|
||||
}
|
||||
var EventHandlerTimeHist = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "janus_event_handler_time_ms",
|
||||
Help: "Time taken to receive request, send it to processing and return response",
|
||||
},
|
||||
[]string{"event_type", "status"},
|
||||
)
|
||||
|
||||
var KafkaPushTimeHist = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "janus_kafka_push_time_ms",
|
||||
Help: "Time taken to write event to Kafka",
|
||||
},
|
||||
[]string{"topic", "source"},
|
||||
)
|
||||
|
||||
@@ -3,9 +3,10 @@ package lib
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
|
||||
"com.navi.medici.janus/utils"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
@@ -28,7 +29,7 @@ func ProcessJsonRequestChannel(topic string) {
|
||||
}
|
||||
|
||||
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
|
||||
eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
messageBytes := request.Body
|
||||
|
||||
//getting the client which has sent this event
|
||||
@@ -55,7 +56,9 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
|
||||
metrics.IncrementCounter("request", source)
|
||||
metrics.RequestCounter.WithLabelValues(source).Inc()
|
||||
// processing complete, record duration metrics
|
||||
metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
|
||||
producer_module.WriteMessageToKafkaAsync(message, source)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,17 +2,24 @@ package producer
|
||||
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"com.navi.medici.janus/utils"
|
||||
"github.com/Shopify/sarama"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
|
||||
select {
|
||||
case asyncProducer.Input() <- message:
|
||||
metrics.IncrementCounter("success", source)
|
||||
logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
|
||||
case err := <-asyncProducer.Errors():
|
||||
metrics.IncrementCounter("fail", source)
|
||||
logger.Error("FAILED TO WRITE TO KAFKA FOR SOURCE "+source, zap.Error(err))
|
||||
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
asyncProducer.Input() <- message
|
||||
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
|
||||
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
|
||||
logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
|
||||
}
|
||||
|
||||
func processProducerErrors() {
|
||||
for {
|
||||
err := <-asyncProducer.Errors()
|
||||
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
|
||||
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +91,8 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go processProducerErrors()
|
||||
return producer
|
||||
}
|
||||
|
||||
@@ -1,20 +1,27 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"com.navi.medici.janus/lib"
|
||||
"io"
|
||||
|
||||
// "log"
|
||||
"com.navi.medici.janus/utils"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
healthyBool bool = true
|
||||
)
|
||||
|
||||
const (
|
||||
JSON = "json"
|
||||
SUCCESS = "success"
|
||||
ERROR = "error"
|
||||
)
|
||||
|
||||
type NewSchemaRequest struct {
|
||||
Topic string `json:"topic"`
|
||||
Schema string `json:"schema"`
|
||||
@@ -27,6 +34,7 @@ type CustomResponse struct {
|
||||
}
|
||||
|
||||
func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
var reader io.Reader
|
||||
// check if body is gzip compressed
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
@@ -36,6 +44,7 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -47,14 +56,15 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Request body invalid", http.StatusBadRequest)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header}
|
||||
//io.WriteString(w, "ok")
|
||||
var rsp = CustomResponse{Code: 200, Message: "OK"}
|
||||
json.NewEncoder(w).Encode(rsp)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
}
|
||||
|
||||
func healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
7
utils/time.go
Normal file
7
utils/time.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package utils
|
||||
|
||||
import "time"
|
||||
|
||||
func NanosToMillis(timestamp int64) int64 {
|
||||
return timestamp * (int64(time.Nanosecond) / int64(time.Millisecond))
|
||||
}
|
||||
Reference in New Issue
Block a user