TP-55555 | kafka-fix
This commit is contained in:
@@ -73,7 +73,7 @@ func InitDependencies() *Dependencies {
|
||||
projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer)
|
||||
sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig())
|
||||
releaseServiceClient := service.NewReleaseService(logger, dbClient)
|
||||
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient)
|
||||
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient, *configs.GetKafkaConfig())
|
||||
searchServiceClient := service.NewSearchService(logger, elasticSearch)
|
||||
authService := service.NewAuthService(mjolnirClient)
|
||||
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)
|
||||
|
||||
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"cybertron/configs"
|
||||
"cybertron/models/db"
|
||||
"cybertron/models/instrumentation"
|
||||
inPodCache "cybertron/pkg/cache"
|
||||
@@ -22,6 +23,7 @@ type ExceptionService struct {
|
||||
dbClient *gorm.DB
|
||||
kafkaProducer producer.KProducer
|
||||
inPodCacheClient *inPodCache.Cache
|
||||
kafkaConfig configs.KafkaConfig
|
||||
}
|
||||
|
||||
type Frame struct {
|
||||
@@ -57,12 +59,13 @@ type Payload struct {
|
||||
Extra interface{} `json:"extra"`
|
||||
}
|
||||
|
||||
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService {
|
||||
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache, kafkaConfig configs.KafkaConfig) *ExceptionService {
|
||||
return &ExceptionService{
|
||||
logger: logger,
|
||||
dbClient: dbClient,
|
||||
kafkaProducer: kafkaProducer,
|
||||
inPodCacheClient: cache,
|
||||
kafkaConfig: kafkaConfig,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,12 +111,10 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
|
||||
|
||||
for _, errorItem := range jsonData.Exception.Values {
|
||||
errorItem.ProjectId = projectID
|
||||
//todo update release id
|
||||
errorItem.ReleaseId = "release-1"
|
||||
errorItem.Breadcrumbs = jsonData.Breadcrumbs
|
||||
errorItem.Extra = jsonData.Extra
|
||||
errorItem.Request = jsonData.Request
|
||||
err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance)
|
||||
err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topics["cybertron"], "", nil, encoder.JsonEncoderInstance)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("Failed to push error to kafka")
|
||||
|
||||
Reference in New Issue
Block a user