package lib import ( "context" "encoding/json" "net/http" "strings" "time" metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" "com.navi.medici.janus/utils" "github.com/Shopify/sarama" "go.uber.org/zap" ) type RequestObject struct { Body []byte Header http.Header } type WorkerPool struct { workers int jobQueue chan RequestObject logger *zap.Logger } func NewWorkerPool(workers int) *WorkerPool { return &WorkerPool{ workers: workers, jobQueue: make(chan RequestObject, workers), logger: utils.GetLogger(), } } func (wp *WorkerPool) Start(ctx context.Context) { for i := 0; i < wp.workers; i++ { go wp.worker(ctx) } } func (wp *WorkerPool) worker(ctx context.Context) { for { select { case <-ctx.Done(): return case job := <-wp.jobQueue: wp.processRequest(job) } } } func (wp *WorkerPool) AddJob(job RequestObject) { wp.jobQueue <- job } func (wp *WorkerPool) processRequest(request RequestObject) { defer func() { if r := recover(); r != nil { wp.logger.Error("Recovered from panic in processRequest", zap.Any("panic", r)) } }() eventProcessingStartTime := utils.NanosToMillis(time.Now().UnixNano()) messageBytes := request.Body var result map[string]interface{} if err := json.Unmarshal(messageBytes, &result); err != nil { wp.logger.Error("Failed to unmarshal JSON", zap.Error(err), zap.String("client_addr", utils.GetClientIP(request.Header.Get("X-Forwarded-For"), "")), zap.String("messageBytes", string(messageBytes)), zap.Int("bytesLength", len(messageBytes))) return } source := getSource(result) recordValue := messageBytes topics := getTopicsFromSource(source) // Send to all designated topics for _, topic := range topics { message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(recordValue), } metrics.RequestCounter.WithLabelValues(source).Inc() metrics.EventProcessingTimeHist.WithLabelValues(topic, source).Observe(float64(utils.NanosToMillis(time.Now().UnixNano()) - eventProcessingStartTime)) if err := producer_module.WriteMessageToKafkaAsync(message, source); err != nil { wp.logger.Error("Failed to write message to Kafka", zap.Error(err), zap.String("source", source), zap.String("topic", topic)) } } } func getTopicsFromSource(source string) []string { switch strings.ToUpper(source) { case "LITMUS": return []string{producer_module.GetLitmusClickstreamTopic()} case "SYNCTIMER", "SYNCEVENTTASK", "IOSPULSETIMER": // TODO : remove default topic for events from app source once users // have migrated their pipelines to new topic return []string{ producer_module.GetAppClickstreamTopic(), producer_module.GetDefaultClickstreamTopic(), } default: return []string{producer_module.GetDefaultClickstreamTopic()} } } func getSource(event map[string]interface{}) string { if event["source"] == nil { return "UNKNOWN" } return event["source"].(string) }