From eda34db8fc4b9e62c518f2a66ce008b8ccc56ea9 Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Tue, 10 Dec 2024 13:24:49 +0530 Subject: [PATCH 1/2] TP-5555 | clean up job --- go.mod | 5 ++- internal/client/elastic/elastic.go | 7 +++ internal/dependencies/dependencies.go | 5 ++- pkg/jobs/jobScheduler.go | 49 +++++++++++++++++++++ pkg/utils/elastic_query_util.go | 5 +++ service/searchService.go | 63 --------------------------- 6 files changed, 69 insertions(+), 65 deletions(-) create mode 100644 pkg/jobs/jobScheduler.go diff --git a/go.mod b/go.mod index fc86b5f..7c904a8 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-co-op/gocron/v2 v2.12.4 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -66,6 +67,7 @@ require ( github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/jonboulle/clockwork v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect @@ -81,6 +83,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -100,7 +103,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect diff --git a/internal/client/elastic/elastic.go b/internal/client/elastic/elastic.go index 808550a..8ff62f1 100644 --- a/internal/client/elastic/elastic.go +++ b/internal/client/elastic/elastic.go @@ -45,6 +45,13 @@ func (el *ElasticSearchClient) IndexDocument(document interface{}) { } } +func (el *ElasticSearchClient) DeleteDocuments(searchRequest string) { + _, err := el.client.DeleteByQuery(el.Config.Index).Raw(strings.NewReader(searchRequest)).Do(context.TODO()) + if err != nil { + log.Printf("unable to delete documents: %s", err.Error()) + } + log.Printf("successfully deleted documents: %s", searchRequest) +} func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, int64, error) { res, err := el.client.Search(). Index(el.Config.Index).Raw(strings.NewReader(searchRequest)). diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index 20b5e8d..8c0cd81 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -11,6 +11,7 @@ import ( "cybertron/pkg/db" "cybertron/pkg/houstonClient" httpclient "cybertron/pkg/httpClient" + "cybertron/pkg/jobs" "cybertron/pkg/kafka/producer" "cybertron/pkg/log" "cybertron/pkg/mjolnirClient" @@ -77,7 +78,9 @@ func InitDependencies() *Dependencies { searchServiceClient := service.NewSearchService(logger, elasticSearch) authService := service.NewAuthService(mjolnirClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) - + jobScheduler := jobs.NewJobScheduler(logger, elasticSearch) + jobScheduler.ScheduleEsCleanUpJob() + jobScheduler.Start() services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService) handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService) diff --git a/pkg/jobs/jobScheduler.go b/pkg/jobs/jobScheduler.go new file mode 100644 index 0000000..4e999cc --- /dev/null +++ b/pkg/jobs/jobScheduler.go @@ -0,0 +1,49 @@ +package jobs + +import ( + "cybertron/internal/client/elastic" + "cybertron/pkg/log" + "cybertron/pkg/utils" + "github.com/go-co-op/gocron/v2" + "go.uber.org/zap" + "time" +) + +type JobScheduler struct { + logger *log.Logger + scheduler gocron.Scheduler + elasticSearchClient *elastic.ElasticSearchClient +} + +func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient) *JobScheduler { + s, err := gocron.NewScheduler() + if err != nil { + logger.Error("Failed to start scheduler", zap.Error(err)) + } + + return &JobScheduler{ + logger: logger, + scheduler: s, + elasticSearchClient: elasticSearchClient, + } +} + +func (s *JobScheduler) ScheduleEsCleanUpJob() { + s.scheduler.NewJob(gocron.DurationJob(2*time.Hour), gocron.NewTask(esCleanUpJob, s)) +} +func esCleanUpJob(s *JobScheduler) { + println("running es clean up job ") + now := time.Now().Unix() + // Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds) + sevenDaysAgo := now - (7 * 24 * 60 * 60) + rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo) + search_query := utils.CreateSearchQuery(rangeQuery) + es_query := utils.CreateEsQuery(search_query) + println("final query %v", es_query) + s.elasticSearchClient.DeleteDocuments(es_query) + +} +func (s *JobScheduler) Start() { + s.logger.Info("Starting job scheduler") + s.scheduler.Start() +} diff --git a/pkg/utils/elastic_query_util.go b/pkg/utils/elastic_query_util.go index 2513d56..ace1dea 100644 --- a/pkg/utils/elastic_query_util.go +++ b/pkg/utils/elastic_query_util.go @@ -11,6 +11,7 @@ const ( TermQueryForBool = `{ "term": { "%s": %t } }` RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }` RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }` + RangeQueryLteString = `{ "range": { "%s": { "lte": "%d" } } }` MustQuery = `"must": [ %s ] ` MustNotQuery = `"must_not": [ %s ]` ShouldQuery = `"should": [ %s ] ` @@ -107,6 +108,10 @@ func CreateRangeQueryForGteString(key string, greaterThan string) string { return fmt.Sprintf(RangeQueryGteString, key, greaterThan) } +func CreateRangeQueryForLteString(key string, lessThan int64) string { + return fmt.Sprintf(RangeQueryLteString, key, lessThan) +} + func CreateMustQuery(filters ...string) string { return fmt.Sprintf(MustQuery, strings.Join(filters, ",")) } diff --git a/service/searchService.go b/service/searchService.go index dfc492c..2d690d0 100644 --- a/service/searchService.go +++ b/service/searchService.go @@ -47,24 +47,6 @@ func (s *SearchService) GetErrorDetails(c *gin.Context) { after_query := utils.CreateFromQuery(fromInNumber) es_query := utils.CreateEsQuery(search_query, size_query, sort_query, after_query) - // searchRequest := ` - //{ - // "size": 1, - // "query": { - // "term": { - // "error_hash": { - // "value": "%s" - // } - // } - // }, - // "sort": [ - // { "created_at": { "order": "asc" } } - // ], - // "search_after": ["1724732743"] - //} - // - // ` - searchRequestformatted := es_query fields := []string{"error", "significant_stack", "title"} @@ -125,51 +107,6 @@ func (s *SearchService) GetErrorList(c *gin.Context) { final_query := utils.CreateEsQuery(search_query, size_query, compositeAggsQuery) println("%s", final_query) - // searchRequest := ` - //{ - // "size": 0, - // "query": { - // "term": { - // "project_id": { - // "value": "%s" - // } - // } - // }, - // "aggs": { - // "errors_by_hash": { - // "composite": { - // "size": 3, - // "sources": [ - // { - // "error_hash": { - // "terms": { - // "field": "error_hash.keyword" - // } - // } - // } - // ] - // }, - // "aggs": { - // "unique_errors": { - // "top_hits": { - // "_source": { - // "includes": ["error", "error_hash"] - // } - // } - // }, - // "error_count": { - // "value_count": { - // "field": "error.keyword" - // } - // } - // } - // } - // - // } - //} - // - // ` - fields := []string{"error", "significant_stack", "title"} var _, aggs, total, err = s.elasticSearchClient.SearchDocuments(final_query, fields) From 8366c352f764610d8e6b38eeb0384e5f3290cd4f Mon Sep 17 00:00:00 2001 From: varnit goyal Date: Tue, 10 Dec 2024 14:57:21 +0530 Subject: [PATCH 2/2] TP-5555 | clean up job --- configs/application.yml | 3 ++ configs/config.go | 62 +++++++++++++++------------ configs/jobScheduler.go | 15 +++++++ go.mod | 5 +-- internal/client/elastic/elastic.go | 1 + internal/dependencies/dependencies.go | 6 +-- pkg/jobs/jobScheduler.go | 16 +++++-- 7 files changed, 70 insertions(+), 38 deletions(-) create mode 100644 configs/jobScheduler.go diff --git a/configs/application.yml b/configs/application.yml index b43b5be..4ba726e 100644 --- a/configs/application.yml +++ b/configs/application.yml @@ -86,3 +86,6 @@ mjolnir: houston: service.url: HOUSTON_SERVICE_URL realm.id: HOUSTON_REALM_ID + +jobScheduler: + retentionInDays: 7 diff --git a/configs/config.go b/configs/config.go index b4b88af..7a7fb2d 100644 --- a/configs/config.go +++ b/configs/config.go @@ -9,20 +9,21 @@ import ( ) type AppConfig struct { - name string - env string - port int - metricsPort int - prometheus *Prometheus - postgres Postgres - timezone string - httpConfig *HttpConfig - clientConfigs *ClientConfigs - awsConfig *AwsConfig - KafkaConfig *KafkaConfig - ElasticConfig *ElasticConfig - mjolnir *MjolnirClientConfig - houston *HoustonClientConfig + name string + env string + port int + metricsPort int + prometheus *Prometheus + postgres Postgres + timezone string + httpConfig *HttpConfig + clientConfigs *ClientConfigs + awsConfig *AwsConfig + KafkaConfig *KafkaConfig + ElasticConfig *ElasticConfig + mjolnir *MjolnirClientConfig + houston *HoustonClientConfig + JobSchedulerConfig *JobSchedulerConfig } type MigConfig struct { @@ -37,20 +38,21 @@ func LoadConfig() { readConfig() appConfig = AppConfig{ - name: getString("name", true), - env: getString("env", true), - port: getInt("port", true), - metricsPort: getInt("metrics.port", true), - prometheus: GetPrometheusConfig(), - postgres: getPostgresConfig(), - timezone: getString("timezone", true), - clientConfigs: loadClientConfigs(), - httpConfig: NewHttpConfig(), - awsConfig: NewAWSConfig(), - KafkaConfig: NewKafkaConfig(), - ElasticConfig: NewElasticConfig(), - mjolnir: NewMjolnirConfig(), - houston: NewHoustonConfig(), + name: getString("name", true), + env: getString("env", true), + port: getInt("port", true), + metricsPort: getInt("metrics.port", true), + prometheus: GetPrometheusConfig(), + postgres: getPostgresConfig(), + timezone: getString("timezone", true), + clientConfigs: loadClientConfigs(), + httpConfig: NewHttpConfig(), + awsConfig: NewAWSConfig(), + KafkaConfig: NewKafkaConfig(), + ElasticConfig: NewElasticConfig(), + mjolnir: NewMjolnirConfig(), + houston: NewHoustonConfig(), + JobSchedulerConfig: NewJobSchedulerConfig(), } } @@ -124,3 +126,7 @@ func GetMjolnirConfig() *MjolnirClientConfig { func GetHoustonConfig() *HoustonClientConfig { return appConfig.houston } + +func GetJobSchedulerConfig() *JobSchedulerConfig { + return appConfig.JobSchedulerConfig +} diff --git a/configs/jobScheduler.go b/configs/jobScheduler.go new file mode 100644 index 0000000..bc9fda7 --- /dev/null +++ b/configs/jobScheduler.go @@ -0,0 +1,15 @@ +package configs + +type JobSchedulerConfig struct { + MaxRetentionDays int +} + +func NewJobSchedulerConfig() *JobSchedulerConfig { + return &JobSchedulerConfig{ + MaxRetentionDays: getInt("jobScheduler.retentionInDays", true), + } +} + +func (p *JobSchedulerConfig) GetMaxRetentionDays() int { + return p.MaxRetentionDays +} diff --git a/go.mod b/go.mod index 7c904a8..35281b8 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,11 @@ require ( github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/gin-contrib/cors v1.7.2 - github.com/gin-contrib/zap v0.2.0 github.com/gin-gonic/gin v1.9.1 + github.com/go-co-op/gocron/v2 v2.12.4 github.com/golang-migrate/migrate/v4 v4.17.1 github.com/google/uuid v1.6.0 + github.com/lib/pq v1.10.9 github.com/prometheus/client_golang v1.19.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.17.0 @@ -50,7 +51,6 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-co-op/gocron/v2 v2.12.4 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -72,7 +72,6 @@ require ( github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/lib/pq v1.10.9 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect diff --git a/internal/client/elastic/elastic.go b/internal/client/elastic/elastic.go index 8ff62f1..98db07e 100644 --- a/internal/client/elastic/elastic.go +++ b/internal/client/elastic/elastic.go @@ -49,6 +49,7 @@ func (el *ElasticSearchClient) DeleteDocuments(searchRequest string) { _, err := el.client.DeleteByQuery(el.Config.Index).Raw(strings.NewReader(searchRequest)).Do(context.TODO()) if err != nil { log.Printf("unable to delete documents: %s", err.Error()) + return } log.Printf("successfully deleted documents: %s", searchRequest) } diff --git a/internal/dependencies/dependencies.go b/internal/dependencies/dependencies.go index 8c0cd81..8a176e6 100644 --- a/internal/dependencies/dependencies.go +++ b/internal/dependencies/dependencies.go @@ -78,11 +78,11 @@ func InitDependencies() *Dependencies { searchServiceClient := service.NewSearchService(logger, elasticSearch) authService := service.NewAuthService(mjolnirClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) - jobScheduler := jobs.NewJobScheduler(logger, elasticSearch) - jobScheduler.ScheduleEsCleanUpJob() - jobScheduler.Start() services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService) handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService) + jobScheduler := jobs.NewJobScheduler(logger, elasticSearch, configs.GetJobSchedulerConfig()) + jobScheduler.ScheduleEsCleanUpJob() + jobScheduler.Start() return &Dependencies{ Service: services, diff --git a/pkg/jobs/jobScheduler.go b/pkg/jobs/jobScheduler.go index 4e999cc..f930501 100644 --- a/pkg/jobs/jobScheduler.go +++ b/pkg/jobs/jobScheduler.go @@ -1,6 +1,7 @@ package jobs import ( + "cybertron/configs" "cybertron/internal/client/elastic" "cybertron/pkg/log" "cybertron/pkg/utils" @@ -13,9 +14,10 @@ type JobScheduler struct { logger *log.Logger scheduler gocron.Scheduler elasticSearchClient *elastic.ElasticSearchClient + config configs.JobSchedulerConfig } -func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient) *JobScheduler { +func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient, jonSchedulerConfig *configs.JobSchedulerConfig) *JobScheduler { s, err := gocron.NewScheduler() if err != nil { logger.Error("Failed to start scheduler", zap.Error(err)) @@ -25,17 +27,23 @@ func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSea logger: logger, scheduler: s, elasticSearchClient: elasticSearchClient, + config: *jonSchedulerConfig, } } func (s *JobScheduler) ScheduleEsCleanUpJob() { - s.scheduler.NewJob(gocron.DurationJob(2*time.Hour), gocron.NewTask(esCleanUpJob, s)) + s.scheduler.NewJob(gocron.DurationJob(2*time.Second), gocron.NewTask(esCleanUpJob, s)) } func esCleanUpJob(s *JobScheduler) { - println("running es clean up job ") + now := time.Now().Unix() // Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds) - sevenDaysAgo := now - (7 * 24 * 60 * 60) + retentionDays := s.config.GetMaxRetentionDays() + + if retentionDays <= 0 { + retentionDays = 7 + } + sevenDaysAgo := now - (int64(retentionDays) * 24 * 60 * 60) rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo) search_query := utils.CreateSearchQuery(rangeQuery) es_query := utils.CreateEsQuery(search_query)