TP-55555 | kafka-fix

This commit is contained in:
varnit goyal
2024-10-24 16:43:54 +05:30
parent 1cd660785b
commit 8409ab6a47
3 changed files with 7 additions and 12 deletions

View File

@@ -45,9 +45,8 @@ kafka:
password: KAFKA_PASSWORD
username: KAFKA_USERNAME
brokers: KAFKA_BROKERS
group:
names: KAFKA_GROUP
topics: KAFKA_TOPICS
group: KAFKA_GROUP
topic: KAFKA_TOPIC
tls:
insecureSkipVerify: true
enabled: false

View File

@@ -9,8 +9,8 @@ type KafkaConfig struct {
Brokers []string
Username string
Password string
Group map[string]string
Topics map[string]string
Group string
Topic string
TlsInsureSkipVerification bool
TlsEnabled bool
SaslEnabled bool
@@ -25,8 +25,8 @@ func NewKafkaConfig() *KafkaConfig {
Brokers: strings.Split(viper.GetString("kafka.brokers"), ","),
Username: viper.GetString("kafka.username"),
Password: viper.GetString("kafka.password"),
Group: viper.GetStringMapString("kafka.group.names"),
Topics: viper.GetStringMapString("kafka.topics"),
Group: viper.GetString("kafka.group.names"),
Topic: viper.GetString("kafka.topic"),
TlsInsureSkipVerification: viper.GetBool("kafka.tls.insecureSkipVerify"),
SaslEnabled: viper.GetBool("kafka.sasl.enabled"),
TlsEnabled: viper.GetBool("kafka.tls.enabled"),
@@ -36,7 +36,3 @@ func NewKafkaConfig() *KafkaConfig {
Mechanism: viper.GetString("kafka.sasl.mechanism"),
}
}
func (kc *KafkaConfig) GetTopic(topic string) string {
return kc.Topics[topic]
}

View File

@@ -114,7 +114,7 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
errorItem.Breadcrumbs = jsonData.Breadcrumbs
errorItem.Extra = jsonData.Extra
errorItem.Request = jsonData.Request
err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topics["cybertron"], "", nil, encoder.JsonEncoderInstance)
err := exceptionService.kafkaProducer.PublishEvent(errorItem, exceptionService.kafkaConfig.Topic, "", nil, encoder.JsonEncoderInstance)
if err != nil {
fmt.Println("Failed to push error to kafka")