DE-552 changed metrics port and removed unecessary magic bytes in payload
This commit is contained in:
@@ -85,12 +85,12 @@ func ClickstreamJsonEventHandler(request RequestObject, topic string) {
|
||||
recordValue := []byte{}
|
||||
|
||||
// add [magicByte]
|
||||
recordValue = append(recordValue, byte(0))
|
||||
// recordValue = append(recordValue, byte(0))
|
||||
|
||||
// add schemaID]
|
||||
schemaIDBytes := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
|
||||
recordValue = append(recordValue, schemaIDBytes...)
|
||||
// // add schemaID]
|
||||
// schemaIDBytes := make([]byte, 4)
|
||||
// binary.BigEndian.PutUint32(schemaIDBytes, uint32(producer_module.SchemaVersionMap[topic]))
|
||||
// recordValue = append(recordValue, schemaIDBytes...)
|
||||
|
||||
// Now write the bytes from the actual message...
|
||||
recordValue = append(recordValue, messageBytes...)
|
||||
|
||||
14
main.go
14
main.go
@@ -75,10 +75,16 @@ func init() {
|
||||
func main() {
|
||||
|
||||
log.Printf("Serving on http://0.0.0.0:", port)
|
||||
httpServer, err := server.NewServer(port)
|
||||
if err != nil {
|
||||
log.Fatalln("Unable to start server, ", err)
|
||||
httpServer, err1 := server.NewServer(port)
|
||||
metricsServer, err2 := server.MetricServer("4000")
|
||||
if err1 != nil {
|
||||
log.Fatalln("Unable to start server, ", err1)
|
||||
}
|
||||
httpServer.HttpServer.Serve(httpServer.Listener)
|
||||
if err2 != nil {
|
||||
log.Fatalln("Unable to start Metric server, ", err2)
|
||||
}
|
||||
go httpServer.HttpServer.Serve(httpServer.Listener)
|
||||
go metricsServer.HttpServer.Serve(metricsServer.Listener)
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ func NewServer(port string) (*Server, error) {
|
||||
router.HandleFunc("/schema/get", getSchemaHandler).Methods("GET")
|
||||
router.HandleFunc("/schema/refresh", refreshSchemaHandler).Methods("POST")
|
||||
router.HandleFunc("/schema/add", addSchemaHandler).Methods("POST")
|
||||
router.Path("/metrics").Handler(promhttp.Handler())
|
||||
//router.Path("/metrics").Handler(promhttp.Handler())
|
||||
// router.HandleFunc("/test", testHandler).Methods("GET")
|
||||
// router.HandleFunc("/stop", stopHandler).Methods("POST")
|
||||
|
||||
@@ -47,3 +47,20 @@ func NewServer(port string) (*Server, error) {
|
||||
return newServer, nil
|
||||
|
||||
}
|
||||
|
||||
func MetricServer(port string) (*Server, error) {
|
||||
network := networkTCP
|
||||
|
||||
listener, err := net.Listen(network, ":"+port)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create listener")
|
||||
}
|
||||
|
||||
router := mux.NewRouter()
|
||||
|
||||
router.Path("/metrics").Handler(promhttp.Handler())
|
||||
|
||||
httpServer := &http.Server{Addr: ":" + port, Handler: router}
|
||||
mServer := &Server{HttpServer: httpServer, Listener: listener}
|
||||
return mServer, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user