TP-55555 | release repository

This commit is contained in:
varnit-goyal_navi
2024-07-29 15:59:13 +05:30
11 changed files with 181 additions and 179 deletions

View File

@@ -42,21 +42,21 @@ http:
# Kafka config
kafka:
password: KAFKA_PASSWORD
username: KAFKA_USERNAME
brokers: KAFKA_BROKERS
password: KAFKA_PASSWORD  # SECURITY(review): a literal password was committed here — rotate it and keep the env-var placeholder in the repo
username: KAFKA_USERNAME  # SECURITY(review): the OCI stream-pool username (personal email + OCID) was committed — rotate and restore the env-var placeholder
brokers: cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
group:
names: KAFKA_GROUP_NAMES
topics: KAFKA_TOPICS
names: {"kafka-stream": "kafka-stream"}
topics: {"kafka-stream": "kafka-stream"}
tls:
insecureSkipVerify: KAFKA_TLS_INSECURE_SKIP_VERIFY
enabled: KAFKA_TLS_ENABLED
insecureSkipVerify: true # WARNING: disables TLS certificate verification — acceptable only for local/dev, never for production
enabled: false
sasl:
enabled: KAFKA_SASL_ENABLED
enabled: true
consumer:
batch.size: KAFKA_CONSUMER_BATCH_SIZE
goroutines.max.pool.size: KAFKA_CONSUMER_GOROUTINES_MAX_POOL_SIZE
max.timeout.ms: KAFKA_CONSUMER_MAX_TIMEOUT_MS
batch.size: 1
goroutines.max.pool.size: 20
max.timeout.ms: 300
auto.offset.reset: latest
@@ -65,3 +65,5 @@ DocumentService:
baseurl: DOCUMENT_SERVICE_BASEURL
mock:
generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN

15
configs/aws_config.go Normal file
View File

@@ -0,0 +1,15 @@
package configs
import "github.com/spf13/viper"
type AwsConfig struct {
Region string
Bucket string
}
func NewAWSConfig() *AwsConfig {
return &AwsConfig{
Region: viper.GetString("aws.Region"),
Bucket: viper.GetString("aws.Bucket"),
}
}

View File

@@ -18,7 +18,8 @@ type AppConfig struct {
timezone string
httpConfig *HttpConfig
clientConfigs *ClientConfigs
kafkaConfig *KafkaConfig
awsConfig *AwsConfig
KafkaConfig *KafkaConfig
}
type MigConfig struct {
@@ -42,7 +43,8 @@ func LoadConfig() {
timezone: getString("timezone", true),
clientConfigs: loadClientConfigs(),
httpConfig: NewHttpConfig(),
kafkaConfig: NewKafkaConfig(),
awsConfig: NewAWSConfig(),
KafkaConfig: NewKafkaConfig(),
}
}
@@ -79,6 +81,10 @@ func GetTimezone() *string {
return &appConfig.timezone
}
func GetAWSConfig() *AwsConfig {
return appConfig.awsConfig
}
func readConfig() {
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
@@ -98,5 +104,5 @@ func GetHttpConfig() *HttpConfig {
}
func GetKafkaConfig() *KafkaConfig {
return appConfig.kafkaConfig
return appConfig.KafkaConfig
}

View File

@@ -22,9 +22,9 @@ func RunDatabaseMigrations() error {
}
err = appMigrate.Up()
if err != nil && err != migrate.ErrNoChange {
log.Error("migrations error", zap.Error(err))
log.Log.Error("migrations error", zap.Error(err))
return err
}
log.Info("migrations successful")
log.Log.Info("migrations successful")
return nil
}

39
go.mod
View File

