TP-55555/integrate document service client (#5)

* TP-55555 | document client and kafka integration

* TP-55555 | introduce service concept refactor code
This commit is contained in:
Varnit Goyal
2024-07-27 17:00:47 +05:30
committed by GitHub
parent f75297880d
commit f20af81520
21 changed files with 764 additions and 103 deletions

28
pkg/encoder/encoder.go Normal file
View File

@@ -0,0 +1,28 @@
package encoder
import (
"encoding/json"
"fmt"
"google.golang.org/protobuf/proto"
)
// Encoder abstracts serialization of an arbitrary value into bytes,
// letting callers pick the wire format (JSON or protobuf).
type Encoder interface {
	Encode(v interface{}) ([]byte, error)
}

// JsonEncoder encodes values via encoding/json.
type JsonEncoder struct{}

// ProtoEncoder encodes values via google.golang.org/protobuf.
type ProtoEncoder struct{}

// Shared singleton instances; both encoders are stateless, so a single
// instance is safe to reuse everywhere.
var JsonEncoderInstance = &JsonEncoder{}
var ProtoEncoderInstance = &ProtoEncoder{}
// Encode marshals v to JSON and returns the resulting bytes, or the
// marshalling error from json.Marshal.
func (j *JsonEncoder) Encode(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}
// Encode serializes v with proto.Marshal. It fails when v does not
// implement proto.Message.
func (p *ProtoEncoder) Encode(v interface{}) ([]byte, error) {
	message, ok := v.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("not a proto message")
	}
	return proto.Marshal(message)
}

View File

@@ -0,0 +1,139 @@
package httpclient
import (
"bytes"
"context"
"cybertron/configs"
"cybertron/pkg/log"
"cybertron/pkg/metrics"
"cybertron/pkg/utils"
"errors"
"fmt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"net/http"
"net/url"
"path"
"time"
)
// HttpClient is the minimal surface of *http.Client this package relies on;
// declared as an interface so callers can substitute fakes in tests.
type HttpClient interface {
	Do(req *http.Request) (*http.Response, error)
}

// CircuitBreaker wraps the execution of a request with circuit-breaker
// semantics (tripping open after repeated failures).
type CircuitBreaker interface {
	ExecuteRequest(req func() (interface{}, error)) (interface{}, error)
}

// logger is the package-wide structured logger.
var logger = log.Log

