package server

import (
	"compress/gzip"
	"encoding/json"
	"io"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	metrics "com.navi.medici.janus/instrumentation"
	"com.navi.medici.janus/lib"
	"com.navi.medici.janus/utils"
	"go.uber.org/zap"
)

var (
	// logger is the package-wide zap logger; it is assigned in init.
	logger *zap.Logger
	// healthyBool selects which response healthHandler sends. It starts out true.
	healthyBool = true
)

// Label values passed to metrics.EventHandlerTimeHist.
const (
	JSON    = "json"
	SUCCESS = "success"
	ERROR   = "error"
)

// CustomResponse is the JSON body written back by the events handler.
type CustomResponse struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

func init() {
	logger = utils.GetLogger()
}

// eventsHandlerJson returns the HTTP handler for JSON event payloads.
//
// For each request the handler:
//   - short-circuits with a 200 "OK" body, without enqueuing anything, when the
//     X-Correlation-Id header starts with the blocked prefix;
//   - transparently decompresses the body when Content-Encoding is "gzip";
//   - reads the whole body and hands it, together with the request headers, to
//     workerPool.AddJob;
//   - replies with a 200 "OK" JSON body.
//
// A body that cannot be decompressed or read yields a 400 response. The
// handler's elapsed time is recorded in metrics.EventHandlerTimeHist for every
// outcome except the blocked-prefix short-circuit.
func eventsHandlerJson(workerPool *lib.WorkerPool) http.HandlerFunc {
	const (
		correlationHeader = "X-Correlation-Id"
		blockedPrefix     = "abcdabcd-1234"
	)

	return func(w http.ResponseWriter, r *http.Request) {
		startMillis := utils.NanosToMillis(time.Now().UnixNano())

		// observe records the time spent in this handler under the given status label.
		observe := func(status string) {
			elapsed := utils.NanosToMillis(time.Now().UnixNano()) - startMillis
			metrics.EventHandlerTimeHist.WithLabelValues(JSON, status).Observe(float64(elapsed))
		}

		// Requests carrying a blocked correlation ID are acknowledged but never enqueued.
		if correlationID := r.Header.Get(correlationHeader); strings.HasPrefix(correlationID, blockedPrefix) {
			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(CustomResponse{Code: 200, Message: "OK"}); err != nil {
				logger.Error("Error encoding response", zap.Error(err))
			}
			logger.Info("Dropped blacklisted request", zap.String("correlationID", correlationID))
			return
		}

		var reader io.Reader = r.Body
		if r.Header.Get("Content-Encoding") == "gzip" {
			gz, err := gzip.NewReader(r.Body)
			if err != nil {
				logger.Error("Error decompressing GZIP payload", zap.Error(err))
				// http.Error sets its own text/plain Content-Type, so none is set here.
				http.Error(w, "Error decompressing GZIP payload", http.StatusBadRequest)
				observe(ERROR)
				return
			}
			defer gz.Close()
			reader = gz
		}

		// TODO: limit to 1MB. The read is currently unbounded, so a large (or
		// highly compressed gzip) body is buffered in memory in full.
		body, err := ioutil.ReadAll(reader)
		if err != nil {
			logger.Error("Error reading request body",
				zap.Error(err),
				zap.String("client_addr", utils.GetClientIP(r.Header.Get("X-Forwarded-For"), r.RemoteAddr)),
				zap.String("user_agent", r.UserAgent()),
				zap.Int64("content_length", r.ContentLength),
			)
			http.Error(w, "Request body invalid", http.StatusBadRequest)
			observe(ERROR)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		workerPool.AddJob(lib.RequestObject{Body: body, Header: r.Header})

		if err := json.NewEncoder(w).Encode(CustomResponse{Code: 200, Message: "OK"}); err != nil {
			logger.Error("Error encoding response", zap.Error(err))
		}
		observe(SUCCESS)
	}
}

// healthHandler reports the value of healthyBool as a small JSON document:
// 200 with {"status":"healthy"} when it is true, otherwise 503 with
// {"status":"unhealthy"}. Both responses are sent as application/json.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	payload := `{"status":"healthy"}`
	if !healthyBool {
		payload = `{"status":"unhealthy"}`
		// The status line must be written before the body; the healthy case
		// relies on the implicit 200.
		w.WriteHeader(http.StatusServiceUnavailable)
	}

	if _, err := io.WriteString(w, payload); err != nil {
		logger.Error("Error writing health response", zap.Error(err))
	}
}