added producer metrics, modified kafka write logic
This commit is contained in:
1
go.mod
1
go.mod
@@ -8,6 +8,7 @@ require (
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
|
||||
github.com/golang/protobuf v1.4.1 // indirect
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v0.9.3
|
||||
github.com/riferrei/srclient v0.2.1
|
||||
|
||||
2
go.sum
2
go.sum
@@ -169,6 +169,8 @@ github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehO
|
||||
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
|
||||
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
||||
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
|
||||
@@ -5,23 +5,31 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
var producerSuccessCounter = promauto.NewCounterVec(
|
||||
// KafkaPushCounter counts Kafka push requests, labeled by destination topic
// and originating source.
var KafkaPushCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "janus_kafka_push_requests",
		Help: "Number of Kafka Push requests",
	},
	[]string{"topic", "source"},
)
|
||||
|
||||
var ProducerSuccessCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_producer_send_success",
|
||||
Help: "Number of Success requests.",
|
||||
},
|
||||
[]string{"source"},
|
||||
[]string{"topic"},
|
||||
)
|
||||
|
||||
var producerFailCounter = promauto.NewCounterVec(
|
||||
var ProducerFailCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_producer_send_fail",
|
||||
Help: "Number of failed requests.",
|
||||
},
|
||||
[]string{"source"},
|
||||
[]string{"topic"},
|
||||
)
|
||||
|
||||
var requestCounter = promauto.NewCounterVec(
|
||||
var RequestCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "janus_total_requests",
|
||||
Help: "Number of get requests.",
|
||||
@@ -29,13 +37,26 @@ var requestCounter = promauto.NewCounterVec(
|
||||
[]string{"source"},
|
||||
)
|
||||
|
||||
// EventProcessingTimeHist tracks how long it takes to turn an incoming event
// into a Kafka record, labeled by destination topic and originating source.
// Observed values are milliseconds (callers convert via NanosToMillis).
//
// NOTE(review): no Buckets are set, so the client_golang defaults
// (0.005–10, tuned for seconds) apply; millisecond observations will pile
// into the top buckets. Consider explicit Buckets — confirm with the team.
var EventProcessingTimeHist = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "janus_event_processing_time_ms",
		Help: "Time taken to process event",
	},
	[]string{"topic", "source"},
)
|
||||
|
||||
func IncrementCounter(metricName string, path string) {
|
||||
if metricName == "success" {
|
||||
producerSuccessCounter.WithLabelValues(path).Inc()
|
||||
} else if metricName == "fail" {
|
||||
producerFailCounter.WithLabelValues(path).Inc()
|
||||
} else if metricName == "request" {
|
||||
requestCounter.WithLabelValues(path).Inc()
|
||||
}
|
||||
}
|
||||
// EventHandlerTimeHist tracks end-to-end HTTP handler latency — from
// receiving a request, through dispatch to processing, to writing the
// response — labeled by event type (e.g. json/proto) and outcome status.
// Observed values are milliseconds.
//
// NOTE(review): default buckets (0.005–10, seconds-scale) apply since no
// Buckets are set; consider explicit millisecond-scale buckets — confirm.
var EventHandlerTimeHist = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "janus_event_handler_time_ms",
		Help: "Time taken to receive request, send it to processing and return response",
	},
	[]string{"event_type", "status"},
)
|
||||
|
||||
// KafkaPushTimeHist tracks the time spent handing an event to the Kafka
// producer, labeled by destination topic and originating source. Observed
// values are milliseconds.
//
// NOTE(review): default buckets (0.005–10, seconds-scale) apply since no
// Buckets are set; consider explicit millisecond-scale buckets — confirm.
var KafkaPushTimeHist = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "janus_kafka_push_time_ms",
		Help: "Time taken to write event to Kafka",
	},
	[]string{"topic", "source"},
)
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
"com.navi.medici.janus/schema"
|
||||
"com.navi.medici.janus/utils"
|
||||
"time"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
@@ -66,12 +68,12 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
Topic: topic,
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
metrics.IncrementCounter("request", source)
|
||||
metrics.RequestCounter.WithLabelValues(source).Inc()
|
||||
producer_module.WriteMessageToKafkaAsync(message, source)
|
||||
}
|
||||
|
||||
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
|
||||
eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
messageBytes := request.Body
|
||||
|
||||
//getting the client which has sent this event
|
||||
@@ -98,6 +100,8 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
|
||||
metrics.IncrementCounter("request", source)
|
||||
metrics.RequestCounter.WithLabelValues(source).Inc()
|
||||
// processing complete, record duration metrics
|
||||
metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
|
||||
producer_module.WriteMessageToKafkaAsync(message, source)
|
||||
}
|
||||
|
||||
@@ -1,19 +1,33 @@
|
||||
package producer
|
||||
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"log"
|
||||
"github.com/Shopify/sarama"
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"com.navi.medici.janus/utils"
|
||||
"github.com/Shopify/sarama"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) {
|
||||
select {
|
||||
case asyncProducer.Input() <- message:
|
||||
metrics.IncrementCounter("success", source)
|
||||
// log.Printf("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE %v", source)
|
||||
case err := <- asyncProducer.Errors():
|
||||
metrics.IncrementCounter("fail", source)
|
||||
log.Printf("FAILED TO WRITE TO KAFKA FOR SOURCE %v %v", source, err)
|
||||
}
|
||||
beforeKafkaPushTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
asyncProducer.Input() <- message
|
||||
metrics.KafkaPushTimeHist.WithLabelValues(message.Topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - beforeKafkaPushTime))
|
||||
metrics.KafkaPushCounter.WithLabelValues(message.Topic, source).Inc()
|
||||
logger.Info("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source)
|
||||
}
|
||||
|
||||
func processProducerSuccesses() {
|
||||
for {
|
||||
msg := <-asyncProducer.Successes()
|
||||
metrics.ProducerSuccessCounter.WithLabelValues(msg.Topic).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func processProducerErrors() {
|
||||
for {
|
||||
err := <-asyncProducer.Errors()
|
||||
logger.Error("Failed to write message to Kafka", zap.Error(err.Err))
|
||||
metrics.ProducerFailCounter.WithLabelValues(err.Msg.Topic).Inc()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -94,6 +94,9 @@ func GetAsyncProducer(kafkaConfiguration config.KafkaConfigurations, env string)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go processProducerSuccesses()
|
||||
go processProducerErrors()
|
||||
return producer
|
||||
}
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
"com.navi.medici.janus/lib"
|
||||
"com.navi.medici.janus/schema"
|
||||
"com.navi.medici.janus/utils"
|
||||
"time"
|
||||
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
@@ -16,6 +19,13 @@ var (
|
||||
healthyBool bool = true
|
||||
)
|
||||
|
||||
const (
|
||||
JSON = "json"
|
||||
PROTO = "proto"
|
||||
SUCCESS = "success"
|
||||
ERROR = "error"
|
||||
)
|
||||
|
||||
type NewSchemaRequest struct {
|
||||
Topic string `json:"topic"`
|
||||
Schema string `json:"schema"`
|
||||
@@ -65,6 +75,7 @@ func eventsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
eventHandlerStartTime := utils.NanosToMillis(time.Now().UnixNano())
|
||||
var reader io.Reader
|
||||
// check if body is gzip compressed
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
@@ -74,6 +85,7 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -85,14 +97,15 @@ func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Request body invalid", http.StatusBadRequest)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, ERROR).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
lib.JsonRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header}
|
||||
//io.WriteString(w, "ok")
|
||||
var rsp = CustomResponse{Code: 200, Message: "OK"}
|
||||
json.NewEncoder(w).Encode(rsp)
|
||||
metrics.EventHandlerTimeHist.WithLabelValues(JSON, SUCCESS).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventHandlerStartTime))
|
||||
}
|
||||
|
||||
func healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
7
utils/time.go
Normal file
7
utils/time.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package utils
|
||||
|
||||
import "time"
|
||||
|
||||
// NanosToMillis converts a timestamp (or duration) expressed in nanoseconds
// to milliseconds, truncating toward zero.
//
// Bug fixed: the previous body computed
// timestamp * (int64(time.Nanosecond) / int64(time.Millisecond)), i.e.
// timestamp * (1 / 1_000_000), which truncates to timestamp * 0 under
// integer division — so the function always returned 0 and every latency
// metric fed by it recorded zero. Nanoseconds-to-milliseconds is a division.
func NanosToMillis(timestamp int64) int64 {
	return timestamp / int64(time.Millisecond)
}
|
||||
Reference in New Issue
Block a user