TP-55555|db additions adjustments in service,handler

This commit is contained in:
podili-varshitha_navi
2024-07-30 17:25:49 +05:30
23 changed files with 458 additions and 207 deletions

View File

@@ -42,21 +42,21 @@ http:
# Kafka config
kafka:
password: KAFKA_PASSWORD
username: KAFKA_USERNAME
brokers: KAFKA_BROKERS
password: XX
username: varnitgoyal/varnitgoyal95@gmail.com/ocid1.streampool.oc1.ap-mumbai-1.amaaaaaaotdslraanepwp54txqqxkmg4l6dghrhufiezqkx2lqhndgxoq7pa
brokers: cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
group:
names: KAFKA_GROUP_NAMES
topics: KAFKA_TOPICS
names: {"kafka-stream": "kafka-stream"}
topics: {"kafka-stream": "kafka-stream"}
tls:
insecureSkipVerify: KAFKA_TLS_INSECURE_SKIP_VERIFY
enabled: KAFKA_TLS_ENABLED
insecureSkipVerify: true
enabled: false
sasl:
enabled: KAFKA_SASL_ENABLED
enabled: true
consumer:
batch.size: KAFKA_CONSUMER_BATCH_SIZE
goroutines.max.pool.size: KAFKA_CONSUMER_GOROUTINES_MAX_POOL_SIZE
max.timeout.ms: KAFKA_CONSUMER_MAX_TIMEOUT_MS
batch.size: 1
goroutines.max.pool.size: 20
max.timeout.ms: 300
auto.offset.reset: latest
@@ -65,3 +65,5 @@ DocumentService:
baseurl: DOCUMENT_SERVICE_BASEURL
mock:
generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN

15
configs/aws_config.go Normal file
View File

@@ -0,0 +1,15 @@
package configs
import "github.com/spf13/viper"
type AwsConfig struct {
Region string
Bucket string
}
func NewAWSConfig() *AwsConfig {
return &AwsConfig{
Region: viper.GetString("aws.Region"),
Bucket: viper.GetString("aws.Bucket"),
}
}

View File

