// Package main wires together configuration loading, Kafka producer setup,
// the worker pool, and the HTTP and metrics servers.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"com.navi.medici.janus/config"
	"com.navi.medici.janus/lib"
	producer_client "com.navi.medici.janus/producer"
	"com.navi.medici.janus/server"
	"com.navi.medici.janus/utils"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)

// Package-level state filled in during start-up and read by main.
var (
	port   string      // assigned in init from the loaded Server.Port
	cors   string      // assigned in getConfigs from the CORS_LIST lookup
	logger *zap.Logger // assigned in init from utils.GetLogger
)

// METRICS_PORT is the port handed to server.MetricServer in main.
const METRICS_PORT = "4000"

// init loads the configuration, initializes the shared logger, records the
// server port, and initializes the Kafka producers before main runs.
func init() {
	cfg := getConfigs()

	// The logger must exist before any logger.Info call below.
	utils.InitializeLogger(cfg.Env)
	logger = utils.GetLogger()

	port = cfg.Server.Port
	logger.Info("Service started on PORT: " + port)
	logger.Info("Kafka Bootstrap Servers: " + cfg.Kafka.Bootstrap_Servers)

	producer_client.InitializeProducers(cfg.Kafka, cfg.Env)
	logger.Info("Producer Initialized")
}

// getConfigs reads the viper config (name "config", type "yml", search path
// "./config"), decodes it into a config.Configurations value, and then
// overwrites the environment name and Kafka settings with viper.GetString
// lookups. It also stores the CORS_LIST lookup in the package-level cors
// variable.
//
// A failure to read or decode the config terminates the process through
// log.Fatalf.
func getConfigs() config.Configurations {
	viper.SetConfigName("config")
	viper.AddConfigPath("./config")
	viper.SetConfigType("yml")
	viper.AutomaticEnv()

	if readErr := viper.ReadInConfig(); readErr != nil {
		log.Fatalf("Error reading config file: %s", readErr)
	}

	var cfg config.Configurations
	if decodeErr := viper.Unmarshal(&cfg); decodeErr != nil {
		log.Fatalf("Unable to decode into struct: %v", decodeErr)
	}

	// These assignments are unconditional: whatever the lookup returns
	// replaces the value decoded above.
	// NOTE(review): presumably these keys are always supplied via the
	// environment — confirm, since an unresolved key would blank the field.
	cfg.Env = viper.GetString("ENVIRONMENT")
	cfg.Kafka.Bootstrap_Servers = viper.GetString("BOOTSTRAP_SERVERS")
	cfg.Kafka.Kafka_Topic_Json = viper.GetString("KAFKA_TOPIC_JSON")
	cfg.Kafka.Kafka_Topic_Protobuf = viper.GetString("KAFKA_TOPIC_PROTOBUF")
	cfg.Kafka.Sasl_User = viper.GetString("KAFKA_SASL_USER")
	cfg.Kafka.Sasl_Password = viper.GetString("KAFKA_SASL_PASSWORD")

	cors = viper.GetString("CORS_LIST")

	return cfg
}

// main starts the worker pool, the HTTP server and the metrics server, then
// blocks until SIGINT or SIGTERM arrives and shuts everything down in order:
// servers first, worker pool context last.
func main() {
	defer logger.Sync()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Configuration is loaded a second time here (init already loaded it once);
	// only Server.Goroutines is taken from this copy.
	cfg := getConfigs()

	// Worker pool shares ctx so that cancel() below reaches it.
	pool := lib.NewWorkerPool(cfg.Server.Goroutines)
	pool.Start(ctx)

	// HTTP server.
	apiServer, apiErr := server.NewServer(port, cors, pool, logger)
	if apiErr != nil {
		logger.Fatal("Unable to create HTTP server", zap.Error(apiErr))
	}

	// Metrics server.
	metricsServer, metricsErr := server.MetricServer(METRICS_PORT, logger)
	if metricsErr != nil {
		logger.Fatal("Unable to create Metric server", zap.Error(metricsErr))
	}

	// NOTE(review): StartServer is presumably non-blocking, otherwise the
	// second call and the signal wait would never be reached — confirm.
	apiServer.StartServer()
	metricsServer.StartServer()

	// Block until a termination signal is delivered.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	logger.Info("Shutting down servers...")

	apiServer.StopServer()
	metricsServer.StopServer()

	// Cancelling the context is the stop signal handed to the worker pool.
	cancel()

	logger.Info("Servers shut down gracefully")
}