diff --git a/configs/application.yml b/configs/application.yml index edb0503..b43b5be 100644 --- a/configs/application.yml +++ b/configs/application.yml @@ -45,9 +45,8 @@ kafka: password: KAFKA_PASSWORD username: KAFKA_USERNAME brokers: KAFKA_BROKERS - group: - names: KAFKA_GROUP - topics: KAFKA_TOPICS + group: KAFKA_GROUP + topic: KAFKA_TOPIC tls: insecureSkipVerify: true enabled: false diff --git a/configs/kafka.go b/configs/kafka.go index 5b488f2..ac24958 100644 --- a/configs/kafka.go +++ b/configs/kafka.go @@ -9,8 +9,8 @@ type KafkaConfig struct { Brokers []string Username string Password string - Group map[string]string - Topics map[string]string + Group string + Topic string TlsInsureSkipVerification bool TlsEnabled bool SaslEnabled bool @@ -25,8 +25,8 @@ func NewKafkaConfig() *KafkaConfig { Brokers: strings.Split(viper.GetString("kafka.brokers"), ","), Username: viper.GetString("kafka.username"), Password: viper.GetString("kafka.password"), - Group: viper.GetStringMapString("kafka.group.names"), - Topics: viper.GetStringMapString("kafka.topics"), + Group: viper.GetString("kafka.group.names"), + Topic: viper.GetString("kafka.topic"), TlsInsureSkipVerification: viper.GetBool("kafka.tls.insecureSkipVerify"), SaslEnabled: viper.GetBool("kafka.sasl.enabled"), TlsEnabled: viper.GetBool("kafka.tls.enabled"), @@ -36,7 +36,3 @@ func NewKafkaConfig() *KafkaConfig { Mechanism: viper.GetString("kafka.sasl.mechanism"), } } - -func (kc *KafkaConfig) GetTopic(topic string) string { - return kc.Topics[topic] -} diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index f31253e..8aa3f83 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -73,7 +73,7 @@ func InitDependencies() *Dependencies { projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer) sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig()) releaseServiceClient := service.NewReleaseService(logger, dbClient) - exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient) + exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient, *configs.GetKafkaConfig()) searchServiceClient := service.NewSearchService(logger, elasticSearch) authService := service.NewAuthService(mjolnirClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) diff --git a/service/ExceptionService.go b/service/ExceptionService.go index b20c8ca..63eb72e 100644 --- a/service/ExceptionService.go +++ b/service/ExceptionService.go @@ -2,6 +2,7 @@ package service import ( "bufio" + "cybertron/configs" "cybertron/models/db" "cybertron/models/instrumentation" inPodCache "cybertron/pkg/cache" @@ -22,6 +23,7 @@ type ExceptionService struct { dbClient *gorm.DB kafkaProducer producer.KProducer inPodCacheClient *inPodCache.Cache + kafkaConfig configs.KafkaConfig } type Frame struct { @@ -57,12 +59,13 @@ type Payload struct { Extra interface{} `json:"extra"` } -func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService { +func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache, kafkaConfig configs.KafkaConfig) *ExceptionService { return &ExceptionService{ logger: logger, dbClient: dbClient, kafkaProducer: kafkaProducer, inPodCacheClient: cache, + kafkaConfig: kafkaConfig, } } @@ -108,12 +111,10 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) { for _, errorItem := range jsonData.Exception.Values { errorItem.ProjectId = projectID - //todo update release id - errorItem.ReleaseId = "release-1" errorItem.Breadcrumbs = jsonData.Breadcrumbs errorItem.Extra = jsonData.Extra errorItem.Request = jsonData.Request - err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance) + err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topic, "", nil, encoder.JsonEncoderInstance) if err != nil { fmt.Println("Failed to push error to kafka")