@@ -18,6 +18,8 @@ type AppConfig struct {
timezone string
httpConfig *HttpConfig
clientConfigs *ClientConfigs
awsConfig *AwsConfig
KafkaConfig *KafkaConfig
}
type MigConfig struct {
@@ -41,6 +43,8 @@ func LoadConfig() {
timezone: getString("timezone", true),
clientConfigs: loadClientConfigs(),
httpConfig: NewHttpConfig(),
awsConfig: NewAWSConfig(),
KafkaConfig: NewKafkaConfig(),
}
}
@@ -77,6 +81,10 @@ func GetTimezone() *string {
return &appConfig.timezone
}
func GetAWSConfig() *AwsConfig {
return appConfig.awsConfig
}
func readConfig() {
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
@@ -94,3 +102,7 @@ func readConfig() {
func GetHttpConfig() *HttpConfig {
return appConfig.httpConfig
}
func GetKafkaConfig() *KafkaConfig {
return appConfig.KafkaConfig
}

View File

@@ -22,9 +22,9 @@ func RunDatabaseMigrations() error {
}
err = appMigrate.Up()
if err != nil && err != migrate.ErrNoChange {
log.Error("migrations error", zap.Error(err))
log.Log.Error("migrations error", zap.Error(err))
return err
}
log.Info("migrations successful")
log.Log.Info("migrations successful")
return nil
}

39
go.mod
View File

@@ -3,13 +3,15 @@ module cybertron
go 1.21.1
require (
github.com/IBM/sarama v1.43.2
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.1
github.com/spf13/cobra v1.7.0
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.17.0
github.com/xdg-go/scram v1.1.1
go.elastic.co/ecszap v1.0.2
@@ -20,15 +22,27 @@ require (
)
require (
github.com/aws/aws-sdk-go-v2 v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -36,21 +50,14 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.15.5 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.4 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
@@ -64,12 +71,10 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
@@ -86,7 +91,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.5.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect

View File

@@ -0,0 +1,26 @@
package aws
import (
"context"
"cybertron/configs"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"log"
)
type Actions struct {
S3Client *s3.Client
S3PresignClient *s3.PresignClient
}
func NewS3client(clientConfig configs.AwsConfig) *Actions {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(clientConfig.Region))
if err != nil {
log.Fatal(err)
}
svc := s3.NewFromConfig(cfg)
return &Actions{
S3Client: svc,
S3PresignClient: s3.NewPresignClient(svc),
}
}

View File

@@ -9,3 +9,13 @@ func InitProjectRepository(dbClient *gorm.DB) *gorm.DB {
dbClient.AutoMigrate(&db.Project{})
return dbClient
}
func InitReleaseRepository(dbClient *gorm.DB) *gorm.DB {
dbClient.AutoMigrate(&db.Release{})
return dbClient
}
func InitSourceMapRepository(dbClient *gorm.DB) *gorm.DB {
dbClient.AutoMigrate(&db.SourcMap{})
return dbClient
}

View File

@@ -2,11 +2,13 @@ package dependencies
import (
"cybertron/configs"
"cybertron/internal/client/aws"
"cybertron/internal/client/document"
"cybertron/internal/database"
"cybertron/internal/transport/handler"
"cybertron/pkg/db"
httpclient "cybertron/pkg/httpClient"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"cybertron/service"
@@ -26,16 +28,22 @@ type Service struct {
DocumentService *document.HttpClient
ProjectService *service.ProjectCreator
SourceMapService *service.SourceMapService
ReleaseService *service.ReleaseService
ExceptionService *service.ExceptionService
S3Client *aws.Actions
// Add your service here
}
type Handler struct {
ProjectHandler *handler.ProjectHandler
SourceMapHandler *handler.SourceMapHandler
ReleaseHandler *handler.ReleasesHandler
ExceptionHandler *handler.ExceptionHandler
}
type Repositories struct {
ProjectRepository *gorm.DB
ReleaseRepository *gorm.DB
}
func InitDependencies() *Dependencies {
@@ -43,11 +51,17 @@ func InitDependencies() *Dependencies {
repositories := initRepositories(dbClient)
logger := log.Log
httpClient := httpclient.NewHttpClient(*configs.GetHttpConfig())
s3Client := aws.NewS3client(*configs.GetAWSConfig())
kafkaProducer := initKafkaProducer()
documentServiceClient := document.NewDocumentServiceHttpClient(httpClient, logger, configs.GetDocumentServiceHttpClientConfigs())
projectServiceClient := service.NewProjectCreator(logger, dbClient)
projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer)
sourceMapServiceClient := service.NewSourceMapService(dbClient)
services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient)
handlers := initHandlers(projectServiceClient, sourceMapServiceClient)
releaseServiceClient := service.NewReleaseService(logger, dbClient)
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer)
services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient)
handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient)
return &Dependencies{
Service: services,
DBClient: dbClient,
@@ -57,25 +71,37 @@ func InitDependencies() *Dependencies {
}
}
func initServices(documentService *document.HttpClient, projectService *service.ProjectCreator, sourceMapService *service.SourceMapService) *Service {
func initServices(documentService *document.HttpClient, projectService *service.ProjectCreator, sourceMapService *service.SourceMapService, releaseService *service.ReleaseService, exceptionService *service.ExceptionService) *Service {
return &Service{
DocumentService: documentService,
ProjectService: projectService,
SourceMapService: sourceMapService,
ReleaseService: releaseService,
ExceptionService: exceptionService,
}
}
func initRepositories(dbClient *gorm.DB) *Repositories {
return &Repositories{
ProjectRepository: database.InitProjectRepository(dbClient),
ReleaseRepository: database.InitReleaseRepository(dbClient),
}
}
func initHandlers(projectService *service.ProjectCreator, sourceMapService *service.SourceMapService) *Handler {
func initHandlers(projectService *service.ProjectCreator, sourceMapService *service.SourceMapService, releaseService *service.ReleaseService, exceotionService *service.ExceptionService) *Handler {
projectHandler := handler.NewProjectHandler(projectService)
sourceMapHandler := handler.NewSourceMapHandler(sourceMapService)
releaseHandler := handler.NewReleaseHandler(releaseService)
exceptionHandler := handler.NewExceptionHandler(exceotionService)
return &Handler{
ProjectHandler: projectHandler,
SourceMapHandler: sourceMapHandler,
ReleaseHandler: releaseHandler,
ExceptionHandler: exceptionHandler,
}
}
func initKafkaProducer() producer.KProducer {
kafkaProducer := producer.NewKProducer(configs.GetEnv(), configs.GetKafkaConfig())
return kafkaProducer
}

View File

@@ -0,0 +1,20 @@
package handler
import (
"cybertron/service"
"github.com/gin-gonic/gin"
)
type ExceptionHandler struct {
exceptionService *service.ExceptionService
}
func (h *ExceptionHandler) CatchErrors(c *gin.Context) {
h.exceptionService.CatchErrors(c)
}
func NewExceptionHandler(es *service.ExceptionService) *ExceptionHandler {
return &ExceptionHandler{
exceptionService: es,
}
}

View File

@@ -9,17 +9,12 @@ type ProjectHandler struct {
projectCreatorService *service.ProjectCreator
}
type ProjectBody struct {
Name string `json:"name" binding:"required"`
Team string `json:"team" binding:"required"`
}
func (h *ProjectHandler) ProjectCreate(c *gin.Context) {
h.projectCreatorService.CreateProject(c)
}
func (h *ProjectHandler) ProjectGet(c *gin.Context) {
h.projectCreatorService.GetProject(c)
h.projectCreatorService.ProjectGet(c)
}
func NewProjectHandler(projectCreatorService *service.ProjectCreator) *ProjectHandler {

View File

@@ -0,0 +1,24 @@
package handler
import (
"cybertron/service"
"github.com/gin-gonic/gin"
)
type ReleasesHandler struct {
releaseService *service.ReleaseService
}
func (h *ReleasesHandler) AddRelease(c *gin.Context) {
h.releaseService.AddRelease(c)
}
func (h *ReleasesHandler) GetReleases(c *gin.Context) {
h.releaseService.GetReleases(c)
}
func NewReleaseHandler(rs *service.ReleaseService) *ReleasesHandler {
return &ReleasesHandler{
releaseService: rs,
}
}

View File

@@ -1,7 +1,6 @@
package handler
import (
"cybertron/models/db"
"cybertron/service"
"net/http"
@@ -24,16 +23,5 @@ func (h *SourceMapHandler) GetSourceMap(c *gin.Context) {
}
func (h *SourceMapHandler) StoreSourceMap(c *gin.Context) {
var sourceMap db.SourceMap
if err := c.ShouldBindJSON(&sourceMap); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.sourceMapService.StoreSourceMap(sourceMap); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to store source map"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "Source map stored successfully"})
h.sourceMapService.StoreSourceMap(c)
}

View File

@@ -0,0 +1,15 @@
package router
import (
"cybertron/internal/dependencies"
"cybertron/internal/transport/handler"
"github.com/gin-gonic/gin"
)
func ExceptionRouter(r *gin.Engine, dep *dependencies.Dependencies) {
exceptionHandler := handler.NewExceptionHandler(dep.Service.ExceptionService)
exceptionRouterGroup := r.Group("/api/v1")
{
exceptionRouterGroup.POST("/catch-errors", exceptionHandler.CatchErrors)
}
}

View File

@@ -0,0 +1,16 @@
package router
import (
"cybertron/internal/dependencies"
"cybertron/internal/transport/handler"
"github.com/gin-gonic/gin"
)
func ReleasesRouter(r *gin.Engine, dep *dependencies.Dependencies) {
releasesHandler := handler.NewReleaseHandler(dep.Service.ReleaseService)
releasesGroup := r.Group("/api/v1")
{
releasesGroup.POST("/release", releasesHandler.AddRelease)
releasesGroup.GET("/releases", releasesHandler.GetReleases)
}
}

View File

@@ -32,6 +32,8 @@ func (s *Server) router() {
router.ReadinessRouter(s.gin)
router.ProjectRouter(s.gin, s.dependencies)
router.SourceMapRouter(s.gin, s.dependencies.DBClient)
router.ReleasesRouter(s.gin, s.dependencies)
router.ExceptionRouter(s.gin, s.dependencies)
}
func (s *Server) Start() {

View File

@@ -1,9 +1,14 @@
package db
import "gorm.io/gorm"
import (
"github.com/google/uuid"
"gorm.io/gorm"
)
type Release struct {
gorm.Model
ReleaseId uuid.UUID
ProjectReferenceId string `gorm:"primaryKey"`
ReleaseVersion string `gorm:"column:name"`
SourceMapUrl string
}

View File

@@ -2,12 +2,15 @@ package db
import (
"github.com/google/uuid"
"gorm.io/gorm"
"time"
)
type Project struct {
gorm.Model
ProjectReferenceId uuid.UUID `gorm:"primaryKey"`
Name string `gorm:"column:name;unique"`
Team string
ID int `json:"id"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DeletedAt time.Time `json:"deletedAt"`
ProjectReferenceId uuid.UUID `json:"projectReferenceId" gorm:"primaryKey"`
Name string `json:"name" gorm:"unique"`
Team string `json:"team"`
}

View File

@@ -1,77 +1,16 @@
package kafka
import (
"crypto/tls"
"cybertron/configs"
"github.com/IBM/sarama"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func SaramaSyncProducer(env string, baseConfig *configs.KafkaConfig) (sarama.AsyncProducer, error) {
return sarama.NewAsyncProducer(baseConfig.Brokers, kafkaProducerConfig(env, baseConfig))
}
func SaramaKafkaConsumer(env string, baseConfig *configs.KafkaConfig, groupID string) (sarama.ConsumerGroup, error) {
consumerGroup, err := sarama.NewConsumerGroup(baseConfig.Brokers, groupID, kafkaConsumerConfig(env, baseConfig, groupID))
if err != nil {
return nil, err
}
return consumerGroup, nil
}
func kafkaProducerConfig(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
kafkaConfig := kafkaConfiguration(env, baseConfig)
kafkaConfig.Producer.Retry.Max = 3
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal
kafkaConfig.Producer.Compression = sarama.CompressionSnappy
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Producer.Flush.Bytes = 31000
kafkaConfig.Producer.Flush.Frequency = 100 * time.Millisecond
kafkaConfig.Producer.Flush.Messages = 100
kafkaConfig.ChannelBufferSize = 512
kafkaConfig.Producer.MaxMessageBytes = 2000000
kafkaConfig.Metadata.RefreshFrequency = 1 * time.Minute
return kafkaConfig
}
func kafkaConsumerConfig(env string, baseConfig *configs.KafkaConfig, groupId string) *sarama.Config {
kConfig := kafkaConfiguration(env, baseConfig)
kConfig.Version = sarama.V3_6_0_0
kConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
kConfig.Consumer.MaxProcessingTime = 50 * time.Millisecond
kConfig.Consumer.Return.Errors = true
kConfig.ClientID = groupId
kConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
return kConfig
}
func kafkaConfiguration(env string, baseConfig *configs.KafkaConfig) *sarama.Config {
kafkaConfig := sarama.NewConfig()
if env == "local" {
return kafkaConfig
} else if env == "prod" {
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
} else {
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
}
kafkaConfig.Net.SASL.User = baseConfig.Username
kafkaConfig.Net.SASL.Password = baseConfig.Password
kafkaConfig.Net.SASL.Enable = baseConfig.SaslEnabled
kafkaConfig.Net.TLS.Enable = baseConfig.TlsEnabled
kafkaConfig.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: baseConfig.TlsInsureSkipVerification,
}
return kafkaConfig
func ConfluentProducer(baseConfig *configs.KafkaConfig) (*kafka.Producer, error) {
return kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": baseConfig.Brokers[0],
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": baseConfig.Username,
"sasl.password": baseConfig.Password,
})
}

View File

@@ -5,95 +5,71 @@ import (
"cybertron/pkg/encoder"
"cybertron/pkg/kafka"
"cybertron/pkg/log"
"github.com/IBM/sarama"
"fmt"
ConfluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.uber.org/zap"
"os"
)
type KProducer interface {
Errors()
Successes()
PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error
sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error
}
type KProducerImpl struct {
AsyncProducer sarama.AsyncProducer
AsyncProducer *ConfluentKafka.Producer
}
type ConfluentKafkaImplementation struct {
AsyncProducer *ConfluentKafka.Producer
}
func NewKProducer(env string, baseConfig *configs.KafkaConfig) *KProducerImpl {
producer, err := kafka.SaramaSyncProducer(env, baseConfig)
producer, err := kafka.ConfluentProducer(baseConfig)
if err != nil {
log.Log.Error("sarama kafka producer failed", zap.Error(err))
log.Log.Error(" kafka producer failed", zap.Error(err))
os.Exit(1)
}
kProducer := &KProducerImpl{
AsyncProducer: producer,
}
go func() {
kProducer.Errors()
}()
go func() {
kProducer.Successes()
}()
return kProducer
}
// Errors keep the track of failed messages.
func (kp *KProducerImpl) Errors() {
for err := range kp.AsyncProducer.Errors() {
keyBytes, errEncode := err.Msg.Key.Encode()
if errEncode != nil {
log.Log.Error("key encoding failed for failed message", zap.String("topic", err.Msg.Topic))
}
// todo - add metrics
log.Log.Error("failed to emit event to kafka", zap.String("topic", err.Msg.Topic),
zap.String("key", string(keyBytes)), zap.String("value", string(keyBytes)), zap.String("error", err.Error()))
}
}
// Successes is to check if message successfully delivered to kafka
func (kp *KProducerImpl) Successes() {
for msg := range kp.AsyncProducer.Successes() {
_, errEncode := msg.Key.Encode()
if errEncode != nil {
log.Log.Error("key encoding failed for failed message", zap.String("topic", msg.Topic))
}
// todo - add metrics
}
}
func (kp *KProducerImpl) sendMessage(msgPayload []byte, topic, key string, headers map[string]string) error {
var saramaHeaders []sarama.RecordHeader
for k, v := range headers {
saramaHeaders = append(saramaHeaders, sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
})
}
message := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(msgPayload),
Headers: saramaHeaders,
}
kp.AsyncProducer.Input() <- message
// todo - add metrics
return nil
}
func (kp *KProducerImpl) PublishEvent(msgPayload interface{}, topic, key string, headers map[string]string, marshaller encoder.Encoder) error {
msg, err := marshaller.Encode(msgPayload)
if err != nil {
log.Log.Error("error while marshalling msg payload", zap.Error(err))
return err
}
deliveryChan := make(chan ConfluentKafka.Event)
producerError := kp.AsyncProducer.Produce(&ConfluentKafka.Message{
TopicPartition: ConfluentKafka.TopicPartition{
Topic: &topic,
Partition: ConfluentKafka.PartitionAny,
},
Value: msg,
}, deliveryChan)
return kp.sendMessage(msg, topic, key, headers)
if producerError != nil {
fmt.Printf("producer error: %v", producerError)
}
e := <-deliveryChan
message := e.(*ConfluentKafka.Message)
if message.TopicPartition.Error != nil {
fmt.Printf("failed to deliver message: %v\n",
message.TopicPartition)
} else {
fmt.Printf("delivered to topic %s [%d] at offset %v\n",
*message.TopicPartition.Topic,
message.TopicPartition.Partition,
message.TopicPartition.Offset)
}
close(deliveryChan)
kp.AsyncProducer.Flush(30)
return nil
}

View File

@@ -0,0 +1,47 @@
package service
import (
"cybertron/configs"
"cybertron/pkg/encoder"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"gorm.io/gorm"
"net/http"
)
type ExceptionService struct {
logger *log.Logger
dbClient *gorm.DB
kafkaProducer producer.KProducer
}
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService {
return &ExceptionService{
logger: logger,
dbClient: dbClient,
kafkaProducer: kafkaProducer,
}
}
func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
var errorsPayload []interface{}
if err := c.BindJSON(&errorsPayload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON payload"})
return
}
headerMap := make(map[string]string)
for _, errorItem := range errorsPayload {
err := exceptionService.kafkaProducer.PublishEvent(errorItem, configs.GetKafkaConfig().GetTopic("js-error-topic"), uuid.NewString(), headerMap, encoder.JsonEncoderInstance)
if err != nil {
fmt.Println("Failed to push error to kafka")
}
}
c.JSON(http.StatusOK, gin.H{"status": "success"})
}

View File

@@ -1,7 +1,9 @@
package service
import (
"cybertron/internal/client/aws"
"cybertron/models/db"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -12,6 +14,8 @@ import (
type ProjectCreator struct {
logger *log.Logger
dbClient *gorm.DB
s3Client *aws.Actions
kafkaProducer producer.KProducer
}
type ProjectBody struct {
@@ -19,10 +23,12 @@ type ProjectBody struct {
Team string `json:"team" binding:"required"`
}
func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB) *ProjectCreator {
func NewProjectCreator(logger *log.Logger, dbClient *gorm.DB, s3Client *aws.Actions, kafkaProducer producer.KProducer) *ProjectCreator {
return &ProjectCreator{
logger: logger,
dbClient: dbClient,
s3Client: s3Client,
kafkaProducer: kafkaProducer,
}
}
@@ -36,21 +42,54 @@ func (pc *ProjectCreator) CreateProject(ctx *gin.Context) {
}
// Write to database
pc.dbClient.Create(&db.Project{
result := pc.dbClient.Create(&db.Project{
ProjectReferenceId: uuid.New(),
Name: projectBody.Name,
Team: projectBody.Team,
})
if result.Error != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"})
return
}
ctx.JSON(http.StatusOK, gin.H{
"message": "Project created",
})
}
func (pc *ProjectCreator) GetProject(ctx *gin.Context) {
var project db.Project
pc.dbClient.First(&project, 1)
ctx.JSON(http.StatusOK, gin.H{
"message": project,
})
func (pc *ProjectCreator) ProjectGet(c *gin.Context) {
//s3 processing
//buckets, err := pc.s3Client.S3Client.ListBuckets(context.TODO(), &s3.ListBucketsInput{})
//key := "async js.pdf"
//bucket := "navi-cd955a63c4476df0f00c1cea0e4a40d1"
//request, err := pc.s3Client.S3PresignClient.PresignGetObject(context.TODO(), &s3.GetObjectInput{
// Bucket: &bucket,
// Key: &key,
//}, func(opts *s3.PresignOptions) {
// opts.Expires = time.Duration(7 * 24 * time.Hour)
//})
//pc.logger.Info(request.URL)
//if err != nil {
// log.Log.Fatal(err.Error())
// pc.logger.Error("S3 List Buckets Failed")
//}
//kafka producer testing
//generate Random numbers in go
//err := pc.kafkaProducer.PublishEvent(rand.Int(), "kafka-stream", "", nil, encoder.JsonEncoderInstance)
//if err != nil {
// pc.logger.Info(err.Error())
//}
var projects []db.Project
pc.dbClient.Find(&projects)
if result := pc.dbClient.Find(&projects); result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}
c.JSON(http.StatusOK, projects)
}

69
service/ReleaseService.go Normal file
View File

@@ -0,0 +1,69 @@
package service
import (
"cybertron/models/db"
"cybertron/pkg/log"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"gorm.io/gorm"
"net/http"
)
type ReleaseService struct {
logger *log.Logger
dbClient *gorm.DB
}
type ReleaseBody struct {
ProjectId string `json:"project_id" binding:"required"`
Version string `json:"version" binding:"required"`
SourceMapUrl string `json:"source_map_url" binding:"required"`
}
func NewReleaseService(logger *log.Logger, dbClient *gorm.DB) *ReleaseService {
return &ReleaseService{
logger: logger,
dbClient: dbClient,
}
}
// FIXME: This may not be requried now, can be used in source maps services only
func (releaseService *ReleaseService) AddRelease(c *gin.Context) {
var releaseBody ReleaseBody
if err := c.BindJSON(&releaseBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"message": "Invalid Request",
})
return
}
releaseToBeAdded := db.Release{
ReleaseId: uuid.New(),
ProjectReferenceId: releaseBody.ProjectId,
ReleaseVersion: releaseBody.Version,
SourceMapUrl: releaseBody.SourceMapUrl,
}
if result := releaseService.dbClient.Create(&releaseToBeAdded); result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}
c.JSON(http.StatusOK, gin.H{
"message": "Release added successfully",
})
}
func (releaseService *ReleaseService) GetReleases(c *gin.Context) {
projectRefId := c.Query("project_reference_id")
var releases []db.Release
releaseService.dbClient.Where("project_reference_id = ?", projectRefId).Find(&releases)
if result := releaseService.dbClient.Find(&releases); result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}
c.JSON(http.StatusOK, releases)
}

View File

@@ -2,6 +2,7 @@ package service
import (
"cybertron/models/db"
"net/http"
"time"
"gorm.io/gorm"
@@ -34,6 +35,22 @@ func (s *SourceMapService) GetSourceMap() db.SourceMap {
return sourceMap
}
func (s *SourceMapService) StoreSourceMap(sourceMap db.SourceMap) error {
return s.dbClient.Create(&sourceMap).Error
func (s *SourceMapService) StoreSourceMap(c *gin.Context) {
var sourceMap db.SourceMap
if err := c.ShouldBindJSON(&sourceMap); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
result := s.dbClient.Create(&db.SourceMap{
ReleaseReferenceId: sourceMap.ReleaseReferenceId,
ProjectReferenceId: sourceMap.ProjectReferenceId,
SourceMapZipUrl: sourceMap.SourceMapZipUrl,
})
if result.Error != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to store source map"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "Source map stored successfully"})
}