diff --git a/config/config.go b/config/config.go index 2d1fe23..4841d7c 100644 --- a/config/config.go +++ b/config/config.go @@ -4,7 +4,6 @@ type Configurations struct { Env string Server ServerConfigurations Kafka KafkaConfigurations - SchemaRegistry SchemaRegistryConfigurations } type ServerConfigurations struct { @@ -22,8 +21,3 @@ type KafkaConfigurations struct { Kafka_Topic_Json string Kafka_Topic_Protobuf string } - -type SchemaRegistryConfigurations struct { - Endpoint string - Topics string -} diff --git a/config/config.yml b/config/config.yml index c1666d1..aabf5b5 100644 --- a/config/config.yml +++ b/config/config.yml @@ -12,7 +12,3 @@ kafka: sasl_password: KAFKA_SASL_PASSWORD # read from environment variable kafka_topic_json: KAFKA_TOPIC_JSON kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF - -schemaRegistry: - endpoint: SCHEMA_REGISTRY_ENDPOINT - topics: SCHEMA_TOPICS diff --git a/docker.env b/docker.env index f4bcbce..546df6e 100644 --- a/docker.env +++ b/docker.env @@ -1,4 +1,2 @@ BOOTSTRAP_SERVERS=docker.for.mac.localhost:9092 -SCHEMA_REGISTRY_ENDPOINT=http://docker.for.mac.localhost:8081 -SCHEMA_TOPICS=testgo KAFKA_TOPIC=testgo diff --git a/main.go b/main.go index 6830763..6813780 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "com.navi.medici.janus/config" "com.navi.medici.janus/lib" producer_client "com.navi.medici.janus/producer" - "com.navi.medici.janus/schema" "com.navi.medici.janus/server" "com.navi.medici.janus/utils" "github.com/spf13/viper" @@ -27,7 +26,6 @@ func init() { port = configs.Server.Port logger.Debug("Service started on PORT: " + port) logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) - schema.InitializeSchemaHandler(configs) producer_client.InitializeProducers(configs.Kafka, configs.Env) logger.Info("Producer Initialized, starting goroutines for event processing") for i := 0; i < configs.Server.Goroutines; i++ { @@ -54,8 +52,6 @@ func getConfigs() config.Configurations { // Following coinfigurations read from environment variables configuration.Env = viper.GetString(configuration.Env) configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) - configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint) - configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics) configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json) configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf) diff --git a/schema/schema.json b/schema/schema.json deleted file mode 100644 index 4b2f648..0000000 --- a/schema/schema.json +++ /dev/null @@ -1,119 +0,0 @@ -{ - "type": "object", - "properties": { - "app": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "version": { - "type": "string" - } - } - }, - "device": { - "type": "object", - "properties": { - "device_id": { - "type": "string" - }, - "advertising_id": { - "type": "string" - }, - "installation_id": { - "type": "string" - }, - "category": { - "type": "string" - }, - "height": { - "type": "string" - }, - "imei": { - "type": "string" - }, - "manufacturer": { - "type": "string" - }, - "model": { - "type": "string" - }, - "screen_resolution": { - "type": "string" - }, - "os": { - "type": "string" - }, - "os_version": { - "type": "string" - }, - "os_api_level": { - "type": "string" - }, - "rooted": { - "type": "string" - }, - "fingerprint": { - "type": "string" - } - } - }, - "network": { - "type": "object", - "properties": { - "carrier": { - "type": "string" - }, - "type": { - "type": "string" - }, - "ip": { - "type": "string" - } - } - }, - "location": { - "type": "object", - "properties": { - "latitude": { - "type": "string" - }, - "longitude": { - "type": "string" - } - } - }, - "referer": { - "type": "object", - "properties": { - "url": { - "type": "string" - } - } - }, - "user": { - "type": "object", - "properties": { - "customer_id": { - "type": "string" - } - } - }, - "events": { - "type": "array", - "items": { - "type": "object", - "properties": { - "event_type": { - "type": "string" - }, - "timestamp": { - "type": "number" - } - } - } - } - }, - "additionalProperties": false -} \ No newline at end of file diff --git a/schema/schema_util.go b/schema/schema_util.go deleted file mode 100644 index 25df622..0000000 --- a/schema/schema_util.go +++ /dev/null @@ -1,41 +0,0 @@ -package schema - -import ( - "com.navi.medici.janus/config" - "com.navi.medici.janus/utils" - "fmt" - "github.com/riferrei/srclient" - "go.uber.org/zap" - "strings" -) - -var ( - SchemaRegistryEndpoint string - TopicList []string - SchemaVersionMap = make(map[string]int) - logger *zap.Logger -) - -func InitializeSchemaHandler(configuration config.Configurations) { - logger = utils.GetLogger() - SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint - TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",") - // initialize schema version map which contains latest schema version for topic(s) - GetSchemaVersions() -} - -func GetSchemaVersions() { - - schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint) - schemaRegistryClient.CodecCreationEnabled(false) - for _, topic := range TopicList { - schema, err := schemaRegistryClient.GetLatestSchema(topic, false) - if err != nil { - logger.Error("error in fetching latest schema", zap.Error(err)) - } else { - schemaId := schema.ID() - SchemaVersionMap[topic] = schemaId - } - } - logger.Debug("Schema Version Map", zap.String("", fmt.Sprintf("%v", SchemaVersionMap))) -} diff --git a/server/handlers.go b/server/handlers.go index 79f3098..be0d1da 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -24,12 +24,6 @@ const ( ERROR = "error" ) -type NewSchemaRequest struct { - Topic string `json:"topic"` - Schema string `json:"schema"` - SchemaType string `json:"schema_type"` -} - type CustomResponse struct { Code int `json:"code"` Message string `json:"message"`