Files
janus/lib/RequestHandler.go

99 lines
2.2 KiB
Go

package lib
import (
"context"
"encoding/json"
"net/http"
"time"
metrics "com.navi.medici.janus/instrumentation"
producer_module "com.navi.medici.janus/producer"
"com.navi.medici.janus/utils"
"github.com/Shopify/sarama"
"go.uber.org/zap"
)
// RequestObject is the unit of work queued on a WorkerPool: the raw payload
// of one request together with its headers.
type RequestObject struct {
	// Body is the raw request payload. processRequest decodes it as JSON and
	// publishes the same, unmodified bytes to Kafka.
	Body []byte
	// Header carries the HTTP headers associated with the request.
	Header http.Header
}
// WorkerPool runs a fixed number of goroutines that consume RequestObject
// jobs from a shared queue.
type WorkerPool struct {
	// workers is the number of goroutines launched by Start.
	workers int
	// jobQueue hands jobs from AddJob to the worker goroutines.
	jobQueue chan RequestObject
	// logger records processing failures and recovered panics.
	logger *zap.Logger
}
// NewWorkerPool builds a WorkerPool with the given number of workers and a
// job queue buffered to the same size. The pool is idle until Start is called.
//
// A non-positive workers value is raised to 1: a negative value would make
// the channel allocation panic, and zero would leave every AddJob call
// blocked forever because Start would launch no goroutine to receive from
// the queue.
func NewWorkerPool(workers int) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	return &WorkerPool{
		workers:  workers,
		jobQueue: make(chan RequestObject, workers),
		logger:   utils.GetLogger(),
	}
}
// Start launches the pool's worker goroutines, one per configured worker.
// It returns immediately without waiting for them; each goroutine runs until
// ctx is cancelled.
func (wp *WorkerPool) Start(ctx context.Context) {
	for remaining := wp.workers; remaining > 0; remaining-- {
		go wp.worker(ctx)
	}
}
// worker is the loop run by each pool goroutine: it takes jobs off the queue
// and processes them one at a time, and returns once ctx is cancelled.
//
// When a job and cancellation are both ready, select may choose either, so
// jobs still sitting in the queue at cancellation can be left unprocessed.
func (wp *WorkerPool) worker(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case req := <-wp.jobQueue:
			wp.processRequest(req)
		case <-done:
			return
		}
	}
}
// AddJob enqueues job for processing by a worker goroutine. The send blocks
// while the queue's buffer is full, so the caller waits until a worker takes
// a job off the queue.
func (wp *WorkerPool) AddJob(job RequestObject) {
	wp.jobQueue <- job
}
// processRequest handles one queued request: it decodes the body as a JSON
// object, records request metrics labelled by the event's source, and hands
// the original bytes to the asynchronous Kafka producer.
//
// Failures are logged rather than returned. A panic raised anywhere in the
// function is recovered and logged, so it does not propagate to the calling
// worker goroutine.
func (wp *WorkerPool) processRequest(request RequestObject) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		wp.logger.Error("Recovered from panic in processRequest", zap.Any("panic", recovered))
	}()

	startMillis := utils.NanosToMillis(time.Now().UnixNano())
	payload := request.Body

	// The decoded form is only used to read the "source" label; the bytes
	// published to Kafka are the untouched request body.
	var decoded map[string]interface{}
	decodeErr := json.Unmarshal(payload, &decoded)
	if decodeErr != nil {
		wp.logger.Error("Failed to unmarshal JSON",
			zap.Error(decodeErr),
			zap.String("messageBytes", string(payload)),
			zap.Int("bytesLength", len(payload)))
		return
	}

	source := getSource(decoded)
	kafkaMessage := &sarama.ProducerMessage{
		Topic: producer_module.GetKafkaTopic(),
		Value: sarama.ByteEncoder(payload),
	}

	metrics.RequestCounter.WithLabelValues(source).Inc()

	// The processing-time sample is taken before the Kafka write, so it
	// covers decoding and message construction but not the produce call.
	processingTime := metrics.EventProcessingTimeHist.WithLabelValues(producer_module.GetKafkaTopic(), source)
	elapsed := utils.NanosToMillis(time.Now().UnixNano()) - startMillis
	processingTime.Observe(float64(elapsed))

	if writeErr := producer_module.WriteMessageToKafkaAsync(kafkaMessage, source); writeErr != nil {
		wp.logger.Error("Failed to write message to Kafka", zap.Error(writeErr), zap.String("source", source))
	}
}
// getSource returns the value of the event's "source" field, or "UNKNOWN"
// when the field is missing, null, or not a JSON string.
//
// The checked type assertion matters because the event comes from decoded
// request JSON: a numeric, boolean or object "source" would make an unchecked
// assertion panic and the event would be dropped.
func getSource(event map[string]interface{}) string {
	source, ok := event["source"].(string)
	if !ok {
		return "UNKNOWN"
	}
	return source
}