Files
alfred-be/alfred/cmd/ingester/app/server.go
2026-03-08 16:14:42 +05:30

173 lines
6.5 KiB
Go

package app
import (
"alfred/cmd/ingester/app/handler"
"alfred/cmd/ingester/app/validation"
"alfred/config"
"alfred/internal/clients"
"alfred/internal/metrics"
kafka "alfred/pkg/kafka/produce"
"alfred/pkg/log"
"alfred/pkg/s3"
"alfred/repository"
"alfred/repositoryAccessLayer"
"alfred/utils"
"fmt"
"github.com/gin-gonic/gin"
httpTrace "github.com/navi-commons/go-tracer/http"
"go.elastic.co/apm/module/apmgin/v2"
"go.uber.org/zap"
"net/http"
"strconv"
"time"
)
// Server bundles the gin engine with the dependencies that the ingester's
// route handlers are constructed from.
type Server struct {
// gin is the engine on which middleware and routes are registered and which
// Start runs.
gin *gin.Engine
// producer is the Kafka producer handed to the ingester handler.
producer kafka.KProducer
// repositories is the repository access layer handed to the ingester and
// cruise-control handlers.
repositories *repositoryAccessLayer.RepositoryAccessLayer
// s3Handler provides the handlers mounted on the pre-signed upload URL routes.
s3Handler *handler.S3Handler
// httpClient is the HTTP client handed to the ingester and cruise-control
// handlers.
httpClient *clients.HttpClient
// metricsMiddleware is the router group returned by createMetricMiddleware.
// It is nil until Handler has run; routes registered on it record request
// count and latency metrics.
metricsMiddleware *gin.RouterGroup
}
// NewServer builds a Server around the supplied gin engine. The HTTP client,
// Kafka producer and Elasticsearch-backed repositories are created from the
// ingester configuration; the S3 client is created with its own defaults.
// No routes are registered here — that happens in Handler.
func NewServer(gin *gin.Engine) *Server {
	// Dependencies are constructed in a fixed order: HTTP client, Kafka
	// producer, S3 client, repositories, then the access layer on top of them.
	client := clients.NewHttpClient(config.GetIngesterConfig().HttpConfig)
	kafkaProducer := kafka.NewKProducer(
		config.GetIngesterConfig().BaseConfig.Env,
		config.GetIngesterConfig().KafkaConfig.BaseConfig,
	)
	objectStore := s3.NewS3Client()
	repos := repository.InitRepositories(config.GetIngesterConfig().ElasticSearchConfig.BaseConfig)
	accessLayer := repositoryAccessLayer.InitRepositoryAccessLayer(repos)

	srv := new(Server)
	srv.gin = gin
	srv.producer = kafkaProducer
	srv.repositories = accessLayer
	srv.s3Handler = handler.NewS3Handler(objectStore)
	srv.httpClient = client
	// metricsMiddleware is intentionally left nil; Handler assigns it.
	return srv
}
// Handler installs the server's middleware and registers every route on the
// gin engine. It must run before Start.
//
// NOTE(review): the statement order below looks significant. In gin,
// middleware added with Use is normally applied only to routes and groups
// created afterwards, so the metrics group and the routes registered before
// the CORS middleware would not pass through it — confirm against gin's
// RouterGroup behaviour before reordering anything here.
func (s *Server) Handler() {
// Tracing middleware is optional and, when enabled, is installed first.
if config.GetIngesterConfig().TracerConfig.APMEnabled {
s.gin.Use(httpTrace.TracerGinMiddleware())
}
// The metrics group is created before the APM and CORS middleware are added
// to the engine.
s.metricsMiddleware = s.createMetricMiddleware()
s.healthCheckHandler()
// Unlike the tracer above, the Elastic APM gin middleware is installed
// unconditionally.
s.gin.Use(apmgin.Middleware(s.gin))
// App pre-sign routes are registered before the CORS middleware is installed.
s.s3AppPresignHandler()
s.gin.Use(s.corsHandler())
s.ingesterHandler()
// Passes the configured metric port to metrics.AdminHandler (presumably
// exposing the metrics endpoint on that port — verify in the metrics package).
metrics.AdminHandler(config.GetIngesterConfig().BaseConfig.MetricPort)
s.cruiseControlHandler()
}
// createMetricMiddleware returns a router group (empty path prefix) on the
// engine whose middleware times each request and, after the downstream
// handlers have run, updates the Alfred API request counter, latency sum,
// latency histogram and latency summary. Every series is labelled with
// c.FullPath(), the request method and the response status code.
func (s *Server) createMetricMiddleware() *gin.RouterGroup {
	recordRequestMetrics := func(c *gin.Context) {
		begin := time.Now()
		c.Next()

		// The time.Duration is converted as-is, so the observed value is a
		// nanosecond count.
		elapsed := float64(time.Since(begin))
		code := strconv.Itoa(c.Writer.Status())
		route, method := c.FullPath(), c.Request.Method

		metrics.AlfredApiRequestCounter.WithLabelValues(route, method, code).Inc()
		metrics.AlfredApiRequestLatencySum.WithLabelValues(route, method, code).Add(elapsed)
		metrics.AlfredApiRequestLatencyHistogram.WithLabelValues(route, method, code).Observe(elapsed)
		metrics.AlfredApiRequestLatencySummary.WithLabelValues(route, method, code).Observe(elapsed)
	}
	return s.gin.Group("", recordRequestMetrics)
}
// s3HandlerMetricMiddleware returns a router group, created with utils.EMPTY
// as its path, whose middleware records the same request count and latency
// metrics as createMetricMiddleware but labels them with the fixed requestUrl
// instead of c.FullPath(). Callers pass the route without its path parameter.
func (s *Server) s3HandlerMetricMiddleware(requestUrl string) *gin.RouterGroup {
	recordRequestMetrics := func(c *gin.Context) {
		begin := time.Now()
		c.Next()

		// The time.Duration is converted as-is, so the observed value is a
		// nanosecond count.
		elapsed := float64(time.Since(begin))
		code := strconv.Itoa(c.Writer.Status())
		method := c.Request.Method

		metrics.AlfredApiRequestCounter.WithLabelValues(requestUrl, method, code).Inc()
		metrics.AlfredApiRequestLatencySum.WithLabelValues(requestUrl, method, code).Add(elapsed)
		metrics.AlfredApiRequestLatencyHistogram.WithLabelValues(requestUrl, method, code).Observe(elapsed)
		metrics.AlfredApiRequestLatencySummary.WithLabelValues(requestUrl, method, code).Observe(elapsed)
	}
	return s.gin.Group(utils.EMPTY, recordRequestMetrics)
}
// cruiseControlHandler registers the cruise-control routes on the metrics
// router group: GET on utils.CRUISE_CONTROL and utils.CRUISE_CONTROL_V2, each
// behind its own validation.RateControl middleware, and POST
// /invalidate/cache, which has no such middleware.
func (s *Server) cruiseControlHandler() {
	// One RateControl middleware per route, keyed by the route constant.
	controlV1 := validation.RateControl(utils.CRUISE_CONTROL)
	controlV2 := validation.RateControl(utils.CRUISE_CONTROL_V2)
	cruise := handler.NewCruiseControlHandler(s.repositories, s.httpClient)

	routes := s.metricsMiddleware
	routes.GET(utils.CRUISE_CONTROL, controlV1, cruise.FetchCruiseControlConfig)
	routes.GET(utils.CRUISE_CONTROL_V2, controlV2, cruise.FetchCruiseControlConfigV2)
	routes.POST("/invalidate/cache", cruise.InvalidateCruiseCache)
}
// ingesterHandler registers the ingestion routes. Most POST endpoints go on
// the metrics router group; the web-session endpoint and the web pre-sign
// endpoint are registered directly on the engine instead.
func (s *Server) ingesterHandler() {
	webSessionControl := validation.RateControl(utils.WEB_SESSIONS_V2)
	ingest := handler.NewIngesterHandler(s.producer, s.repositories, s.httpClient)

	// POST endpoints served through the metrics router group, in
	// registration order.
	meteredPosts := []struct {
		path   string
		handle gin.HandlerFunc
	}{
		{"/ingest/event", ingest.IngestEvent},
		{"/ingest/session", ingest.IngestSessionRecordingEvent},
		{"/ingest/metrics", ingest.IngestMetrics},
		{"/v2/ingest/metrics", ingest.IngestMetricsV2},
		{"/v2/ingest/session", ingest.IngestSessionRecordingEventV2},
		{"/v2/ingest/event", ingest.IngestEventV2},
	}
	for _, route := range meteredPosts {
		s.metricsMiddleware.POST(route.path, route.handle)
	}

	// Registered on the engine itself, behind its RateControl middleware.
	s.gin.POST(utils.WEB_SESSIONS_V2, webSessionControl, ingest.IngestWebSessionRecordingEventV2)

	// NOTE(review): app crashes are served by IngestEventV2, the same handler
	// as /v2/ingest/event — confirm this sharing is intentional.
	s.metricsMiddleware.POST("/ingest/app/crashes", ingest.IngestEventV2)

	// Web pre-sign endpoint, also registered on the engine itself.
	s.gin.GET("/ingest/web/session/pre-sign/:fileName", s.s3Handler.FetchPreSignedUploadUrlV2)
}
// corsHandler returns the middleware that enforces the CORS allow-list and
// records request metrics for the requests it lets through.
//
// For every request it sets the allowed methods and the configured allowed
// headers. A request whose Origin header is not in the whitelist (including a
// missing Origin, unless the empty string is whitelisted) is aborted with
// 401. Otherwise the origin is echoed back in Access-Control-Allow-Origin;
// OPTIONS requests are then answered with 200 without reaching the route
// handler, and all other requests continue down the chain and have their
// count and latency recorded afterwards. Aborted requests are not recorded.
func (s *Server) corsHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		// The whitelist and allowed headers are read from config on every
		// request. NOTE(review): if the CORS config is static these could be
		// computed once outside the closure — confirm before hoisting.
		whitelistedDomains := getWhitelistedDomains()

		headers := c.Writer.Header()
		headers.Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
		headers.Set("Access-Control-Allow-Headers", config.NewCorsConfig().AllowedCustomHeaders)
		// The response depends on the request's Origin (both the 401 decision
		// and the echoed Access-Control-Allow-Origin), so caches must key on
		// it. Add rather than Set so an existing Vary value is kept.
		headers.Add("Vary", "Origin")

		origin := c.Request.Header.Get("Origin")
		if !whitelistedDomains[origin] {
			c.AbortWithStatus(http.StatusUnauthorized)
			return
		}
		headers.Set("Access-Control-Allow-Origin", origin)

		// Preflight: reply immediately without running the route handler.
		if c.Request.Method == http.MethodOptions {
			c.AbortWithStatus(http.StatusOK)
			return
		}

		startTime := time.Now()
		c.Next()

		// The time.Duration is converted as-is, so the observed value is a
		// nanosecond count.
		elapsed := float64(time.Since(startTime))
		status := strconv.Itoa(c.Writer.Status())
		route, method := c.FullPath(), c.Request.Method

		metrics.AlfredApiRequestCounter.WithLabelValues(route, method, status).Inc()
		metrics.AlfredApiRequestLatencySum.WithLabelValues(route, method, status).Add(elapsed)
		metrics.AlfredApiRequestLatencyHistogram.WithLabelValues(route, method, status).Observe(elapsed)
		metrics.AlfredApiRequestLatencySummary.WithLabelValues(route, method, status).Observe(elapsed)
	}
}
// getWhitelistedDomains reads the whitelisted domains from a freshly obtained
// CORS config and returns them as a set: each listed domain maps to true, and
// any other key yields false on lookup.
func getWhitelistedDomains() map[string]bool {
	domains := config.NewCorsConfig().WhitelistedDomains
	whitelist := make(map[string]bool, len(domains))
	for _, d := range domains {
		whitelist[d] = true
	}
	return whitelist
}
// s3AppPresignHandler registers the two app pre-signed upload URL endpoints.
// Each gets its own metrics group from s3HandlerMetricMiddleware, labelled
// with the route minus its trailing path parameter.
func (s *Server) s3AppPresignHandler() {
	v1Group := s.s3HandlerMetricMiddleware("/ingest/session/pre-sign")
	v1Group.GET("/ingest/session/pre-sign/:sessionId", s.s3Handler.FetchPresignedUploadURL)

	v2Group := s.s3HandlerMetricMiddleware("/v2/ingest/session/pre-sign")
	v2Group.GET("/v2/ingest/session/pre-sign/:fileName", s.s3Handler.FetchPreSignedUploadUrlV2)
}
// Start logs the configured port and runs the gin engine on it. Run blocks
// while the server is serving, so Start returns only once the engine has
// stopped or failed to start; the error Run returns (for example when the
// port cannot be bound) is logged rather than silently dropped.
func (s *Server) Start() {
	// Read the port once so the logged value and the bound address agree.
	port := config.GetIngesterConfig().BaseConfig.Port
	log.Info("starting alfred ingester server",
		zap.String("port", strconv.Itoa(port)))

	if err := s.gin.Run(fmt.Sprintf(":%v", port)); err != nil {
		// NOTE(review): assumes alfred/pkg/log exposes Error with the same
		// signature as Info — confirm.
		log.Error("alfred ingester server stopped", zap.Error(err))
	}
}
// healthCheckHandler registers GET /ping on the metrics router group. The
// endpoint always answers 200 with the plain-text body "pong".
func (s *Server) healthCheckHandler() {
	pong := func(c *gin.Context) {
		c.String(http.StatusOK, "pong")
	}
	s.metricsMiddleware.GET("/ping", pong)
}