TP-5555 | clean up job
This commit is contained in:
49
pkg/jobs/jobScheduler.go
Normal file
49
pkg/jobs/jobScheduler.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"cybertron/internal/client/elastic"
|
||||
"cybertron/pkg/log"
|
||||
"cybertron/pkg/utils"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobScheduler struct {
|
||||
logger *log.Logger
|
||||
scheduler gocron.Scheduler
|
||||
elasticSearchClient *elastic.ElasticSearchClient
|
||||
}
|
||||
|
||||
func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient) *JobScheduler {
|
||||
s, err := gocron.NewScheduler()
|
||||
if err != nil {
|
||||
logger.Error("Failed to start scheduler", zap.Error(err))
|
||||
}
|
||||
|
||||
return &JobScheduler{
|
||||
logger: logger,
|
||||
scheduler: s,
|
||||
elasticSearchClient: elasticSearchClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *JobScheduler) ScheduleEsCleanUpJob() {
|
||||
s.scheduler.NewJob(gocron.DurationJob(2*time.Hour), gocron.NewTask(esCleanUpJob, s))
|
||||
}
|
||||
func esCleanUpJob(s *JobScheduler) {
|
||||
println("running es clean up job ")
|
||||
now := time.Now().Unix()
|
||||
// Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds)
|
||||
sevenDaysAgo := now - (7 * 24 * 60 * 60)
|
||||
rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo)
|
||||
search_query := utils.CreateSearchQuery(rangeQuery)
|
||||
es_query := utils.CreateEsQuery(search_query)
|
||||
println("final query %v", es_query)
|
||||
s.elasticSearchClient.DeleteDocuments(es_query)
|
||||
|
||||
}
|
||||
func (s *JobScheduler) Start() {
|
||||
s.logger.Info("Starting job scheduler")
|
||||
s.scheduler.Start()
|
||||
}
|
||||
@@ -11,6 +11,7 @@ const (
|
||||
TermQueryForBool = `{ "term": { "%s": %t } }`
|
||||
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
|
||||
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
|
||||
RangeQueryLteString = `{ "range": { "%s": { "lte": "%d" } } }`
|
||||
MustQuery = `"must": [ %s ] `
|
||||
MustNotQuery = `"must_not": [ %s ]`
|
||||
ShouldQuery = `"should": [ %s ] `
|
||||
@@ -107,6 +108,10 @@ func CreateRangeQueryForGteString(key string, greaterThan string) string {
|
||||
return fmt.Sprintf(RangeQueryGteString, key, greaterThan)
|
||||
}
|
||||
|
||||
func CreateRangeQueryForLteString(key string, lessThan int64) string {
|
||||
return fmt.Sprintf(RangeQueryLteString, key, lessThan)
|
||||
}
|
||||
|
||||
func CreateMustQuery(filters ...string) string {
|
||||
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user