diff --git a/configs/application.yml b/configs/application.yml index 5afcc27..09d257f 100644 --- a/configs/application.yml +++ b/configs/application.yml @@ -42,21 +42,21 @@ http: # Kafka config kafka: - password: KAFKA_PASSWORD - username: KAFKA_USERNAME - brokers: KAFKA_BROKERS + password: XX + username: varnitgoyal/varnitgoyal95@gmail.com/ocid1.streampool.oc1.ap-mumbai-1.amaaaaaaotdslraanepwp54txqqxkmg4l6dghrhufiezqkx2lqhndgxoq7pa + brokers: cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092 group: - names: KAFKA_GROUP_NAMES - topics: KAFKA_TOPICS + names: {"kafka-stream": "kafka-stream"} + topics: {"kafka-stream": "kafka-stream"} tls: - insecureSkipVerify: KAFKA_TLS_INSECURE_SKIP_VERIFY - enabled: KAFKA_TLS_ENABLED + insecureSkipVerify: true + enabled: false sasl: - enabled: KAFKA_SASL_ENABLED + enabled: true consumer: - batch.size: KAFKA_CONSUMER_BATCH_SIZE - goroutines.max.pool.size: KAFKA_CONSUMER_GOROUTINES_MAX_POOL_SIZE - max.timeout.ms: KAFKA_CONSUMER_MAX_TIMEOUT_MS + batch.size: 1 + goroutines.max.pool.size: 20 + max.timeout.ms: 300 auto.offset.reset: latest @@ -65,3 +65,5 @@ DocumentService: baseurl: DOCUMENT_SERVICE_BASEURL mock: generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN + + diff --git a/configs/aws_config.go b/configs/aws_config.go new file mode 100644 index 0000000..ba88c17 --- /dev/null +++ b/configs/aws_config.go @@ -0,0 +1,15 @@ +package configs + +import "github.com/spf13/viper" + +type AwsConfig struct { + Region string + Bucket string +} + +func NewAWSConfig() *AwsConfig { + return &AwsConfig{ + Region: viper.GetString("aws.Region"), + Bucket: viper.GetString("aws.Bucket"), + } +} diff --git a/configs/config.go b/configs/config.go index 21b7e60..7f9f8fd 100644 --- a/configs/config.go +++ b/configs/config.go @@ -18,7 +18,8 @@ type AppConfig struct { timezone string httpConfig *HttpConfig clientConfigs *ClientConfigs - kafkaConfig *KafkaConfig + awsConfig *AwsConfig + KafkaConfig *KafkaConfig } type MigConfig struct { @@ -42,7 +43,8 @@ func LoadConfig() { timezone: getString("timezone", true), clientConfigs: loadClientConfigs(), httpConfig: NewHttpConfig(), - kafkaConfig: NewKafkaConfig(), + awsConfig: NewAWSConfig(), + KafkaConfig: NewKafkaConfig(), } } @@ -79,6 +81,10 @@ func GetTimezone() *string { return &appConfig.timezone } +func GetAWSConfig() *AwsConfig { + return appConfig.awsConfig +} + func readConfig() { viper.AutomaticEnv() viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) @@ -98,5 +104,5 @@ func GetHttpConfig() *HttpConfig { } func GetKafkaConfig() *KafkaConfig { - return appConfig.kafkaConfig + return appConfig.KafkaConfig } diff --git a/db/migrations.go b/db/migrations.go index e0b9b63..a1416ee 100644 --- a/db/migrations.go +++ b/db/migrations.go @@ -22,9 +22,9 @@ func RunDatabaseMigrations() error { } err = appMigrate.Up() if err != nil && err != migrate.ErrNoChange { - log.Error("migrations error", zap.Error(err)) + log.Log.Error("migrations error", zap.Error(err)) return err } - log.Info("migrations successful") + log.Log.Info("migrations successful") return nil } diff --git a/go.mod b/go.mod index 21411ba..580ea70 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,15 @@ module cybertron go 1.21.1 require ( - github.com/IBM/sarama v1.43.2 + github.com/aws/aws-sdk-go-v2/config v1.27.27 + github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 + github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 github.com/gin-contrib/zap v0.2.0 github.com/gin-gonic/gin v1.9.1 github.com/golang-migrate/migrate/v4 v4.17.1 - github.com/google/uuid v1.4.0 + github.com/google/uuid v1.6.0 github.com/prometheus/client_golang v1.19.1 - github.com/spf13/cobra v1.7.0 + github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.17.0 github.com/xdg-go/scram v1.1.1 go.elastic.co/ecszap v1.0.2 @@ -20,15 +22,27 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect + github.com/aws/smithy-go v1.20.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.10.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/eapache/go-resiliency v1.6.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect - github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -36,21 +50,14 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.15.5 // indirect github.com/goccy/go-json v0.10.2 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.5.4 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/jcmturner/aescts/v2 v2.0.0 // indirect - github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect - github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -64,12 +71,10 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -86,7 +91,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.5.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect diff --git a/internal/client/aws/aws.go b/internal/client/aws/aws.go new file mode 100644 index 0000000..2d67d61 --- /dev/null +++ b/internal/client/aws/aws.go @@ -0,0 +1,26 @@ +package aws + +import ( + "context" + "cybertron/configs" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "log" +) + +type Actions struct { + S3Client *s3.Client + S3PresignClient *s3.PresignClient +} + +func NewS3client(clientConfig configs.AwsConfig) *Actions { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(clientConfig.Region)) + if err != nil { + log.Fatal(err) + } + svc := s3.NewFromConfig(cfg) + return &Actions{ + S3Client: svc, + S3PresignClient: s3.NewPresignClient(svc), + } +} diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index f17ea1e..fb873db 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -2,6 +2,7 @@ package dependencies import ( "cybertron/configs" + "cybertron/internal/client/aws" "cybertron/internal/client/document" "cybertron/internal/database" "cybertron/internal/transport/handler" @@ -27,6 +28,7 @@ type Service struct { ProjectService *service.ProjectCreator ReleaseService *service.ReleaseService ExceptionService *service.ExceptionService + S3Client *aws.Actions // Add your service here } @@ -46,15 +48,16 @@ func InitDependencies() *Dependencies { repositories := initRepositories(dbClient) logger := log.Log httpClient := httpclient.NewHttpClient(*configs.GetHttpConfig()) + s3Client := aws.NewS3client(*configs.GetAWSConfig()) kafkaProducer := initKafkaProducer() + documentServiceClient := document.NewDocumentServiceHttpClient(httpClient, logger, configs.GetDocumentServiceHttpClientConfigs()) - projectServiceClient := service.NewProjectCreator(logger, dbClient) + projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer) releaseServiceClient := service.NewReleaseService(logger, dbClient) exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer) services := initServices(documentServiceClient, projectServiceClient, releaseServiceClient, exceptionServiceClient) handlers := initHandlers(projectServiceClient, releaseServiceClient, exceptionServiceClient) - return &Dependencies{ Service: services, DBClient: dbClient, diff --git a/internal/transport/handler/project.go b/internal/transport/handler/project.go index 306ebfc..a73713c 100644 --- a/internal/transport/handler/project.go +++ b/internal/transport/handler/project.go @@ -10,7 +10,7 @@ type ProjectHandler struct { } func (h *ProjectHandler) ProjectCreate(c *gin.Context) { - h.projectCreatorService.ProjectCreate(c) + h.projectCreatorService.CreateProject(c) } func (h *ProjectHandler) ProjectGet(c *gin.Context) { diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 5e6dd0c..2eb5cab 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -1,77 +1,16 @@ package kafka import ( - "crypto/tls" "cybertron/configs" - "github.com/IBM/sarama" - "time" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) -func SaramaSyncProducer(env string, baseConfig *configs.KafkaConfig) (sarama.AsyncProducer, error) { - return sarama.NewAsyncProducer(baseConfig.Brokers, kafkaProducerConfig(env, baseConfig)) -} - -func SaramaKafkaConsumer(env string, baseConfig *configs.KafkaConfig, groupID string) (sarama.ConsumerGroup, error) { - consumerGroup, err := sarama.NewConsumerGroup(baseConfig.Brokers, groupID, kafkaConsumerConfig(env, baseConfig, groupID)) - if err != nil { - return nil, err - } - - return consumerGroup, nil -} - -func kafkaProducerConfig(env string, baseConfig *configs.KafkaConfig) *sarama.Config { - kafkaConfig := kafkaConfiguration(env, baseConfig) - - kafkaConfig.Producer.Retry.Max = 3 - kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal - kafkaConfig.Producer.Compression = sarama.CompressionSnappy - kafkaConfig.Producer.Return.Successes = true - kafkaConfig.Producer.Flush.Bytes = 31000 - kafkaConfig.Producer.Flush.Frequency = 100 * time.Millisecond - kafkaConfig.Producer.Flush.Messages = 100 - kafkaConfig.ChannelBufferSize = 512 - kafkaConfig.Producer.MaxMessageBytes = 2000000 - - kafkaConfig.Metadata.RefreshFrequency = 1 * time.Minute - - return kafkaConfig -} - -func kafkaConsumerConfig(env string, baseConfig *configs.KafkaConfig, groupId string) *sarama.Config { - kConfig := kafkaConfiguration(env, baseConfig) - - kConfig.Version = sarama.V3_6_0_0 - kConfig.Consumer.Offsets.Initial = sarama.OffsetNewest - kConfig.Consumer.MaxProcessingTime = 50 * time.Millisecond - kConfig.Consumer.Return.Errors = true - kConfig.ClientID = groupId - kConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} - - return kConfig -} - -func kafkaConfiguration(env string, baseConfig *configs.KafkaConfig) *sarama.Config { - kafkaConfig := sarama.NewConfig() - - if env == "local" { - return kafkaConfig - } else if env == "prod" { - kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext - } else { - kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 - kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: SHA512} - } - } - - kafkaConfig.Net.SASL.User = baseConfig.Username - kafkaConfig.Net.SASL.Password = baseConfig.Password - kafkaConfig.Net.SASL.Enable = baseConfig.SaslEnabled - kafkaConfig.Net.TLS.Enable = baseConfig.TlsEnabled - kafkaConfig.Net.TLS.Config = &tls.Config{ - InsecureSkipVerify: baseConfig.TlsInsureSkipVerification, - } - - return kafkaConfig +func ConfluentProducer(baseConfig *configs.KafkaConfig) (*kafka.Producer, error) { + return kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": baseConfig.Brokers[0], + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "sasl.username": baseConfig.Username, + "sasl.password": baseConfig.Password, + }) } diff --git a/pkg/kafka/producer/produce.go b/pkg/kafka/producer/produce.go index a9d29bd..a373381 100644 --- a/pkg/kafka/producer/produce.go +++ b/pkg/kafka/producer/produce.go @@ -5,95 +5,71 @@ import ( "cybertron/pkg/encoder" "cybertron/pkg/kafka" "cybertron/pkg/log" - "github.com/IBM/sarama" + "fmt" + ConfluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" "go.uber.org/zap" "os" ) type KProducer interface { - Errors() - Successes() PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error - sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error } type KProducerImpl struct { - AsyncProducer sarama.AsyncProducer + AsyncProducer *ConfluentKafka.Producer +} + +type ConfluentKafkaImplementation struct { + AsyncProducer *ConfluentKafka.Producer } func NewKProducer(env string, baseConfig *configs.KafkaConfig) *KProducerImpl { - - producer, err := kafka.SaramaSyncProducer(env, baseConfig) + producer, err := kafka.ConfluentProducer(baseConfig) if err != nil { - log.Log.Error("sarama kafka producer failed", zap.Error(err)) + log.Log.Error(" kafka producer failed", zap.Error(err)) os.Exit(1) } - kProducer := &KProducerImpl{ AsyncProducer: producer, } - - go func() { - kProducer.Errors() - }() - go func() { - kProducer.Successes() - }() return kProducer } -// Errors keep the track of failed messages. -func (kp *KProducerImpl) Errors() { - for err := range kp.AsyncProducer.Errors() { - keyBytes, errEncode := err.Msg.Key.Encode() - if errEncode != nil { - log.Log.Error("key encoding failed for failed message", zap.String("topic", err.Msg.Topic)) - } - - // todo - add metrics - log.Log.Error("failed to emit event to kafka", zap.String("topic", err.Msg.Topic), - zap.String("key", string(keyBytes)), zap.String("value", string(keyBytes)), zap.String("error", err.Error())) - } -} - -// Successes is to check if message successfully delivered to kafka -func (kp *KProducerImpl) Successes() { - for msg := range kp.AsyncProducer.Successes() { - _, errEncode := msg.Key.Encode() - if errEncode != nil { - log.Log.Error("key encoding failed for failed message", zap.String("topic", msg.Topic)) - } - - // todo - add metrics - } -} - -func (kp *KProducerImpl) sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error { - var saramaHeaders []sarama.RecordHeader - for k, v := range headers { - saramaHeaders = append(saramaHeaders, sarama.RecordHeader{ - Key: []byte(k), - Value: []byte(v), - }) - } - - message := &sarama.ProducerMessage{ - Topic: topic, - Key: sarama.StringEncoder(key), - Value: sarama.StringEncoder(msgPayload), - Headers: saramaHeaders, - } - kp.AsyncProducer.Input() <- message - // todo - add metrics - return nil -} - func (kp *KProducerImpl) PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error { msg, err := marshaller.Encode(msgPayload) if err != nil { log.Log.Error("error while marshalling msg payload", zap.Error(err)) return err } + deliveryChan := make(chan ConfluentKafka.Event) + producerError := kp.AsyncProducer.Produce(&ConfluentKafka.Message{ + TopicPartition: ConfluentKafka.TopicPartition{ + Topic: &topic, + Partition: ConfluentKafka.PartitionAny, + }, + Value: msg, + }, deliveryChan) - return kp.sendMessage(msg, topic, key, headers) + if producerError != nil { + fmt.Printf("producer error: %v", producerError) + } + + e := <-deliveryChan + + message := e.(*ConfluentKafka.Message) + if message.TopicPartition.Error != nil { + fmt.Printf("failed to deliver message: %v\n", + message.TopicPartition) + } else { + fmt.Printf("delivered to topic %s [%d] at offset %v\n", + *message.TopicPartition.Topic, + message.TopicPartition.Partition, + message.TopicPartition.Offset) + } + + close(deliveryChan) + + kp.AsyncProducer.Flush(30) + + return nil } diff --git a/service/ProjectCreator.go b/service/ProjectCreator.go index 7f28f23..5532e08 100644 --- a/service/ProjectCreator.go +++ b/service/ProjectCreator.go @@ -1,7 +1,9 @@ package service import ( + "cybertron/internal/client/aws" "cybertron/models/db" + "cybertron/pkg/kafka/producer" "cybertron/pkg/log" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -10,8 +12,10 @@ import ( ) type ProjectCreator struct { - logger *log.Logger - dbClient *gorm.DB + logger *log.Logger + dbClient *gorm.DB + s3Client *aws.Actions + kafkaProducer producer.KProducer } type ProjectBody struct { @@ -19,17 +23,19 @@ type ProjectBody struct { Team string `json:"team" binding:"required"` } -func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB) *ProjectCreator { +func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB, s3Client *aws.Actions, kafkaProducer producer.KProducer) *ProjectCreator { return &ProjectCreator{ - logger: logger, - dbClient: dbClient, + logger: logger, + dbClient: dbClient, + s3Client: s3Client, + kafkaProducer: kafkaProducer, } } -func (pc *ProjectCreator) ProjectCreate(c *gin.Context) { +func (pc *ProjectCreator) CreateProject(ctx *gin.Context) { var projectBody ProjectBody - if err := c.BindJSON(&projectBody); err != nil { - c.JSON(http.StatusBadRequest, gin.H{ + if err := ctx.BindJSON(&projectBody); err != nil { + ctx.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Request", }) return @@ -43,16 +49,40 @@ func (pc *ProjectCreator) ProjectCreate(c *gin.Context) { }) if result.Error != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"}) + ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"}) return } - c.JSON(http.StatusOK, gin.H{ + ctx.JSON(http.StatusOK, gin.H{ "message": "Project created", }) } func (pc *ProjectCreator) ProjectGet(c *gin.Context) { + //s3 processing + //buckets, err := pc.s3Client.S3Client.ListBuckets(context.TODO(), &s3.ListBucketsInput{}) + //key := "async js.pdf" + //bucket := "navi-cd955a63c4476df0f00c1cea0e4a40d1" + //request, err := pc.s3Client.S3PresignClient.PresignGetObject(context.TODO(), &s3.GetObjectInput{ + // Bucket: &bucket, + // Key: &key, + //}, func(opts *s3.PresignOptions) { + // opts.Expires = time.Duration(7 * 24 * time.Hour) + //}) + //pc.logger.Info(request.URL) + //if err != nil { + // log.Log.Fatal(err.Error()) + // pc.logger.Error("S3 List Buckets Failed") + //} + + //kafka producer testing + + //generate Random numbers in go + + //err := pc.kafkaProducer.PublishEvent(rand.Int(), "kafka-stream", "", nil, encoder.JsonEncoderInstance) + //if err != nil { + // pc.logger.Info(err.Error()) + //} var projects []db.Project pc.dbClient.Find(&projects)