Merge pull request #37 from navi-ppl/TP-55555/kafka-fix
Tp 55555/kafka fix
This commit is contained in:
@@ -45,9 +45,8 @@ kafka:
|
||||
password: KAFKA_PASSWORD
|
||||
username: KAFKA_USERNAME
|
||||
brokers: KAFKA_BROKERS
|
||||
group:
|
||||
names: KAFKA_GROUP
|
||||
topics: KAFKA_TOPICS
|
||||
group: KAFKA_GROUP
|
||||
topic: KAFKA_TOPIC
|
||||
tls:
|
||||
insecureSkipVerify: true
|
||||
enabled: false
|
||||
|
||||
@@ -9,8 +9,8 @@ type KafkaConfig struct {
|
||||
Brokers []string
|
||||
Username string
|
||||
Password string
|
||||
Group map[string]string
|
||||
Topics map[string]string
|
||||
Group string
|
||||
Topic string
|
||||
TlsInsureSkipVerification bool
|
||||
TlsEnabled bool
|
||||
SaslEnabled bool
|
||||
@@ -25,8 +25,8 @@ func NewKafkaConfig() *KafkaConfig {
|
||||
Brokers: strings.Split(viper.GetString("kafka.brokers"), ","),
|
||||
Username: viper.GetString("kafka.username"),
|
||||
Password: viper.GetString("kafka.password"),
|
||||
Group: viper.GetStringMapString("kafka.group.names"),
|
||||
Topics: viper.GetStringMapString("kafka.topics"),
|
||||
Group: viper.GetString("kafka.group.names"),
|
||||
Topic: viper.GetString("kafka.topic"),
|
||||
TlsInsureSkipVerification: viper.GetBool("kafka.tls.insecureSkipVerify"),
|
||||
SaslEnabled: viper.GetBool("kafka.sasl.enabled"),
|
||||
TlsEnabled: viper.GetBool("kafka.tls.enabled"),
|
||||
@@ -36,7 +36,3 @@ func NewKafkaConfig() *KafkaConfig {
|
||||
Mechanism: viper.GetString("kafka.sasl.mechanism"),
|
||||
}
|
||||
}
|
||||
|
||||
func (kc *KafkaConfig) GetTopic(topic string) string {
|
||||
return kc.Topics[topic]
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func InitDependencies() *Dependencies {
|
||||
projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer)
|
||||
sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig())
|
||||
releaseServiceClient := service.NewReleaseService(logger, dbClient)
|
||||
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient)
|
||||
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient, *configs.GetKafkaConfig())
|
||||
searchServiceClient := service.NewSearchService(logger, elasticSearch)
|
||||
authService := service.NewAuthService(mjolnirClient)
|
||||
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)
|
||||
|
||||
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"cybertron/configs"
|
||||
"cybertron/models/db"
|
||||
"cybertron/models/instrumentation"
|
||||
inPodCache "cybertron/pkg/cache"
|
||||
@@ -22,6 +23,7 @@ type ExceptionService struct {
|
||||
dbClient *gorm.DB
|
||||
kafkaProducer producer.KProducer
|
||||
inPodCacheClient *inPodCache.Cache
|
||||
kafkaConfig configs.KafkaConfig
|
||||
}
|
||||
|
||||
type Frame struct {
|
||||
@@ -57,12 +59,13 @@ type Payload struct {
|
||||
Extra interface{} `json:"extra"`
|
||||
}
|
||||
|
||||
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService {
|
||||
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache, kafkaConfig configs.KafkaConfig) *ExceptionService {
|
||||
return &ExceptionService{
|
||||
logger: logger,
|
||||
dbClient: dbClient,
|
||||
kafkaProducer: kafkaProducer,
|
||||
inPodCacheClient: cache,
|
||||
kafkaConfig: kafkaConfig,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,12 +111,10 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
|
||||
|
||||
for _, errorItem := range jsonData.Exception.Values {
|
||||
errorItem.ProjectId = projectID
|
||||
//todo update release id
|
||||
errorItem.ReleaseId = "release-1"
|
||||
errorItem.Breadcrumbs = jsonData.Breadcrumbs
|
||||
errorItem.Extra = jsonData.Extra
|
||||
errorItem.Request = jsonData.Request
|
||||
err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance)
|
||||
err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topic, "", nil, encoder.JsonEncoderInstance)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("Failed to push error to kafka")
|
||||
|
||||
Reference in New Issue
Block a user