Merge pull request #36 from rashmin-patel_navi/master

DE-1338 | DNM | removing schema registry dependency
This commit is contained in:
Rashmin Patel
2024-08-12 14:12:24 +05:30
committed by GitHub
7 changed files with 0 additions and 182 deletions

View File

@@ -4,7 +4,6 @@ type Configurations struct {
Env string Env string
Server ServerConfigurations Server ServerConfigurations
Kafka KafkaConfigurations Kafka KafkaConfigurations
SchemaRegistry SchemaRegistryConfigurations
} }
type ServerConfigurations struct { type ServerConfigurations struct {
@@ -22,8 +21,3 @@ type KafkaConfigurations struct {
Kafka_Topic_Json string Kafka_Topic_Json string
Kafka_Topic_Protobuf string Kafka_Topic_Protobuf string
} }
type SchemaRegistryConfigurations struct {
Endpoint string
Topics string
}

View File

@@ -12,7 +12,3 @@ kafka:
sasl_password: KAFKA_SASL_PASSWORD # read from environment variable sasl_password: KAFKA_SASL_PASSWORD # read from environment variable
kafka_topic_json: KAFKA_TOPIC_JSON kafka_topic_json: KAFKA_TOPIC_JSON
kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF kafka_topic_protobuf: KAFKA_TOPIC_PROTOBUF
schemaRegistry:
endpoint: SCHEMA_REGISTRY_ENDPOINT
topics: SCHEMA_TOPICS

View File

@@ -1,4 +1,2 @@
BOOTSTRAP_SERVERS=docker.for.mac.localhost:9092 BOOTSTRAP_SERVERS=docker.for.mac.localhost:9092
SCHEMA_REGISTRY_ENDPOINT=http://docker.for.mac.localhost:8081
SCHEMA_TOPICS=testgo
KAFKA_TOPIC=testgo KAFKA_TOPIC=testgo

View File

@@ -4,7 +4,6 @@ import (
"com.navi.medici.janus/config" "com.navi.medici.janus/config"
"com.navi.medici.janus/lib" "com.navi.medici.janus/lib"
producer_client "com.navi.medici.janus/producer" producer_client "com.navi.medici.janus/producer"
"com.navi.medici.janus/schema"
"com.navi.medici.janus/server" "com.navi.medici.janus/server"
"com.navi.medici.janus/utils" "com.navi.medici.janus/utils"
"github.com/spf13/viper" "github.com/spf13/viper"
@@ -27,7 +26,6 @@ func init() {
port = configs.Server.Port port = configs.Server.Port
logger.Debug("Service started on PORT: " + port) logger.Debug("Service started on PORT: " + port)
logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers) logger.Debug("Kafka Bootstrap Servers: " + configs.Kafka.Bootstrap_Servers)
schema.InitializeSchemaHandler(configs)
producer_client.InitializeProducers(configs.Kafka, configs.Env) producer_client.InitializeProducers(configs.Kafka, configs.Env)
logger.Info("Producer Initialized, starting goroutines for event processing") logger.Info("Producer Initialized, starting goroutines for event processing")
for i := 0; i < configs.Server.Goroutines; i++ { for i := 0; i < configs.Server.Goroutines; i++ {
@@ -54,8 +52,6 @@ func getConfigs() config.Configurations {
// Following coinfigurations read from environment variables // Following coinfigurations read from environment variables
configuration.Env = viper.GetString(configuration.Env) configuration.Env = viper.GetString(configuration.Env)
configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers) configuration.Kafka.Bootstrap_Servers = viper.GetString(configuration.Kafka.Bootstrap_Servers)
configuration.SchemaRegistry.Endpoint = viper.GetString(configuration.SchemaRegistry.Endpoint)
configuration.SchemaRegistry.Topics = viper.GetString(configuration.SchemaRegistry.Topics)
configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json) configuration.Kafka.Kafka_Topic_Json = viper.GetString(configuration.Kafka.Kafka_Topic_Json)
configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf) configuration.Kafka.Kafka_Topic_Protobuf = viper.GetString(configuration.Kafka.Kafka_Topic_Protobuf)

View File

@@ -1,119 +0,0 @@
{
"type": "object",
"properties": {
"app": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"version": {
"type": "string"
}
}
},
"device": {
"type": "object",
"properties": {
"device_id": {
"type": "string"
},
"advertising_id": {
"type": "string"
},
"installation_id": {
"type": "string"
},
"category": {
"type": "string"
},
"height": {
"type": "string"
},
"imei": {
"type": "string"
},
"manufacturer": {
"type": "string"
},
"model": {
"type": "string"
},
"screen_resolution": {
"type": "string"
},
"os": {
"type": "string"
},
"os_version": {
"type": "string"
},
"os_api_level": {
"type": "string"
},
"rooted": {
"type": "string"
},
"fingerprint": {
"type": "string"
}
}
},
"network": {
"type": "object",
"properties": {
"carrier": {
"type": "string"
},
"type": {
"type": "string"
},
"ip": {
"type": "string"
}
}
},
"location": {
"type": "object",
"properties": {
"latitude": {
"type": "string"
},
"longitude": {
"type": "string"
}
}
},
"referer": {
"type": "object",
"properties": {
"url": {
"type": "string"
}
}
},
"user": {
"type": "object",
"properties": {
"customer_id": {
"type": "string"
}
}
},
"events": {
"type": "array",
"items": {
"type": "object",
"properties": {
"event_type": {
"type": "string"
},
"timestamp": {
"type": "number"
}
}
}
}
},
"additionalProperties": false
}

View File

@@ -1,41 +0,0 @@
package schema
import (
"com.navi.medici.janus/config"
"com.navi.medici.janus/utils"
"fmt"
"github.com/riferrei/srclient"
"go.uber.org/zap"
"strings"
)
var (
SchemaRegistryEndpoint string
TopicList []string
SchemaVersionMap = make(map[string]int)
logger *zap.Logger
)
func InitializeSchemaHandler(configuration config.Configurations) {
logger = utils.GetLogger()
SchemaRegistryEndpoint = configuration.SchemaRegistry.Endpoint
TopicList = strings.Split(configuration.SchemaRegistry.Topics, ",")
// initialize schema version map which contains latest schema version for topic(s)
GetSchemaVersions()
}
func GetSchemaVersions() {
schemaRegistryClient := srclient.CreateSchemaRegistryClient(SchemaRegistryEndpoint)
schemaRegistryClient.CodecCreationEnabled(false)
for _, topic := range TopicList {
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
if err != nil {
logger.Error("error in fetching latest schema", zap.Error(err))
} else {
schemaId := schema.ID()
SchemaVersionMap[topic] = schemaId
}
}
logger.Debug("Schema Version Map", zap.String("", fmt.Sprintf("%v", SchemaVersionMap)))
}

View File

@@ -24,12 +24,6 @@ const (
ERROR = "error" ERROR = "error"
) )
type NewSchemaRequest struct {
Topic string `json:"topic"`
Schema string `json:"schema"`
SchemaType string `json:"schema_type"`
}
type CustomResponse struct { type CustomResponse struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`