Files
janus/lib/RequestHandler.go

109 lines
2.5 KiB
Go
Raw Normal View History

2021-03-30 16:05:40 +05:30
package lib
import (
"context"
"encoding/json"
"net/http"
"strings"
2023-02-02 14:00:32 +05:30
"time"
2021-03-30 16:05:40 +05:30
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
"com.navi.medici.janus/utils"
2021-03-30 16:05:40 +05:30
"github.com/Shopify/sarama"
"go.uber.org/zap"
2021-03-30 16:05:40 +05:30
)
// RequestObject is the unit of work handed to the worker pool: the raw body of
// one inbound request together with the HTTP headers it arrived with.
type RequestObject struct {
	// Body is the raw request payload.
	Body []byte
	// Header holds the HTTP headers of the request.
	Header http.Header
}
// WorkerPool processes queued RequestObjects on a fixed number of goroutines.
type WorkerPool struct {
	// workers is the number of goroutines launched by Start.
	workers int
	// jobQueue carries requests from AddJob to the worker goroutines.
	jobQueue chan RequestObject
	// logger receives the error logs emitted while processing requests.
	logger *zap.Logger
}
// NewWorkerPool builds a WorkerPool configured for the given number of workers.
// The job queue is buffered with a capacity equal to workers, and the logger is
// obtained from utils.GetLogger. No goroutines run until Start is called.
func NewWorkerPool(workers int) *WorkerPool {
	pool := new(WorkerPool)
	pool.workers = workers
	pool.jobQueue = make(chan RequestObject, workers)
	pool.logger = utils.GetLogger()
	return pool
}
// Start launches the pool's worker goroutines, one per configured worker, each
// running the worker loop with ctx. It returns without waiting for them.
func (wp *WorkerPool) Start(ctx context.Context) {
	for remaining := wp.workers; remaining > 0; remaining-- {
		go wp.worker(ctx)
	}
}
func (wp *WorkerPool) worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case job := <-wp.jobQueue:
wp.processRequest(job)
}
}
2021-04-27 16:40:13 +05:30
}
// AddJob places job on the pool's queue for a worker to pick up. The send
// blocks while the queue's buffer is full.
func (wp *WorkerPool) AddJob(job RequestObject) {
	queue := wp.jobQueue
	queue <- job
}
func (wp *WorkerPool) processRequest(request RequestObject) {
defer func() {
if r := recover(); r != nil {
wp.logger.Error("Recovered from panic in processRequest", zap.Any("panic", r))
}
}()
eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano())
messageBytes := request.Body
2021-04-27 16:40:13 +05:30
var result map[string]interface{}
if err := json.Unmarshal(messageBytes, &result); err != nil {
wp.logger.Error("Failed to unmarshal JSON",
zap.Error(err),
zap.String("client_addr", utils.GetClientIP(request.Header.Get("X-Forwarded-For"), "")),
zap.String("messageBytes", string(messageBytes)),
zap.Int("bytesLength", len(messageBytes)))
return
}
2021-04-27 16:40:13 +05:30
source := getSource(result)
2021-04-27 16:40:13 +05:30
recordValue := messageBytes
topicFromSource := getTopicFromSource(source)
2021-04-27 16:40:13 +05:30
message := &sarama.ProducerMessage{
Topic: topicFromSource,
Value: sarama.ByteEncoder(recordValue),
}
metrics.RequestCounter.WithLabelValues(source).Inc()
metrics.EventProcessingTimeHist.WithLabelValues(topicFromSource, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime))
if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil {
wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source))
}
2021-03-30 16:05:40 +05:30
}
2021-04-27 16:40:13 +05:30
// getTopicFromSource chooses the topic for an event source: the Litmus
// clickstream topic when the upper-cased source equals "LITMUS", and the
// default clickstream topic for every other source.
func getTopicFromSource(source string) string {
	switch strings.ToUpper(source) {
	case "LITMUS":
		return producer_module.GetLitmusClickstreamTopic()
	default:
		return producer_module.GetDefaultClickstreamTopic()
	}
}
// getSource returns the value of the "source" key of a decoded event.
//
// It returns "UNKNOWN" when the key is absent, is JSON null, or holds a value
// that is not a string (number, bool, object, array). The comma-ok assertion
// keeps a malformed "source" field from panicking.
func getSource(event map[string]interface{}) string {
	if source, ok := event["source"].(string); ok {
		return source
	}
	return "UNKNOWN"
}