resolved conflicts commit
This commit is contained in:
@@ -3,8 +3,7 @@ package lib
|
||||
import (
|
||||
metrics "com.navi.medici.janus/instrumentation"
|
||||
producer_module "com.navi.medici.janus/producer"
|
||||
"com.navi.medici.janus/schema"
|
||||
"encoding/binary"
|
||||
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
@@ -21,13 +20,6 @@ type RequestObject struct {
|
||||
Header http.Header
|
||||
}
|
||||
|
||||
func ProcessProtobufRequestChannel(topic string) {
|
||||
for {
|
||||
request := <-ProtobufRequestChannel
|
||||
ClickstreamProtobufEventHandler(*request, topic)
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessJsonRequestChannel(topic string) {
|
||||
for {
|
||||
request := <-JsonRequestChannel
|
||||
@@ -35,40 +27,6 @@ func ProcessJsonRequestChannel(topic string) {
|
||||
}
|
||||
}
|
||||
|
||||
func ClickstreamProtobufEventHandler(request RequestObject, topic string) {
|
||||
|
||||
messageBytes := request.Body
|
||||
//getting the client which has sent this event
|
||||
var result map[string]interface{}
|
||||
json.Unmarshal(messageBytes, &result)
|
||||
source := getSource(result)
|
||||
//log.Print(source)
|
||||
// to be of the format [magicByte] + [schemaID] + [messageIndex] + [value]
|
||||
recordValue := []byte{}
|
||||
|
||||
// add [magicByte]
|
||||
recordValue = append(recordValue, byte(0))
|
||||
|
||||
// add schemaID]
|
||||
schemaIDBytes := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.SchemaVersionMap[topic]))
|
||||
recordValue = append(recordValue, schemaIDBytes...)
|
||||
|
||||
// add [messageIndex]
|
||||
messageIndexBytes := []byte{byte(2), byte(0)}
|
||||
recordValue = append(recordValue, messageIndexBytes...)
|
||||
|
||||
// Now write the bytes from the actual message...
|
||||
recordValue = append(recordValue, messageBytes...)
|
||||
|
||||
message := &sarama.ProducerMessage{
|
||||
Topic: topic,
|
||||
Value: sarama.ByteEncoder(recordValue),
|
||||
}
|
||||
metrics.IncrementCounter("request", source)
|
||||
producer_module.WriteMessageToKafkaAsync(message, source)
|
||||
}
|
||||
|
||||
func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
|
||||
messageBytes := request.Body
|
||||
|
||||
18
main.go
18
main.go
@@ -7,10 +7,9 @@ import (
|
||||
"com.navi.medici.janus/schema"
|
||||
"com.navi.medici.janus/server"
|
||||
"com.navi.medici.janus/utils"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -19,7 +18,7 @@ var (
|
||||
logger *zap.Logger
|
||||
)
|
||||
|
||||
const metrics_port = "4000"
|
||||
const METRICS_PORT = "4000"
|
||||
|
||||
func init() {
|
||||
configs := getConfigs()
|
||||
@@ -27,15 +26,10 @@ func init() {
|
||||
logger = utils.GetLogger()
|
||||
port = configs.Server.Port
|
||||
logger.Debug("PORT IS: " + port)
|
||||
logger.Debug(configs.Kafka.Bootstrap_Servers)
|
||||
logger.Debug(configs.SchemaRegistry.Endpoint)
|
||||
logger.Debug("Bootstrap Servers:" + configs.Kafka.Bootstrap_Servers)
|
||||
schema.InitializeSchemaHandler(configs)
|
||||
producer_client.InitializeProducers(configs.Kafka, configs.Env)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
go lib.ProcessProtobufRequestChannel(configs.Kafka.Kafka_Topic_Protobuf)
|
||||
}
|
||||
|
||||
for i := 0; i < configs.Server.Goroutines; i++ {
|
||||
go lib.ProcessJsonRequestChannel(configs.Kafka.Kafka_Topic_Json)
|
||||
}
|
||||
@@ -49,12 +43,12 @@ func getConfigs() config.Configurations {
|
||||
|
||||
var configuration config.Configurations
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
log.Fatalf("Error reading config file, %s\n", err)
|
||||
log.Fatalln("Error reading config file, %s", err)
|
||||
}
|
||||
|
||||
err := viper.Unmarshal(&configuration)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to decode into struct, %v\n", err)
|
||||
log.Fatalln("Unable to decode into struct, %v", err)
|
||||
}
|
||||
|
||||
// Following coinfigurations read from environment variables
|
||||
@@ -76,7 +70,7 @@ func getConfigs() config.Configurations {
|
||||
func main() {
|
||||
logger.Debug("Serving on http://0.0.0.0:" + port)
|
||||
httpServer, err1 := server.NewServer(port, cors)
|
||||
metricsServer, err2 := server.MetricServer(metrics_port)
|
||||
metricsServer, err2 := server.MetricServer(METRICS_PORT)
|
||||
if err1 != nil {
|
||||
logger.Fatal("Unable to start server, %v", zap.String("error", err1.Error()))
|
||||
}
|
||||
|
||||
@@ -2,12 +2,11 @@ package server
|
||||
|
||||
import (
|
||||
"com.navi.medici.janus/lib"
|
||||
"com.navi.medici.janus/schema"
|
||||
"io"
|
||||
|
||||
// "log"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
)
|
||||
@@ -27,43 +26,6 @@ type CustomResponse struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func eventsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var reader io.Reader
|
||||
|
||||
// for name, values := range r.Header {
|
||||
// // Loop over all values for the name.
|
||||
// for _, value := range values {
|
||||
// fmt.Println("HEADER: ", name, value)
|
||||
// }
|
||||
// }
|
||||
|
||||
// check if body is gzip compressed
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
var err error
|
||||
reader, err = gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Error while decompressing GZIP payload", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
reader = r.Body
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(reader)
|
||||
if err != nil {
|
||||
// log.Printf(err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
http.Error(w, "Request body invalid", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
lib.ProtobufRequestChannel <- &lib.RequestObject{Body: body, Header: r.Header}
|
||||
io.WriteString(w, "ok")
|
||||
}
|
||||
|
||||
func eventsHandlerJson(w http.ResponseWriter, r *http.Request) {
|
||||
var reader io.Reader
|
||||
// check if body is gzip compressed
|
||||
@@ -104,37 +66,3 @@ func healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func healthToggleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
healthyBool = !healthyBool
|
||||
io.WriteString(w, "toggled")
|
||||
}
|
||||
|
||||
func getSchemaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
schemaMapJson, _ := json.Marshal(schema.SchemaVersionMap)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(schemaMapJson)
|
||||
}
|
||||
|
||||
func refreshSchemaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
schema.GetSchemaVersions()
|
||||
io.WriteString(w, "Updated Schema Map")
|
||||
}
|
||||
|
||||
func addSchemaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
|
||||
var bodyJson NewSchemaRequest
|
||||
error := json.Unmarshal(body, &bodyJson)
|
||||
fmt.Println(error)
|
||||
|
||||
errorSchema := schema.AddSchema(bodyJson.Topic, bodyJson.SchemaType, bodyJson.Schema)
|
||||
fmt.Println("errorSchema: ")
|
||||
fmt.Println(error)
|
||||
if errorSchema != nil {
|
||||
http.Error(w, fmt.Sprintf("Error creating the schema %s", errorSchema), http.StatusBadRequest)
|
||||
} else {
|
||||
io.WriteString(w, "added")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -33,16 +33,8 @@ func NewServer(port string, corsList string) (*Server, error) {
|
||||
}
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.HandleFunc("/events", eventsHandler).Methods("POST")
|
||||
router.HandleFunc("/events/json", eventsHandlerJson).Methods("POST")
|
||||
router.HandleFunc("/health", healthHandler).Methods("GET")
|
||||
router.HandleFunc("/health/toggle", healthToggleHandler).Methods("GET")
|
||||
router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET")
|
||||
router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST")
|
||||
router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST")
|
||||
//router.Path("/metrics").Handler(promhttp.Handler())
|
||||
// router.HandleFunc("/test", testHandler).Methods("GET")
|
||||
// router.HandleFunc("/stop", stopHandler).Methods("POST")
|
||||
|
||||
httpServer := &http.Server{Addr: ":" + port, Handler: enableCors(router, corsList)}
|
||||
newServer := &Server{HttpServer: httpServer, Listener: listener}
|
||||
|
||||
Reference in New Issue
Block a user