TP-55555 | kafka fix

This commit is contained in:
varnit goyal
2024-10-22 13:04:56 +05:30
parent 396d28ed64
commit 2581c1b76a
7 changed files with 100 additions and 12 deletions

View File

@@ -7,6 +7,7 @@ import (
"cybertron/internal/client/elastic"
"cybertron/internal/database"
"cybertron/internal/transport/handler"
inPodCache "cybertron/pkg/cache"
"cybertron/pkg/db"
"cybertron/pkg/houstonClient"
httpclient "cybertron/pkg/httpClient"
@@ -66,12 +67,13 @@ func InitDependencies() *Dependencies {
elasticSearch, _ := elastic.NewElasticClient(*configs.GetElasticConfig())
mjolnirClient := mjolnirClient.NewMjolnirClient(*configs.GetMjolnirConfig())
houstonClient := houstonClient.NewHoustonClient(*configs.GetHoustonConfig())
cacheClient := inPodCache.NewCache()
documentServiceClient := document.NewDocumentServiceHttpClient(httpClient, logger, configs.GetDocumentServiceHttpClientConfigs())
projectServiceClient := service.NewProjectCreator(logger, dbClient, s3Client, kafkaProducer)
sourceMapServiceClient := service.NewSourceMapService(dbClient, s3Client, configs.GetAWSConfig())
releaseServiceClient := service.NewReleaseService(logger, dbClient)
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer)
exceptionServiceClient := service.NewExceptionService(logger, dbClient, kafkaProducer, cacheClient)
searchServiceClient := service.NewSearchService(logger, elasticSearch)
authService := service.NewAuthService(mjolnirClient)
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)

View File

@@ -11,6 +11,6 @@ func ExceptionRouter(r *gin.Engine, dep *dependencies.Dependencies) {
exceptionHandler := handler.NewExceptionHandler(dep.Service.ExceptionService)
exceptionRouterGroup := r.Group("/api")
{
exceptionRouterGroup.POST("/:projectId/envelope", exceptionHandler.CatchErrors)
exceptionRouterGroup.POST("/:projectId/envelope/", exceptionHandler.CatchErrors)
}
}

View File

@@ -14,4 +14,5 @@ type Project struct {
Team string `json:"team"`
Icon string `json:"icon"`
GithubUrl string `json:"githubUrl"`
Secret string `json:"secret"`
}

47
pkg/cache/inPodMemory.go vendored Normal file
View File

@@ -0,0 +1,47 @@
package inPodCache
import (
"sync"
)
// Cache represents an in-memory key-value store.
type Cache struct {
data map[string]interface{}
mu sync.RWMutex
}
// NewCache creates and initializes a new Cache instance.
func NewCache() *Cache {
return &Cache{
data: make(map[string]interface{}),
}
}
// Set adds or updates a key-value pair in the cache.
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
// Get retrieves the value associated with the given key from the cache.
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
// Delete removes a key-value pair from the cache.
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
}
// Clear removes all key-value pairs from the cache.
func (c *Cache) Clear() {
c.mu.Lock()
defer c.mu.Unlock()
c.data = make(map[string]interface{})
}

View File

@@ -2,11 +2,14 @@ package service
import (
"bufio"
"cybertron/models/db"
"cybertron/models/instrumentation"
inPodCache "cybertron/pkg/cache"
"cybertron/pkg/encoder"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"cybertron/pkg/metrics"
"cybertron/pkg/utils"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
@@ -15,9 +18,10 @@ import (
)
type ExceptionService struct {
logger *log.Logger
dbClient *gorm.DB
kafkaProducer producer.KProducer
logger *log.Logger
dbClient *gorm.DB
kafkaProducer producer.KProducer
inPodCacheClient *inPodCache.Cache
}
type Frame struct {
@@ -53,15 +57,36 @@ type Payload struct {
Extra interface{} `json:"extra"`
}
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService {
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer, cache *inPodCache.Cache) *ExceptionService {
return &ExceptionService{
logger: logger,
dbClient: dbClient,
kafkaProducer: kafkaProducer,
logger: logger,
dbClient: dbClient,
kafkaProducer: kafkaProducer,
inPodCacheClient: cache,
}
}
func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
projectID := c.Param("projectId")
secret := c.Query("sentry_key")
var secretFromDb, found = exceptionService.inPodCacheClient.Get(projectID)
//validate project id and secret is valid
if !found {
var projectData db.Project
exceptionService.dbClient.First(&projectData, "project_reference_id = ?", projectID)
secretFromDb = projectData.Secret
exceptionService.inPodCacheClient.Set(projectID, secretFromDb)
}
if secret != secretFromDb {
utils.ErrorResponse(c, "Unable to validate")
return
}
scanner := bufio.NewScanner(c.Request.Body)
var lines []string
@@ -72,7 +97,6 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
}
var jsonData Payload
projectID := c.Param("projectId")
err := json.Unmarshal([]byte(lines[2]), &jsonData)
if err != nil {

View File

@@ -1,10 +1,12 @@
package service
import (
"crypto/rand"
"cybertron/internal/client/aws"
"cybertron/models/db"
"cybertron/pkg/kafka/producer"
"cybertron/pkg/log"
"encoding/hex"
"math/big"
"net/http"
"strings"
@@ -46,6 +48,13 @@ func (pc *ProjectCreator) CreateProject(ctx *gin.Context) {
return
}
var uuidGenerated = uuid.New()
secret := make([]byte, 16)
_, err := rand.Read(secret)
if err != nil {
// handle error here
return
}
//converting uuid to all numbers since sentry only supports
//all digit project uuid
var i big.Int
@@ -58,6 +67,7 @@ func (pc *ProjectCreator) CreateProject(ctx *gin.Context) {
Team: projectBody.Team,
Icon: projectBody.Icon,
GithubUrl: projectBody.GithubUrl,
Secret: hex.EncodeToString(secret),
})
if result.Error != nil {

View File

@@ -68,7 +68,7 @@ func (s *SourceMapService) GetSourceMapUploadUrl(ctx *gin.Context) {
State: "IN_PROGRESS",
})
if result.Error != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to create project"})
ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error(), "message": "Failed to source map"})
return
}
ctx.JSON(http.StatusOK, gin.H{
@@ -113,7 +113,11 @@ func (s *SourceMapService) ValidateSourceMap(ctx *gin.Context) {
})
}
var existingSourceMap db.SourceMap
s.dbClient.First(&existingSourceMap, "ProjectReferenceId = ? and ReleaseReferenceId= ? and file_name = ?", projectId, releaseId, fileName)
result := s.dbClient.First(&existingSourceMap, "ProjectReferenceId = ? and ReleaseReferenceId= ? and file_name = ?", projectId, releaseId, fileName)
if result.Error != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
return
}
if existingSourceMap.State == "DONE" {
ctx.JSON(http.StatusOK, gin.H{"status": "Source map stored successfully"})