// todo - custom configs for all clients
// NewHttpClient builds an *http.Client from the supplied configuration,
// bounding the idle-connection pool, per-host connections, and the overall
// request timeout.
func NewHttpClient(httpConfig configs.HttpConfig) HttpClient {
	return &http.Client{
		Transport: &http.Transport{
			MaxIdleConns:    httpConfig.MaxIdleConnectionPool,
			MaxConnsPerHost: httpConfig.MaxConnection,
		},
		// Idiomatic duration construction: multiply the second count by
		// time.Second rather than hand-converting nanoseconds. Equivalent
		// value, clearer intent.
		Timeout: time.Duration(httpConfig.MaxTimeoutInSeconds) * time.Second,
	}
}
// GetHttpRequest is a generic function to create a http request with the given method, url and body.
// Accepts the body as a proto message. Adds the necessary headers.
// Returns an error when the body cannot be marshalled or the request cannot
// be constructed.
func GetHttpRequest(requestMetadata *context.Context, method string, url string, body proto.Message) (*http.Request, error) {
	// The marshal error was previously discarded with `_`, which could
	// silently send an empty body downstream; surface it instead.
	requestBody, err := proto.Marshal(body)
	if err != nil {
		return nil, fmt.Errorf("marshalling request body: %w", err)
	}
	req, err := http.NewRequest(method, url, bytes.NewBuffer(requestBody))
	if err != nil {
		return nil, err
	}
	if err = addHeadersToRequest(req, requestMetadata); err != nil {
		return nil, err
	}
	return req, nil
}
// addHeadersToRequest adds the necessary headers to the request.
func addHeadersToRequest(req *http.Request, requestMetadata *context.Context) error {
//correlationId := metadata.GetRequestMetadata(requestMetadata, utils.CORRELATION_ID_HEADER)
//if correlationId == nil {
// return fmt.Errorf("correlation id not found in request metadata")
//}
//req.Header.Add(utils.CORRELATION_ID_HEADER, *correlationId)
//
//saCustomerId := metadata.GetRequestMetadata(requestMetadata, utils.CUSTOMER_ID_HEADER)
//if saCustomerId != nil {
// req.Header.Add(utils.CUSTOMER_ID_HEADER, *saCustomerId)
//}
return nil
}
// GenerateUrl builds a url for the given endpoint and query params. To append path params to the url,
// pass each part of the path as a parameter. For example, if the endpoint is /billers/{billerId}/bills,
// then pass "billers", "{billerId}" and "bills" as parameters, in the same order.
func GenerateUrl(baseUrl, apiVersion string, queryParams map[string]string, endpoint ...string) (*url.URL, error) {
	// Named `u` so the local does not shadow the net/url package, which the
	// original `url := url.Parse(...)` did.
	u, err := url.Parse(baseUrl)
	if err != nil {
		logger.Error(err.Error())
		return nil, err
	}
	u.Path = path.Join(u.Path, apiVersion, path.Join(endpoint...))
	if len(queryParams) > 0 {
		query := u.Query()
		for key, value := range queryParams {
			query.Add(key, value)
		}
		u.RawQuery = query.Encode()
	}
	return u, nil
}
// HttpCallToClientWithCB is a generic function to make an api call to client service with circuit breaker pattern.
// Returns error only when http response is nil. This is because, in case of internal failures, downstream services are expected
// to share the error details in the response body.
func HttpCallToClientWithCB(requestMetadata *context.Context, req *http.Request, httpClient *HttpClient, circuitBreaker CircuitBreaker) (*http.Response, error) {
	resp, err := circuitBreaker.ExecuteRequest(func() (interface{}, error) {
		return HttpCallToClient(req, httpClient)
	})
	if resp == nil {
		return nil, err
	}
	// The circuit breaker hands back interface{}; recover the concrete
	// response. The extra nil check guards against a typed-nil
	// *http.Response boxed inside a non-nil interface.
	if httpResponse, ok := resp.(*http.Response); ok && httpResponse != nil {
		if utils.IsErrorStatusCode(httpResponse.StatusCode) {
			logger.ErrorWithCtx(requestMetadata, "api call failed", zap.String("url", req.URL.String()), zap.Int("status_code", httpResponse.StatusCode))
			if httpResponse.Body == nil {
				return nil, errors.New("api call failed: response body is nil")
			}
		} else {
			logger.InfoWithCtx(requestMetadata, "api call successful", zap.String("url", req.URL.String()), zap.Int("status_code", httpResponse.StatusCode))
		}
		return httpResponse, nil
	}
	// Use the package logger for consistency with the rest of this file
	// (previously this one line went through log.Log directly).
	logger.ErrorWithCtx(requestMetadata, "unexpected response", zap.String("response", utils.ConvertToString(resp)))
	return nil, errors.New("unexpected response type")
}
// HttpCallToClient is a generic function to make an api call to client service. It expects
// all the headers to be set in the request. Also records the metrics for the api call.
// In error scenario, downstream service is expected to return 4xx or 5xx status code with error details in the response body.
// Errors returned by this method will increase the failure counter in circuit breaker. Error is only returned if status code is 5XX.
func HttpCallToClient(req *http.Request, httpClient *HttpClient) (*http.Response, error) {
	httpMethodCall := func(req *http.Request) (*http.Response, error) {
		resp, err := (*httpClient).Do(req)
		if err != nil {
			return nil, err
		}
		// Only returning error in cases we want CB to consider failure.
		if resp.StatusCode >= 500 {
			// fmt.Errorf replaces the errors.New + Sprintf concatenation;
			// the message text is unchanged.
			return resp, fmt.Errorf("api call failed with status code: %d", resp.StatusCode)
		}
		return resp, nil
	}
	return metrics.RecordClientHttpCallMetrics(req, httpMethodCall)
}

77
pkg/kafka/config.go Normal file
View File

