diff --git a/config/config.go b/config/config.go index 7d70d42..d198f7f 100644 --- a/config/config.go +++ b/config/config.go @@ -10,6 +10,8 @@ type Configurations struct { type ServerConfigurations struct { Port string + Goroutines int + Kafka_Topic string } diff --git a/config/config.yml b/config/config.yml index 8af79f8..6d13fcb 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,5 +1,7 @@ server: port: 8000 + goroutines: 1000 + kafka_topic: KAFKA_TOPIC kafka: bootstrap_servers: BOOTSTRAP_SERVERS # read from environment variable @@ -13,4 +15,4 @@ kafka: schemaRegistry: endpoint: SCHEMA_REGISTRY_ENDPOINT - topics: TOPICS + topics: SCHEMA_TOPICS diff --git a/lib/RequestHandler.go b/lib/RequestHandler.go index d42826c..dabcbd7 100644 --- a/lib/RequestHandler.go +++ b/lib/RequestHandler.go @@ -19,10 +19,10 @@ type RequestObject struct { Header http.Header } -func ProcessRequestChannel() { +func ProcessRequestChannel(topic string) { for { request := <- RequestChannel - ClickstreamProtobufEventHandler(*request, "testgo") + ClickstreamProtobufEventHandler(*request, topic) } } @@ -54,7 +54,7 @@ func ClickstreamProtobufEventHandler(request RequestObject, topic string) { } if message != nil { - log.Printf("WRITTEN TO KAFKA") + log.Printf("WRITING TO KAFKA") } producer_module.WriteMessageToKafkaAsync(message) diff --git a/main.go b/main.go index db44155..8d0004d 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,7 @@ func init() { configuration.Kafka.Sasl_Password = viper.GetString(configuration.Kafka.Sasl_Password) configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint) configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics) + configuration.Server.Kafka_Topic = viper.GetString(configuration.Server.Kafka_Topic) port = configuration.Server.Port log.Printf("PORT IS: ", port) @@ -52,8 +53,8 @@ func init() { producer_module.InitializeProducers(configuration.Kafka) // TO DO: read number of goroutines from config - for i := 0; i < 10; i++ { - go lib.ProcessRequestChannel() + for i := 0; i < configuration.Server.Goroutines; i++ { + go lib.ProcessRequestChannel(configuration.Server.Kafka_Topic) } // sync producer using sarama