From 1cd660785b558556294fd807dda4785cf610627e Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Thu, 24 Oct 2024 15:56:09 +0530 Subject: [PATCH 1/2] TP-55555 | kafka-fix --- internal/dependencies/dependencies.go | 2 +- service/ExceptionService.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index f31253e..8aa3f83 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -73,7 +73,7 @@ func InitDependencies() *Dependencies { projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer) sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig()) releaseServiceClient := service.NewReleaseService(logger, dbClient) - exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient) + exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient, *configs.GetKafkaConfig()) searchServiceClient := service.NewSearchService(logger, elasticSearch) authService := service.NewAuthService(mjolnirClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) diff --git a/service/ExceptionService.go b/service/ExceptionService.go index b20c8ca..dfa62c7 100644 --- a/service/ExceptionService.go +++ b/service/ExceptionService.go @@ -2,6 +2,7 @@ package service import ( "bufio" + "cybertron/configs" "cybertron/models/db" "cybertron/models/instrumentation" inPodCache "cybertron/pkg/cache" @@ -22,6 +23,7 @@ type ExceptionService struct { dbClient *gorm.DB kafkaProducer producer.KProducer inPodCacheClient *inPodCache.Cache + kafkaConfig configs.KafkaConfig } type Frame struct { @@ -57,12 +59,13 @@ type Payload struct { Extra interface{} `json:"extra"` } -func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService { +func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache, kafkaConfig configs.KafkaConfig) *ExceptionService { return &ExceptionService{ logger: logger, dbClient: dbClient, kafkaProducer: kafkaProducer, inPodCacheClient: cache, + kafkaConfig: kafkaConfig, } } @@ -108,12 +111,10 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) { for _, errorItem := range jsonData.Exception.Values { errorItem.ProjectId = projectID - //todo update release id - errorItem.ReleaseId = "release-1" errorItem.Breadcrumbs = jsonData.Breadcrumbs errorItem.Extra = jsonData.Extra errorItem.Request = jsonData.Request - err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance) + err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topics["cybertron"], "", nil, encoder.JsonEncoderInstance) if err != nil { fmt.Println("Failed to push error to kafka") From 8409ab6a47271f8ed3d427810c5f44c17707554a Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Thu, 24 Oct 2024 16:43:54 +0530 Subject: [PATCH 2/2] TP-55555 | kafka-fix --- configs/application.yml | 5 ++--- configs/kafka.go | 12 ++++-------- service/ExceptionService.go | 2 +- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/configs/application.yml b/configs/application.yml index edb0503..b43b5be 100644 --- a/configs/application.yml +++ b/configs/application.yml @@ -45,9 +45,8 @@ kafka: password: KAFKA_PASSWORD username: KAFKA_USERNAME brokers: KAFKA_BROKERS - group: - names: KAFKA_GROUP - topics: KAFKA_TOPICS + group: KAFKA_GROUP + topic: KAFKA_TOPIC tls: insecureSkipVerify: true enabled: false diff --git a/configs/kafka.go b/configs/kafka.go index 5b488f2..ac24958 100644 --- a/configs/kafka.go +++ b/configs/kafka.go @@ -9,8 +9,8 @@ type KafkaConfig struct { Brokers []string Username string Password string - Group map[string]string - Topics map[string]string + Group string + Topic string TlsInsureSkipVerification bool TlsEnabled bool SaslEnabled bool @@ -25,8 +25,8 @@ func NewKafkaConfig() *KafkaConfig { Brokers: strings.Split(viper.GetString("kafka.brokers"), ","), Username: viper.GetString("kafka.username"), Password: viper.GetString("kafka.password"), - Group: viper.GetStringMapString("kafka.group.names"), - Topics: viper.GetStringMapString("kafka.topics"), + Group: viper.GetString("kafka.group.names"), + Topic: viper.GetString("kafka.topic"), TlsInsureSkipVerification: viper.GetBool("kafka.tls.insecureSkipVerify"), SaslEnabled: viper.GetBool("kafka.sasl.enabled"), TlsEnabled: viper.GetBool("kafka.tls.enabled"), @@ -36,7 +36,3 @@ func NewKafkaConfig() *KafkaConfig { Mechanism: viper.GetString("kafka.sasl.mechanism"), } } - -func (kc *KafkaConfig) GetTopic(topic string) string { - return kc.Topics[topic] -} diff --git a/service/ExceptionService.go b/service/ExceptionService.go index dfa62c7..63eb72e 100644 --- a/service/ExceptionService.go +++ b/service/ExceptionService.go @@ -114,7 +114,7 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) { errorItem.Breadcrumbs = jsonData.Breadcrumbs errorItem.Extra = jsonData.Extra errorItem.Request = jsonData.Request - err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topics["cybertron"], "", nil, encoder.JsonEncoderInstance) + err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topic, "", nil, encoder.JsonEncoderInstance) if err != nil { fmt.Println("Failed to push error to kafka")