@@ -0,0 +1,77 @@
package kafka
import (
"crypto/tls"
"cybertron/configs"
"github.com/IBM/sarama"
"time"
)
// SaramaSyncProducer creates a Kafka producer for the given environment and
// broker configuration.
// NOTE(review): despite the "Sync" name, this constructs an *async* producer
// (sarama.NewAsyncProducer) and returns sarama.AsyncProducer — confirm
// whether the name or the implementation is the intended one. The name is
// kept here because callers already depend on it.
func SaramaSyncProducer(env string, baseConfig *configs.KafkaConfig) (sarama.AsyncProducer, error) {
	return sarama.NewAsyncProducer(baseConfig.Brokers, kafkaProducerConfig(env, baseConfig))
}
// SaramaKafkaConsumer creates a consumer group for groupID using the
// environment-specific consumer configuration.
func SaramaKafkaConsumer(env string, baseConfig *configs.KafkaConfig, groupID string) (sarama.ConsumerGroup, error) {
	cfg := kafkaConsumerConfig(env, baseConfig, groupID)
	group, err := sarama.NewConsumerGroup(baseConfig.Brokers, groupID, cfg)
	if err != nil {
		return nil, err
	}
	return group, nil
}
// kafkaProducerConfig layers producer-specific tuning on top of the shared
// environment configuration: bounded retries, leader-only acks, snappy
// compression, and batched flushing (by bytes, message count, or every
// 100ms, whichever triggers first).
func kafkaProducerConfig(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
	kafkaConfig := kafkaConfiguration(env, baseConfig)
	kafkaConfig.Producer.Retry.Max = 3
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal
	kafkaConfig.Producer.Compression = sarama.CompressionSnappy
	// With Successes enabled on an async producer, the caller must drain the
	// Successes channel or the producer will eventually block.
	kafkaConfig.Producer.Return.Successes = true
	kafkaConfig.Producer.Flush.Bytes = 31000
	kafkaConfig.Producer.Flush.Frequency = 100 * time.Millisecond
	kafkaConfig.Producer.Flush.Messages = 100
	kafkaConfig.ChannelBufferSize = 512
	kafkaConfig.Producer.MaxMessageBytes = 2000000
	kafkaConfig.Metadata.RefreshFrequency = 1 * time.Minute
	return kafkaConfig
}
// kafkaConsumerConfig layers consumer-specific settings on top of the shared
// environment configuration: protocol version, start-from-newest offsets,
// round-robin partition assignment, and error reporting on the Errors
// channel (which the consumer loop must drain).
func kafkaConsumerConfig(env string, baseConfig *configs.KafkaConfig, groupId string) *sarama.Config {
	kConfig := kafkaConfiguration(env, baseConfig)
	kConfig.Version = sarama.V3_6_0_0
	kConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	// NOTE(review): 50ms MaxProcessingTime is aggressive — confirm handlers
	// always finish within it, otherwise sarama treats the consumer as stuck.
	kConfig.Consumer.MaxProcessingTime = 50 * time.Millisecond
	kConfig.Consumer.Return.Errors = true
	kConfig.ClientID = groupId
	kConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	return kConfig
}
// kafkaConfiguration builds the sarama config shared by producers and
// consumers, selecting auth per environment:
//   - local: sarama defaults, no SASL/TLS settings applied
//   - prod:  SASL PLAIN
//   - other: SASL SCRAM-SHA-512
//
// NOTE(review): prod using PLAIN while non-prod environments use SCRAM looks
// inverted — confirm this mapping is intentional.
func kafkaConfiguration(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
	kafkaConfig := sarama.NewConfig()
	if env == "local" {
		return kafkaConfig
	} else if env == "prod" {
		kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	} else {
		kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
		kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
		}
	}
	kafkaConfig.Net.SASL.User = baseConfig.Username
	kafkaConfig.Net.SASL.Password = baseConfig.Password
	kafkaConfig.Net.SASL.Enable = baseConfig.SaslEnabled
	kafkaConfig.Net.TLS.Enable = baseConfig.TlsEnabled
	// TLS config is assigned unconditionally; it only takes effect when
	// TLS.Enable is true.
	kafkaConfig.Net.TLS.Config = &tls.Config{
		InsecureSkipVerify: baseConfig.TlsInsureSkipVerification,
	}
	return kafkaConfig
}

View File

@@ -0,0 +1,99 @@
package producer
import (
"cybertron/configs"
"cybertron/pkg/encoder"
"cybertron/pkg/kafka"
"cybertron/pkg/log"
"github.com/IBM/sarama"
"go.uber.org/zap"
"os"
)
// KProducer is the Kafka producer abstraction used by the service: publish
// an encoded event and drain the async producer's result channels.
type KProducer interface {
	// Errors drains and logs delivery failures; run in its own goroutine.
	Errors()
	// Successes drains successful-delivery notifications; run in a goroutine.
	Successes()
	// PublishEvent encodes msgPayload with marshaller and queues it on topic.
	PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error
	// sendMessage queues already-encoded bytes onto the producer.
	sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error
}

