TP-55555 | Exception handler to push errors to kafka

This commit is contained in:
Lokesh Dugar
2024-07-27 19:54:12 +05:30
parent 7a9241fb99
commit 16b42f1272
7 changed files with 122 additions and 19 deletions

View File

@@ -18,6 +18,7 @@ type AppConfig struct {
timezone string
httpConfig *HttpConfig
clientConfigs *ClientConfigs
kafkaConfig *KafkaConfig
}
type MigConfig struct {
@@ -41,6 +42,7 @@ func LoadConfig() {
timezone: getString("timezone", true),
clientConfigs: loadClientConfigs(),
httpConfig: NewHttpConfig(),
kafkaConfig: NewKafkaConfig(),
}
}
@@ -94,3 +96,7 @@ func readConfig() {
func GetHttpConfig() *HttpConfig {
return appConfig.httpConfig
}
func GetKafkaConfig() *KafkaConfig {
return appConfig.kafkaConfig
}

View File

@@ -7,6 +7,7 @@ import (
"cybertron/internal/transport/handler"
"cybertron/pkg/db"
httpclient "cybertron/pkg/httpClient"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"cybertron/service"
"go.uber.org/zap"
@@ -22,15 +23,17 @@ type Dependencies struct {
}
type Service struct {
DocumentService *document.HttpClient
ProjectService *service.ProjectCreator
ReleaseService *service.ReleaseService
DocumentService *document.HttpClient
ProjectService *service.ProjectCreator
ReleaseService *service.ReleaseService
ExceptionService *service.ExceptionService
// Add your service here
}
type Handler struct {
ProjectHandler *handler.ProjectHandler
ReleaseHandler *handler.ReleasesHandler
ProjectHandler *handler.ProjectHandler
ReleaseHandler *handler.ReleasesHandler
ExceptionHandler *handler.ExceptionHandler
}
type Repositories struct {
@@ -43,12 +46,14 @@ func InitDependencies() *Dependencies {
repositories := initRepositories(dbClient)
logger := log.Log
httpClient := httpclient.NewHttpClient(*configs.GetHttpConfig())
kafkaProducer := initKafkaProducer()
documentServiceClient := document.NewDocumentServiceHttpClient(httpClient, logger, configs.GetDocumentServiceHttpClientConfigs())
projectServiceClient := service.NewProjectCreator(logger, dbClient)
releaseServiceClient := service.NewReleaseService(logger, dbClient)
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer)
services := initServices(documentServiceClient, projectServiceClient, releaseServiceClient)
handlers := initHandlers(projectServiceClient, releaseServiceClient)
services := initServices(documentServiceClient, projectServiceClient, releaseServiceClient, exceptionServiceClient)
handlers := initHandlers(projectServiceClient, releaseServiceClient, exceptionServiceClient)
return &Dependencies{
Service: services,
@@ -59,11 +64,12 @@ func InitDependencies() *Dependencies {
}
}
func initServices(documentService *document.HttpClient, projectService *service.ProjectCreator, releaseService *service.ReleaseService) *Service {
func initServices(documentService *document.HttpClient, projectService *service.ProjectCreator, releaseService *service.ReleaseService, exceptionService *service.ExceptionService) *Service {
return &Service{
DocumentService: documentService,
ProjectService: projectService,
ReleaseService: releaseService,
DocumentService: documentService,
ProjectService: projectService,
ReleaseService: releaseService,
ExceptionService: exceptionService,
}
}
@@ -74,11 +80,18 @@ func initRepositories(dbClient *gorm.DB) *Repositories {
}
}
func initHandlers(projectService *service.ProjectCreator, releaseService *service.ReleaseService) *Handler {
func initHandlers(projectService *service.ProjectCreator, releaseService *service.ReleaseService, exceotionService *service.ExceptionService) *Handler {
projectHandler := handler.NewProjectHandler(projectService)
releaseHandler := handler.NewReleaseHandler(releaseService)
exceptionHandler := handler.NewExceptionHandler(exceotionService)
return &Handler{
ProjectHandler: projectHandler,
ReleaseHandler: releaseHandler,
ProjectHandler: projectHandler,
ReleaseHandler: releaseHandler,
ExceptionHandler: exceptionHandler,
}
}
func initKafkaProducer() producer.KProducer {
kafkaProducer := producer.NewKProducer(configs.GetEnv(), configs.GetKafkaConfig())
return kafkaProducer
}

View File

@@ -0,0 +1,20 @@
package handler
import (
"cybertron/service"
"github.com/gin-gonic/gin"
)
type ExceptionHandler struct {
exceptionService *service.ExceptionService
}
func (h *ExceptionHandler) CatchErrors(c *gin.Context) {
h.exceptionService.CatchErrors(c)
}
func NewExceptionHandler(es *service.ExceptionService) *ExceptionHandler {
return &ExceptionHandler{
exceptionService: es,
}
}

View File

@@ -0,0 +1,15 @@
package router
import (
"cybertron/internal/dependencies"
"cybertron/internal/transport/handler"
"github.com/gin-gonic/gin"
)
func ExceptionRouter(r *gin.Engine, dep *dependencies.Dependencies) {
exceptionHandler := handler.NewExceptionHandler(dep.Service.ExceptionService)
exceptionRouterGroup := r.Group("/api/v1")
{
exceptionRouterGroup.POST("/catch-errors", exceptionHandler.CatchErrors)
}
}

View File

@@ -32,6 +32,7 @@ func (s *Server) router() {
router.ReadinessRouter(s.gin)
router.ProjectRouter(s.gin, s.dependencies)
router.ReleasesRouter(s.gin, s.dependencies)
router.ExceptionRouter(s.gin, s.dependencies)
}
func (s *Server) Start() {

View File

@@ -0,0 +1,47 @@
package service
import (
"cybertron/configs"
"cybertron/pkg/encoder"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"gorm.io/gorm"
"net/http"
)
type ExceptionService struct {
logger *log.Logger
dbClient *gorm.DB
kafkaProducer producer.KProducer
}
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService {
return &ExceptionService{
logger: logger,
dbClient: dbClient,
kafkaProducer: kafkaProducer,
}
}
func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
var errorsPayload []interface{}
if err := c.BindJSON(&errorsPayload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON payload"})
return
}
headerMap := make(map[string]string)
for _, errorItem := range errorsPayload {
err := exceptionService.kafkaProducer.PublishEvent(errorItem, configs.GetKafkaConfig().GetTopic("js-error-topic"), uuid.NewString(), headerMap, encoder.JsonEncoderInstance)
if err != nil {
fmt.Println("Failed to push error to kafka")
}
}
c.JSON(http.StatusOK, gin.H{"status": "success"})
}

View File

@@ -27,7 +27,8 @@ func NewReleaseService(logger *log.Logger, dbClient *gorm.DB) *ReleaseService {
}
}
func (rs *ReleaseService) AddRelease(c *gin.Context) {
// FIXME: This may not be requried now, can be used in source maps services only
func (releaseService *ReleaseService) AddRelease(c *gin.Context) {
var releaseBody ReleaseBody
if err := c.BindJSON(&releaseBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
@@ -43,7 +44,7 @@ func (rs *ReleaseService) AddRelease(c *gin.Context) {
SourceMapUrl: releaseBody.SourceMapUrl,
}
if result := rs.dbClient.Create(&releaseToBeAdded); result.Error != nil {
if result := releaseService.dbClient.Create(&releaseToBeAdded); result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}
@@ -53,13 +54,13 @@ func (rs *ReleaseService) AddRelease(c *gin.Context) {
})
}
func (rs *ReleaseService) GetReleases(c *gin.Context) {
func (releaseService *ReleaseService) GetReleases(c *gin.Context) {
projectRefId := c.Query("project_reference_id")
var releases []db.Release
rs.dbClient.Where("project_reference_id = ?", projectRefId).Find(&releases)
releaseService.dbClient.Where("project_reference_id = ?", projectRefId).Find(&releases)
if result := rs.dbClient.Find(&releases); result.Error != nil {
if result := releaseService.dbClient.Find(&releases); result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}