@@ -3,13 +3,15 @@ module cybertron
go 1.21.1
require (
github.com/IBM/sarama v1.43.2
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.1
github.com/spf13/cobra v1.7.0
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.17.0
github.com/xdg-go/scram v1.1.1
go.elastic.co/ecszap v1.0.2
@@ -20,15 +22,27 @@ require (
)
require (
github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -36,21 +50,14 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.15.5 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.4 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
@@ -64,12 +71,10 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
@@ -86,7 +91,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.5.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect

View File

@@ -0,0 +1,26 @@
package aws

import (
	"context"
	"log"

	"cybertron/configs"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// Actions bundles the two S3 clients the service uses: a plain client for
// object operations and a presign client for generating pre-signed URLs.
type Actions struct {
	S3Client        *s3.Client
	S3PresignClient *s3.PresignClient
}

// NewS3client loads the default AWS credential/config chain for the region
// given in clientConfig and returns an Actions wrapping an S3 client plus a
// presign client built on top of the same underlying client.
//
// NOTE(review): a failure to load the AWS config calls log.Fatal, which
// terminates the whole process from inside a constructor; returning an
// error would be preferable, but the current single-return signature is
// what callers (dependency init) expect.
func NewS3client(clientConfig configs.AwsConfig) *Actions {
	awsCfg, err := config.LoadDefaultConfig(
		context.TODO(),
		config.WithRegion(clientConfig.Region),
	)
	if err != nil {
		log.Fatal(err)
	}

	client := s3.NewFromConfig(awsCfg)
	return &Actions{
		S3Client:        client,
		S3PresignClient: s3.NewPresignClient(client),
	}
}

View File

@@ -2,6 +2,7 @@ package dependencies
import (
"cybertron/configs"
"cybertron/internal/client/aws"
"cybertron/internal/client/document"
"cybertron/internal/database"
"cybertron/internal/transport/handler"
@@ -27,6 +28,7 @@ type Service struct {
ProjectService *service.ProjectCreator
ReleaseService *service.ReleaseService
ExceptionService *service.ExceptionService
S3Client *aws.Actions
// Add your service here
}
@@ -46,15 +48,16 @@ func InitDependencies() *Dependencies {
repositories := initRepositories(dbClient)
logger := log.Log
httpClient := httpclient.NewHttpClient(*configs.GetHttpConfig())
s3Client := aws.NewS3client(*configs.GetAWSConfig())
kafkaProducer := initKafkaProducer()
documentServiceClient := document.NewDocumentServiceHttpClient(httpClient, logger, configs.GetDocumentServiceHttpClientConfigs())
projectServiceClient := service.NewProjectCreator(logger, dbClient)
projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer)
releaseServiceClient := service.NewReleaseService(logger, dbClient)
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer)
services := initServices(documentServiceClient, projectServiceClient, releaseServiceClient, exceptionServiceClient)
handlers := initHandlers(projectServiceClient, releaseServiceClient, exceptionServiceClient)
return &Dependencies{
Service: services,
DBClient: dbClient,

View File

@@ -10,7 +10,7 @@ type ProjectHandler struct {
}
func (h *ProjectHandler) ProjectCreate(c *gin.Context) {
h.projectCreatorService.ProjectCreate(c)
h.projectCreatorService.CreateProject(c)
}
func (h *ProjectHandler) ProjectGet(c *gin.Context) {

View File

@@ -1,77 +1,16 @@
package kafka
import (
"crypto/tls"
"cybertron/configs"
"github.com/IBM/sarama"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func SaramaSyncProducer(env string, baseConfig *configs.KafkaConfig) (sarama.AsyncProducer, error) {
return sarama.NewAsyncProducer(baseConfig.Brokers, kafkaProducerConfig(env, baseConfig))
}
func SaramaKafkaConsumer(env string, baseConfig *configs.KafkaConfig, groupID string) (sarama.ConsumerGroup, error) {
consumerGroup, err := sarama.NewConsumerGroup(baseConfig.Brokers, groupID, kafkaConsumerConfig(env, baseConfig, groupID))
if err != nil {
return nil, err
}
return consumerGroup, nil
}
func kafkaProducerConfig(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
kafkaConfig := kafkaConfiguration(env, baseConfig)
kafkaConfig.Producer.Retry.Max = 3
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal
kafkaConfig.Producer.Compression = sarama.CompressionSnappy
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Producer.Flush.Bytes = 31000
kafkaConfig.Producer.Flush.Frequency = 100 * time.Millisecond
kafkaConfig.Producer.Flush.Messages = 100
kafkaConfig.ChannelBufferSize = 512
kafkaConfig.Producer.MaxMessageBytes = 2000000
kafkaConfig.Metadata.RefreshFrequency = 1 * time.Minute
return kafkaConfig
}
func kafkaConsumerConfig(env string, baseConfig *configs.KafkaConfig, groupId string) *sarama.Config {
kConfig := kafkaConfiguration(env, baseConfig)
kConfig.Version = sarama.V3_6_0_0
kConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
kConfig.Consumer.MaxProcessingTime = 50 * time.Millisecond
kConfig.Consumer.Return.Errors = true
kConfig.ClientID = groupId
kConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
return kConfig
}
func kafkaConfiguration(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
kafkaConfig := sarama.NewConfig()
if env == "local" {
return kafkaConfig
} else if env == "prod" {
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
} else {
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
}
kafkaConfig.Net.SASL.User = baseConfig.Username
kafkaConfig.Net.SASL.Password = baseConfig.Password
kafkaConfig.Net.SASL.Enable = baseConfig.SaslEnabled
kafkaConfig.Net.TLS.Enable = baseConfig.TlsEnabled
kafkaConfig.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: baseConfig.TlsInsureSkipVerification,
}
return kafkaConfig
// ConfluentProducer creates a confluent-kafka producer authenticated with
// SASL/PLAIN over TLS (SASL_SSL), using the credentials and broker list
// from baseConfig.
//
// Fix: previously only Brokers[0] was passed as "bootstrap.servers", which
// both dropped every fallback broker (the setting accepts a comma-separated
// host list) and panicked when the broker slice was empty. All brokers are
// now joined; an empty slice yields an empty bootstrap list, which the
// client library reports as a configuration error instead of panicking.
func ConfluentProducer(baseConfig *configs.KafkaConfig) (*kafka.Producer, error) {
	// Manual join keeps this change free of new imports.
	bootstrap := ""
	for i, broker := range baseConfig.Brokers {
		if i > 0 {
			bootstrap += ","
		}
		bootstrap += broker
	}
	return kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrap,
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     baseConfig.Username,
		"sasl.password":     baseConfig.Password,
	})
}

View File

@@ -5,95 +5,71 @@ import (
"cybertron/pkg/encoder"
"cybertron/pkg/kafka"
"cybertron/pkg/log"
"github.com/IBM/sarama"
"fmt"
ConfluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.uber.org/zap"
"os"
)
type KProducer interface {
Errors()
Successes()
PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error
sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error
}
type KProducerImpl struct {
AsyncProducer sarama.AsyncProducer
AsyncProducer *ConfluentKafka.Producer
}
type ConfluentKafkaImplementation struct {
AsyncProducer *ConfluentKafka.Producer
}
func NewKProducer(env string, baseConfig *configs.KafkaConfig) *KProducerImpl {
producer, err := kafka.SaramaSyncProducer(env, baseConfig)
producer, err := kafka.ConfluentProducer(baseConfig)
if err != nil {
log.Log.Error("sarama kafka producer failed", zap.Error(err))
log.Log.Error(" kafka producer failed", zap.Error(err))
os.Exit(1)
}
kProducer := &KProducerImpl{
AsyncProducer: producer,
}
go func() {
kProducer.Errors()
}()
go func() {
kProducer.Successes()
}()
return kProducer
}
// Errors keep the track of failed messages.
func (kp *KProducerImpl) Errors() {
for err := range kp.AsyncProducer.Errors() {
keyBytes, errEncode := err.Msg.Key.Encode()
if errEncode != nil {
log.Log.Error("key encoding failed for failed message", zap.String("topic", err.Msg.Topic))
}
// todo - add metrics
log.Log.Error("failed to emit event to kafka", zap.String("topic", err.Msg.Topic),
zap.String("key", string(keyBytes)), zap.String("value", string(keyBytes)), zap.String("error", err.Error()))
}
}
// Successes is to check if message successfully delivered to kafka
func (kp *KProducerImpl) Successes() {
for msg := range kp.AsyncProducer.Successes() {
_, errEncode := msg.Key.Encode()
if errEncode != nil {
log.Log.Error("key encoding failed for failed message", zap.String("topic", msg.Topic))
}
// todo - add metrics
}
}
func (kp *KProducerImpl) sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error {
var saramaHeaders []sarama.RecordHeader
for k, v := range headers {
saramaHeaders = append(saramaHeaders, sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
})
}
message := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(msgPayload),
Headers: saramaHeaders,
}
kp.AsyncProducer.Input() <- message
// todo - add metrics
return nil
}
func (kp *KProducerImpl) PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error {
msg, err := marshaller.Encode(msgPayload)
if err != nil {
log.Log.Error("error while marshalling msg payload", zap.Error(err))
return err
}
deliveryChan := make(chan ConfluentKafka.Event)
producerError := kp.AsyncProducer.Produce(&ConfluentKafka.Message{
TopicPartition: ConfluentKafka.TopicPartition{
Topic: &topic,
Partition: ConfluentKafka.PartitionAny,
},
Value: msg,
}, deliveryChan)
return kp.sendMessage(msg, topic, key, headers)
if producerError != nil {
fmt.Printf("producer error: %v", producerError)
}
e := <-deliveryChan
message := e.(*ConfluentKafka.Message)
if message.TopicPartition.Error != nil {
fmt.Printf("failed to deliver message: %v\n",
message.TopicPartition)
} else {
fmt.Printf("delivered to topic %s [%d] at offset %v\n",
*message.TopicPartition.Topic,
message.TopicPartition.Partition,
message.TopicPartition.Offset)
}
close(deliveryChan)
kp.AsyncProducer.Flush(30)
return nil
}

View File

@@ -1,7 +1,9 @@
package service
import (
"cybertron/internal/client/aws"
"cybertron/models/db"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -10,8 +12,10 @@ import (
)
type ProjectCreator struct {
logger *log.Logger
dbClient *gorm.DB
logger *log.Logger
dbClient *gorm.DB
s3Client *aws.Actions
kafkaProducer producer.KProducer
}
type ProjectBody struct {
@@ -19,17 +23,19 @@ type ProjectBody struct {
Team string `json:"team" binding:"required"`
}
func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB) *ProjectCreator {
func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB, s3Client *aws.Actions, kafkaProducer producer.KProducer) *ProjectCreator {
return &ProjectCreator{
logger: logger,
dbClient: dbClient,
logger: logger,
dbClient: dbClient,
s3Client: s3Client,
kafkaProducer: kafkaProducer,
}
}
func (pc *ProjectCreator) ProjectCreate(c *gin.Context) {
func (pc *ProjectCreator) CreateProject(ctx *gin.Context) {
var projectBody ProjectBody
if err := c.BindJSON(&projectBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
if err := ctx.BindJSON(&projectBody); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{
"message": "Invalid Request",
})
return
@@ -43,16 +49,40 @@ func (pc *ProjectCreator) ProjectCreate(c *gin.Context) {
})
if result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"})
ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"})
return
}
c.JSON(http.StatusOK, gin.H{
ctx.JSON(http.StatusOK, gin.H{
"message": "Project created",
})
}
func (pc *ProjectCreator) ProjectGet(c *gin.Context) {
//s3 processing
//buckets, err := pc.s3Client.S3Client.ListBuckets(context.TODO(), &s3.ListBucketsInput{})
//key := "async js.pdf"
//bucket := "navi-cd955a63c4476df0f00c1cea0e4a40d1"
//request, err := pc.s3Client.S3PresignClient.PresignGetObject(context.TODO(), &s3.GetObjectInput{
// Bucket: &bucket,
// Key: &key,
//}, func(opts *s3.PresignOptions) {
// opts.Expires = time.Duration(7 * 24 * time.Hour)
//})
//pc.logger.Info(request.URL)
//if err != nil {
// log.Log.Fatal(err.Error())
// pc.logger.Error("S3 List Buckets Failed")
//}
//kafka producer testing
//generate Random numbers in go
//err := pc.kafkaProducer.PublishEvent(rand.Int(), "kafka-stream", "", nil, encoder.JsonEncoderInstance)
//if err != nil {
// pc.logger.Info(err.Error())
//}
var projects []db.Project
pc.dbClient.Find(&projects)