// KProducerImpl implements KProducer on top of a sarama async producer.
type KProducerImpl struct {
	AsyncProducer sarama.AsyncProducer
}
// NewKProducer constructs a KProducerImpl backed by an async sarama producer
// and starts background goroutines that drain the producer's Errors and
// Successes channels (both must be consumed when Return.Errors/Successes are
// enabled, otherwise the producer blocks). Exits the process when the
// producer cannot be created.
func NewKProducer(env string, baseConfig *configs.KafkaConfig) *KProducerImpl {
	producer, err := kafka.SaramaSyncProducer(env, baseConfig)
	if err != nil {
		log.Log.Error("sarama kafka producer failed", zap.Error(err))
		os.Exit(1)
	}
	kProducer := &KProducerImpl{
		AsyncProducer: producer,
	}
	// Launch the drain loops directly; the anonymous-function wrappers in
	// the original added nothing.
	go kProducer.Errors()
	go kProducer.Successes()
	return kProducer
}
// Errors keeps track of failed messages: it drains the async producer's
// Errors channel and logs each failed delivery with its topic, key, and
// value. Must run in its own goroutine for the lifetime of the producer.
func (kp *KProducerImpl) Errors() {
	for err := range kp.AsyncProducer.Errors() {
		keyBytes, keyErr := err.Msg.Key.Encode()
		if keyErr != nil {
			log.Log.Error("key encoding failed for failed message", zap.String("topic", err.Msg.Topic))
		}
		// Bug fix: the "value" field previously logged keyBytes; encode the
		// message value itself.
		valueBytes, valueErr := err.Msg.Value.Encode()
		if valueErr != nil {
			log.Log.Error("value encoding failed for failed message", zap.String("topic", err.Msg.Topic))
		}
		// todo - add metrics
		log.Log.Error("failed to emit event to kafka", zap.String("topic", err.Msg.Topic),
			zap.String("key", string(keyBytes)), zap.String("value", string(valueBytes)), zap.Error(err))
	}
}
// Successes drains the async producer's Successes channel, confirming that
// messages were delivered to Kafka. Must run in its own goroutine for the
// lifetime of the producer.
func (kp *KProducerImpl) Successes() {
	for msg := range kp.AsyncProducer.Successes() {
		if _, encodeErr := msg.Key.Encode(); encodeErr != nil {
			log.Log.Error("key encoding failed for failed message", zap.String("topic", msg.Topic))
		}
		// todo - add metrics
	}
}
// sendMessage wraps the payload and headers into a sarama.ProducerMessage
// and queues it on the async producer's input channel. Delivery outcome is
// reported asynchronously through the Errors/Successes channels, so the nil
// return only means "queued".
func (kp *KProducerImpl) sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error {
	// Pre-size: the header count is known up front.
	saramaHeaders := make([]sarama.RecordHeader, 0, len(headers))
	for k, v := range headers {
		saramaHeaders = append(saramaHeaders, sarama.RecordHeader{
			Key:   []byte(k),
			Value: []byte(v),
		})
	}
	message := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		// ByteEncoder avoids the []byte -> string conversion copy that
		// StringEncoder forced on an already-binary payload.
		Value:   sarama.ByteEncoder(msgPayload),
		Headers: saramaHeaders,
	}
	kp.AsyncProducer.Input() <- message
	// todo - add metrics
	return nil
}
// PublishEvent encodes msgPayload with the supplied marshaller and hands the
// resulting bytes to sendMessage for asynchronous delivery to topic.
func (kp *KProducerImpl) PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error {
	encoded, err := marshaller.Encode(msgPayload)
	if err != nil {
		log.Log.Error("error while marshalling msg payload", zap.Error(err))
		return err
	}
	return kp.sendMessage(encoded, topic, key, headers)
}

35
pkg/kafka/scram_client.go Normal file
View File

@@ -0,0 +1,35 @@
package kafka
import (
"crypto/sha512"
"github.com/xdg-go/scram"
)
var (
	// SHA512 is the hash generator used for SCRAM-SHA-512 authentication.
	SHA512 scram.HashGeneratorFcn = sha512.New
)

