TP-5555 | clean up job

This commit is contained in:
varnit goyal
2024-12-10 14:57:21 +05:30
parent d27c023186
commit 8366c352f7
7 changed files with 70 additions and 38 deletions

View File

@@ -86,3 +86,6 @@ mjolnir:
houston: houston:
service.url: HOUSTON_SERVICE_URL service.url: HOUSTON_SERVICE_URL
realm.id: HOUSTON_REALM_ID realm.id: HOUSTON_REALM_ID
jobScheduler:
retentionInDays: 7

View File

@@ -23,6 +23,7 @@ type AppConfig struct {
ElasticConfig *ElasticConfig ElasticConfig *ElasticConfig
mjolnir *MjolnirClientConfig mjolnir *MjolnirClientConfig
houston *HoustonClientConfig houston *HoustonClientConfig
JobSchedulerConfig *JobSchedulerConfig
} }
type MigConfig struct { type MigConfig struct {
@@ -51,6 +52,7 @@ func LoadConfig() {
ElasticConfig: NewElasticConfig(), ElasticConfig: NewElasticConfig(),
mjolnir: NewMjolnirConfig(), mjolnir: NewMjolnirConfig(),
houston: NewHoustonConfig(), houston: NewHoustonConfig(),
JobSchedulerConfig: NewJobSchedulerConfig(),
} }
} }
@@ -124,3 +126,7 @@ func GetMjolnirConfig() *MjolnirClientConfig {
func GetHoustonConfig() *HoustonClientConfig { func GetHoustonConfig() *HoustonClientConfig {
return appConfig.houston return appConfig.houston
} }
func GetJobSchedulerConfig() *JobSchedulerConfig {
return appConfig.JobSchedulerConfig
}

15
configs/jobScheduler.go Normal file
View File

@@ -0,0 +1,15 @@
package configs
type JobSchedulerConfig struct {
MaxRetentionDays int
}
func NewJobSchedulerConfig() *JobSchedulerConfig {
return &JobSchedulerConfig{
MaxRetentionDays: getInt("jobScheduler.retentionInDays", true),
}
}
func (p *JobSchedulerConfig) GetMaxRetentionDays() int {
return p.MaxRetentionDays
}

5
go.mod
View File

@@ -8,10 +8,11 @@ require (
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/gin-contrib/cors v1.7.2 github.com/gin-contrib/cors v1.7.2
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
github.com/go-co-op/gocron/v2 v2.12.4
github.com/golang-migrate/migrate/v4 v4.17.1 github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_golang v1.19.1
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.17.0 github.com/spf13/viper v1.17.0
@@ -50,7 +51,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-co-op/gocron/v2 v2.12.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
@@ -72,7 +72,6 @@ require (
github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect

View File

@@ -49,6 +49,7 @@ func (el *ElasticSearchClient) DeleteDocuments(searchRequest string) {
_, err := el.client.DeleteByQuery(el.Config.Index).Raw(strings.NewReader(searchRequest)).Do(context.TODO()) _, err := el.client.DeleteByQuery(el.Config.Index).Raw(strings.NewReader(searchRequest)).Do(context.TODO())
if err != nil { if err != nil {
log.Printf("unable to delete documents: %s", err.Error()) log.Printf("unable to delete documents: %s", err.Error())
return
} }
log.Printf("successfully deleted documents: %s", searchRequest) log.Printf("successfully deleted documents: %s", searchRequest)
} }

View File

@@ -78,11 +78,11 @@ func InitDependencies() *Dependencies {
searchServiceClient := service.NewSearchService(logger, elasticSearch) searchServiceClient := service.NewSearchService(logger, elasticSearch)
authService := service.NewAuthService(mjolnirClient) authService := service.NewAuthService(mjolnirClient)
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient) houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)
jobScheduler := jobs.NewJobScheduler(logger, elasticSearch)
jobScheduler.ScheduleEsCleanUpJob()
jobScheduler.Start()
services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService) services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService)
handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService) handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService)
jobScheduler := jobs.NewJobScheduler(logger, elasticSearch, configs.GetJobSchedulerConfig())
jobScheduler.ScheduleEsCleanUpJob()
jobScheduler.Start()
return &Dependencies{ return &Dependencies{
Service: services, Service: services,

View File

@@ -1,6 +1,7 @@
package jobs package jobs
import ( import (
"cybertron/configs"
"cybertron/internal/client/elastic" "cybertron/internal/client/elastic"
"cybertron/pkg/log" "cybertron/pkg/log"
"cybertron/pkg/utils" "cybertron/pkg/utils"
@@ -13,9 +14,10 @@ type JobScheduler struct {
logger *log.Logger logger *log.Logger
scheduler gocron.Scheduler scheduler gocron.Scheduler
elasticSearchClient *elastic.ElasticSearchClient elasticSearchClient *elastic.ElasticSearchClient
config configs.JobSchedulerConfig
} }
func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient) *JobScheduler { func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient, jonSchedulerConfig *configs.JobSchedulerConfig) *JobScheduler {
s, err := gocron.NewScheduler() s, err := gocron.NewScheduler()
if err != nil { if err != nil {
logger.Error("Failed to start scheduler", zap.Error(err)) logger.Error("Failed to start scheduler", zap.Error(err))
@@ -25,17 +27,23 @@ func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSea
logger: logger, logger: logger,
scheduler: s, scheduler: s,
elasticSearchClient: elasticSearchClient, elasticSearchClient: elasticSearchClient,
config: *jonSchedulerConfig,
} }
} }
func (s *JobScheduler) ScheduleEsCleanUpJob() { func (s *JobScheduler) ScheduleEsCleanUpJob() {
s.scheduler.NewJob(gocron.DurationJob(2*time.Hour), gocron.NewTask(esCleanUpJob, s)) s.scheduler.NewJob(gocron.DurationJob(2*time.Second), gocron.NewTask(esCleanUpJob, s))
} }
func esCleanUpJob(s *JobScheduler) { func esCleanUpJob(s *JobScheduler) {
println("running es clean up job ")
now := time.Now().Unix() now := time.Now().Unix()
// Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds) // Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds)
sevenDaysAgo := now - (7 * 24 * 60 * 60) retentionDays := s.config.GetMaxRetentionDays()
if retentionDays <= 0 {
retentionDays = 7
}
sevenDaysAgo := now - (int64(retentionDays) * 24 * 60 * 60)
rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo) rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo)
search_query := utils.CreateSearchQuery(rangeQuery) search_query := utils.CreateSearchQuery(rangeQuery)
es_query := utils.CreateEsQuery(search_query) es_query := utils.CreateEsQuery(search_query)