diff --git a/main.go b/main.go index 178cf83..6830763 100644 --- a/main.go +++ b/main.go @@ -25,11 +25,11 @@ func init() { utils.InitializeLogger(configs.Env) logger = utils.GetLogger() port = configs.Server.Port - logger.Debug("PORT IS: " + port) - logger.Debug("Bootstrap Servers:" + configs.Kafka.Bootstrap_Servers) + logger.Debug("Service started on PORT: " + port) + logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) - + logger.Info("Producer Initialized, starting goroutines for event processing") for i := 0; i < configs.Server.Goroutines; i++ { go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json) } diff --git a/producer/kafka_writer.go b/producer/kafka_writer.go index 2d8e07b..a6cfa86 100644 --- a/producer/kafka_writer.go +++ b/producer/kafka_writer.go @@ -1,19 +1,18 @@ package producer import ( - metrics "com.navi.medici.janus/instrumentation" - "log" - "github.com/Shopify/sarama" + metrics "com.navi.medici.janus/instrumentation" + "github.com/Shopify/sarama" + "go.uber.org/zap" ) - func WriteMessageToKafkaAsync(message *sarama.ProducerMessage, source string) { - select { - case asyncProducer.Input() <- message: - metrics.IncrementCounter("success", source) -// log.Printf("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE %v", source) - case err := <- asyncProducer.Errors(): - metrics.IncrementCounter("fail", source) - log.Printf("FAILED TO WRITE TO KAFKA FOR SOURCE %v %v", source, err) - } + select { + case asyncProducer.Input() <- message: + metrics.IncrementCounter("success", source) + logger.Debug("SUCCESSFULLY WRITTEN TO KAFKA FOR SOURCE " + source) + case err := <-asyncProducer.Errors(): + metrics.IncrementCounter("fail", source) + logger.Error("FAILED TO WRITE TO KAFKA FOR SOURCE "+source, zap.Error(err)) + } } diff --git a/utils/logger.go b/utils/logger.go index 6b41676..83ae8e7 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -9,7 +9,7 @@ var logger *zap.Logger func InitializeLogger(env string) { var init_err error - if env == "dev" { + if env == "DEV" { logger, init_err = zap.NewDevelopment() } else { logger, init_err = zap.NewProduction() @@ -20,5 +20,8 @@ func InitializeLogger(env string) { } func GetLogger() *zap.Logger { + if logger == nil { + InitializeLogger("PROD") + } return logger }