// XDGSCRAMClient adapts github.com/xdg-go/scram to sarama's SCRAMClient
// interface (Begin/Step/Done), embedding the client, its active
// conversation, and the hash generator.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
// Begin initialises the SCRAM client for the given credentials and opens a
// new authentication conversation.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) error {
	client, err := x.HashGeneratorFcn.NewClient(userName, password, authzID)
	if err != nil {
		return err
	}
	x.Client = client
	x.ClientConversation = client.NewConversation()
	return nil
}
// Step feeds the server challenge into the conversation and returns the
// client's next SCRAM message.
func (x *XDGSCRAMClient) Step(challenge string) (string, error) {
	return x.ClientConversation.Step(challenge)
}
// Done reports whether the SCRAM conversation has completed.
func (x *XDGSCRAMClient) Done() bool {
	return x.ClientConversation.Done()
}

View File

@@ -1,11 +1,15 @@
package log
import (
"github.com/gin-gonic/gin"
"context"
"go.elastic.co/ecszap"
"go.uber.org/zap"
)
const (
	// CORRELATION_ID_HEADER is the context key / HTTP header name carrying
	// the request correlation id.
	CORRELATION_ID_HEADER = "X-Correlation-Id"
)

// Logger wraps a zap.Logger to expose this package's leveled logging API.
type Logger struct {
	log *zap.Logger
}
@@ -15,7 +19,8 @@ var Log *Logger
func initiateLogger() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig = ecszap.ECSCompatibleEncoderConfig(config.EncoderConfig)
log, err := config.Build(ecszap.WrapCoreOption(), zap.AddCaller())
log, err := config.Build(ecszap.WrapCoreOption(), zap.AddCallerSkip(1))
log = log.With(zap.String("service", "t-store"))
if err != nil {
panic(err)
@@ -23,48 +28,6 @@ func initiateLogger() *zap.Logger {
return log
}
// Error logs message (with the service prefix) at error level.
func Error(message string, fields ...zap.Field) {
	Log.log.Error(appendBaseMessage(message), fields...)
}

// Warn logs message (with the service prefix) at warn level.
func Warn(message string, fields ...zap.Field) {
	Log.log.Warn(appendBaseMessage(message), fields...)
}

// Info logs message (with the service prefix) at info level.
func Info(message string, fields ...zap.Field) {
	Log.log.Info(appendBaseMessage(message), fields...)
}

// Fatal logs message (with the service prefix) at fatal level, then exits
// the process.
func Fatal(message string, fields ...zap.Field) {
	Log.log.Fatal(appendBaseMessage(message), fields...)
}

// Panic logs message (with the service prefix) at panic level, then panics.
func Panic(message string, fields ...zap.Field) {
	Log.log.Panic(appendBaseMessage(message), fields...)
}
// ErrorWithContext logs at error level, annotated with the gin request's
// correlation id.
func ErrorWithContext(c *gin.Context, message string, fields ...zap.Field) {
	requestLogEntryWithCorrelationId(c).Error(appendBaseMessage(message), fields...)
}

// WarnWithContext logs at warn level, annotated with the gin request's
// correlation id.
func WarnWithContext(c *gin.Context, message string, fields ...zap.Field) {
	requestLogEntryWithCorrelationId(c).Warn(appendBaseMessage(message), fields...)
}

// InfoWithContext logs at info level, annotated with the gin request's
// correlation id.
func InfoWithContext(c *gin.Context, message string, fields ...zap.Field) {
	requestLogEntryWithCorrelationId(c).Info(appendBaseMessage(message), fields...)
}
// requestLogEntryWithCorrelationId returns a child logger annotated with the
// request's correlation id taken from the gin context. When the context
// carries no string correlation id, the field is logged empty.
func requestLogEntryWithCorrelationId(c *gin.Context) *zap.Logger {
	// Comma-ok assertion: the previous unchecked .(string) panicked whenever
	// the context had no "X-Correlation-Id" value set.
	correlationId, _ := c.Value("X-Correlation-Id").(string)
	return Log.log.With(
		zap.String("CorrelationId", correlationId),
	)
}
// appendBaseMessage prefixes message with the service name.
// NOTE(review): there is no separator between "cybertron" and the message
// (producing e.g. "cybertronfoo") — confirm whether a delimiter was
// intended before changing the emitted log text.
func appendBaseMessage(message string) string {
	return "cybertron" + message
}
// GetLog exposes the underlying *zap.Logger.
func (l *Logger) GetLog() *zap.Logger {
	// Return the receiver's logger rather than the package global Log.log,
	// so the method behaves correctly on any Logger instance (previously it
	// ignored its receiver).
	return l.log
}
@@ -74,3 +37,81 @@ func init() {
log: initiateLogger(),
}
}
// Info logs msg at info level.
func (l *Logger) Info(msg string, fields ...zap.Field) {
	l.log.Info(msg, fields...)
}

