From 1cd660785b558556294fd807dda4785cf610627e Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Thu, 24 Oct 2024 15:56:09 +0530 Subject: [PATCH] TP-55555 | kafka-fix --- internal/dependencies/dependencies.go | 2 +- service/ExceptionService.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index f31253e..8aa3f83 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -73,7 +73,7 @@ func InitDependencies() *Dependencies { projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer) sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig()) releaseServiceClient := service.NewReleaseService(logger, dbClient) - exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient) + exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient, *configs.GetKafkaConfig()) searchServiceClient := service.NewSearchService(logger, elasticSearch) authService := service.NewAuthService(mjolnirClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) diff --git a/service/ExceptionService.go b/service/ExceptionService.go index b20c8ca..dfa62c7 100644 --- a/service/ExceptionService.go +++ b/service/ExceptionService.go @@ -2,6 +2,7 @@ package service import ( "bufio" + "cybertron/configs" "cybertron/models/db" "cybertron/models/instrumentation" inPodCache "cybertron/pkg/cache" @@ -22,6 +23,7 @@ type ExceptionService struct { dbClient *gorm.DB kafkaProducer producer.KProducer inPodCacheClient *inPodCache.Cache + kafkaConfig configs.KafkaConfig } type Frame struct { @@ -57,12 +59,13 @@ type Payload struct { Extra interface{} `json:"extra"` } -func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService { +func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache, kafkaConfig configs.KafkaConfig) *ExceptionService { return &ExceptionService{ logger: logger, dbClient: dbClient, kafkaProducer: kafkaProducer, inPodCacheClient: cache, + kafkaConfig: kafkaConfig, } } @@ -108,12 +111,10 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) { for _, errorItem := range jsonData.Exception.Values { errorItem.ProjectId = projectID - //todo update release id - errorItem.ReleaseId = "release-1" errorItem.Breadcrumbs = jsonData.Breadcrumbs errorItem.Extra = jsonData.Extra errorItem.Request = jsonData.Request - err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance) + err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topics["cybertron"], "", nil, encoder.JsonEncoderInstance) if err != nil { fmt.Println("Failed to push error to kafka")