diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index a1c732f..f894d11 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -4,6 +4,7 @@ import ( metrics "com.navi.medici.janus/instrumentation" producer_module "com.navi.medici.janus/producer" "com.navi.medici.janus/schema" + "com.navi.medici.janus/utils" "encoding/binary" "encoding/json" @@ -42,7 +43,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + source := getSource(result) //log.Print(source) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} @@ -77,7 +78,9 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { //getting the client which has sent this event var result map[string]interface{} json.Unmarshal(messageBytes, &result) - source := result["source"].(string) + source := getSource(result) + logger := utils.GetLogger() + logger.Debug("Source: " + source) // to be of the format [magicByte] + [schemaID] + [messageIndex] + [value] recordValue := []byte{} @@ -101,3 +104,10 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) { metrics.IncrementCounter("request", source) producer_module.WriteMessageToKafkaAsync(message, source) } + +func getSource(event map[string]interface{}) string { + if event["source"] == nil { + event["source"] = "UNKNOWN" + } + return event["source"].(string) +} diff --git a/main.go b/main.go index 5f9d771..eb70fb9 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "com.navi.medici.janus/server" "com.navi.medici.janus/utils" "go.uber.org/zap" + "log" "github.com/spf13/viper" ) @@ -21,11 +22,11 @@ var ( const metrics_port = "4000" func init() { - utils.InitializeLogger() - logger = utils.GetLogger() configs := getConfigs() + utils.InitializeLogger(configs.Env) + logger = utils.GetLogger() port = configs.Server.Port - logger.Debug("PORT IS: %v" + port) + logger.Debug("PORT IS: " + port) logger.Debug(configs.Kafka.Bootstrap_Servers) logger.Debug(configs.SchemaRegistry.Endpoint) schema.InitializeSchemaHandler(configs) @@ -48,12 +49,12 @@ func getConfigs() config.Configurations { var configuration config.Configurations if err := viper.ReadInConfig(); err != nil { - logger.Fatal("Error reading config file, %v", zap.String("error", err.Error())) + log.Fatalf("Error reading config file, %s\n", err) } err := viper.Unmarshal(&configuration) if err != nil { - logger.Fatal("Unable to decode into struct, %v", zap.String("error", err.Error())) + log.Fatalf("Unable to decode into struct, %v\n", err) } // Following coinfigurations read from environment variables diff --git a/utils/logger.go b/utils/logger.go index 9445d33..6b41676 100644 --- a/utils/logger.go +++ b/utils/logger.go @@ -7,9 +7,13 @@ import ( var logger *zap.Logger -func InitializeLogger() { +func InitializeLogger(env string) { var init_err error - logger, init_err = zap.NewProduction() + if env == "dev" { + logger, init_err = zap.NewDevelopment() + } else { + logger, init_err = zap.NewProduction() + } if init_err != nil { log.Fatal("failed to initialize logger") }