// Error logs msg at error level.
func (l *Logger) Error(msg string, fields ...zap.Field) {
	l.log.Error(msg, fields...)
}

// Fatal logs msg at fatal level, then exits the process.
func (l *Logger) Fatal(msg string, fields ...zap.Field) {
	l.log.Fatal(msg, fields...)
}

// Warn logs msg at warn level.
func (l *Logger) Warn(msg string, fields ...zap.Field) {
	l.log.Warn(msg, fields...)
}

// Panic logs msg at panic level, then panics.
func (l *Logger) Panic(msg string, fields ...zap.Field) {
	l.log.Panic(msg, fields...)
}
func (l *Logger) InfoWithCtx(ctx *context.Context, msg string, fields ...zap.Field) {
correlationId := getCorrelationId(ctx)
if correlationId != "" {
fields = append(fields, zap.String("correlation_id", correlationId))
}
l.log.Info(msg, fields...)
}
func (l *Logger) ErrorWithCtx(ctx *context.Context, msg string, fields ...zap.Field) {
correlationId := getCorrelationId(ctx)
if correlationId != "" {
fields = append(fields, zap.String("correlation_id", correlationId))
}
l.log.Error(msg, fields...)
}
func (l *Logger) FatalWithCtx(ctx *context.Context, msg string, fields ...zap.Field) {
correlationId := getCorrelationId(ctx)
if correlationId != "" {
fields = append(fields, zap.String("correlation_id", correlationId))
}
l.log.Fatal(msg, fields...)
}
func (l *Logger) WarnWithCtx(ctx *context.Context, msg string, fields ...zap.Field) {
correlationId := getCorrelationId(ctx)
if correlationId != "" {
fields = append(fields, zap.String("correlation_id", correlationId))
}
l.log.Warn(msg, fields...)
}
func (l *Logger) PanicWithCtx(ctx *context.Context, msg string, fields ...zap.Field) {
correlationId := getCorrelationId(ctx)
if correlationId != "" {
fields = append(fields, zap.String("correlation_id", correlationId))
}
l.log.Panic(msg, fields...)
}
// getCorrelationId extracts the correlation id string from ctx, returning
// the empty string when ctx is nil or carries no string value under
// CORRELATION_ID_HEADER.
func getCorrelationId(ctx *context.Context) string {
	if ctx == nil {
		return ""
	}
	if id, ok := (*ctx).Value(CORRELATION_ID_HEADER).(string); ok {
		return id
	}
	return ""
}

View File

@@ -0,0 +1,6 @@
package utils
// IsErrorStatusCode reports whether statusCode lies in the HTTP error range
// (4xx client errors and 5xx server errors).
func IsErrorStatusCode(statusCode int) bool {
	if statusCode < 400 {
		return false
	}
	return true
}

21
pkg/utils/string_utils.go Normal file
View File

@@ -0,0 +1,21 @@
package utils
import (
"fmt"
"strconv"
)
// ConvertToString renders value as a string using fmt's "%+v" verb, which
// includes struct field names for struct values.
func ConvertToString(value any) string {
	formatted := fmt.Sprintf("%+v", value)
	return formatted
}
// ConvertStringToFloat64 parses value as a 64-bit floating point number,
// returning strconv.ParseFloat's result unchanged.
func ConvertStringToFloat64(value string) (float64, error) {
	parsed, parseErr := strconv.ParseFloat(value, 64)
	return parsed, parseErr
}
// GetLastNChars returns the last n bytes of str. When n exceeds the string
// length the whole string is returned; when n is zero or negative the
// result is empty.
// NOTE: this slices bytes, not runes, so a multi-byte UTF-8 character may
// be split — acceptable for ASCII identifiers, confirm for other callers.
func GetLastNChars(str string, n int) string {
	// Guard: a negative n previously produced str[len(str)-n:] with an index
	// beyond len(str), panicking with an out-of-range slice error.
	if n <= 0 {
		return ""
	}
	if n > len(str) {
		return str
	}
	return str[len(str)-n:]
}