Merge pull request #55 from navi-ppl/TP-5555/es-cleanup-job
TP-5555 | clean up job
This commit is contained in:
@@ -86,3 +86,6 @@ mjolnir:
|
|||||||
houston:
|
houston:
|
||||||
service.url: HOUSTON_SERVICE_URL
|
service.url: HOUSTON_SERVICE_URL
|
||||||
realm.id: HOUSTON_REALM_ID
|
realm.id: HOUSTON_REALM_ID
|
||||||
|
|
||||||
|
jobScheduler:
|
||||||
|
retentionInDays: 7
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ type AppConfig struct {
|
|||||||
ElasticConfig *ElasticConfig
|
ElasticConfig *ElasticConfig
|
||||||
mjolnir *MjolnirClientConfig
|
mjolnir *MjolnirClientConfig
|
||||||
houston *HoustonClientConfig
|
houston *HoustonClientConfig
|
||||||
|
JobSchedulerConfig *JobSchedulerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
type MigConfig struct {
|
type MigConfig struct {
|
||||||
@@ -51,6 +52,7 @@ func LoadConfig() {
|
|||||||
ElasticConfig: NewElasticConfig(),
|
ElasticConfig: NewElasticConfig(),
|
||||||
mjolnir: NewMjolnirConfig(),
|
mjolnir: NewMjolnirConfig(),
|
||||||
houston: NewHoustonConfig(),
|
houston: NewHoustonConfig(),
|
||||||
|
JobSchedulerConfig: NewJobSchedulerConfig(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -124,3 +126,7 @@ func GetMjolnirConfig() *MjolnirClientConfig {
|
|||||||
func GetHoustonConfig() *HoustonClientConfig {
|
func GetHoustonConfig() *HoustonClientConfig {
|
||||||
return appConfig.houston
|
return appConfig.houston
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetJobSchedulerConfig() *JobSchedulerConfig {
|
||||||
|
return appConfig.JobSchedulerConfig
|
||||||
|
}
|
||||||
|
|||||||
15
configs/jobScheduler.go
Normal file
15
configs/jobScheduler.go
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
package configs
|
||||||
|
|
||||||
|
type JobSchedulerConfig struct {
|
||||||
|
MaxRetentionDays int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJobSchedulerConfig() *JobSchedulerConfig {
|
||||||
|
return &JobSchedulerConfig{
|
||||||
|
MaxRetentionDays: getInt("jobScheduler.retentionInDays", true),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *JobSchedulerConfig) GetMaxRetentionDays() int {
|
||||||
|
return p.MaxRetentionDays
|
||||||
|
}
|
||||||
8
go.mod
8
go.mod
@@ -8,10 +8,11 @@ require (
|
|||||||
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
|
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
|
||||||
github.com/elastic/go-elasticsearch/v8 v8.14.0
|
github.com/elastic/go-elasticsearch/v8 v8.14.0
|
||||||
github.com/gin-contrib/cors v1.7.2
|
github.com/gin-contrib/cors v1.7.2
|
||||||
github.com/gin-contrib/zap v0.2.0
|
|
||||||
github.com/gin-gonic/gin v1.9.1
|
github.com/gin-gonic/gin v1.9.1
|
||||||
|
github.com/go-co-op/gocron/v2 v2.12.4
|
||||||
github.com/golang-migrate/migrate/v4 v4.17.1
|
github.com/golang-migrate/migrate/v4 v4.17.1
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/lib/pq v1.10.9
|
||||||
github.com/prometheus/client_golang v1.19.1
|
github.com/prometheus/client_golang v1.19.1
|
||||||
github.com/spf13/cobra v1.8.0
|
github.com/spf13/cobra v1.8.0
|
||||||
github.com/spf13/viper v1.17.0
|
github.com/spf13/viper v1.17.0
|
||||||
@@ -66,11 +67,11 @@ require (
|
|||||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
|
github.com/jonboulle/clockwork v0.4.0 // indirect
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
github.com/klauspost/compress v1.17.8 // indirect
|
github.com/klauspost/compress v1.17.8 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
||||||
github.com/leodido/go-urn v1.4.0 // indirect
|
github.com/leodido/go-urn v1.4.0 // indirect
|
||||||
github.com/lib/pq v1.10.9 // indirect
|
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||||
@@ -81,6 +82,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.5.0 // indirect
|
github.com/prometheus/client_model v0.5.0 // indirect
|
||||||
github.com/prometheus/common v0.48.0 // indirect
|
github.com/prometheus/common v0.48.0 // indirect
|
||||||
github.com/prometheus/procfs v0.12.0 // indirect
|
github.com/prometheus/procfs v0.12.0 // indirect
|
||||||
|
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||||
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
||||||
github.com/sagikazarmark/locafero v0.3.0 // indirect
|
github.com/sagikazarmark/locafero v0.3.0 // indirect
|
||||||
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
|
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
|
||||||
@@ -100,7 +102,7 @@ require (
|
|||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/arch v0.7.0 // indirect
|
golang.org/x/arch v0.7.0 // indirect
|
||||||
golang.org/x/crypto v0.22.0 // indirect
|
golang.org/x/crypto v0.22.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
|
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
|
||||||
golang.org/x/net v0.24.0 // indirect
|
golang.org/x/net v0.24.0 // indirect
|
||||||
golang.org/x/sync v0.7.0 // indirect
|
golang.org/x/sync v0.7.0 // indirect
|
||||||
golang.org/x/sys v0.20.0 // indirect
|
golang.org/x/sys v0.20.0 // indirect
|
||||||
|
|||||||
@@ -45,6 +45,14 @@ func (el *ElasticSearchClient) IndexDocument(document interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (el *ElasticSearchClient) DeleteDocuments(searchRequest string) {
|
||||||
|
_, err := el.client.DeleteByQuery(el.Config.Index).Raw(strings.NewReader(searchRequest)).Do(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("unable to delete documents: %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Printf("successfully deleted documents: %s", searchRequest)
|
||||||
|
}
|
||||||
func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, int64, error) {
|
func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, int64, error) {
|
||||||
res, err := el.client.Search().
|
res, err := el.client.Search().
|
||||||
Index(el.Config.Index).Raw(strings.NewReader(searchRequest)).
|
Index(el.Config.Index).Raw(strings.NewReader(searchRequest)).
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"cybertron/pkg/db"
|
"cybertron/pkg/db"
|
||||||
"cybertron/pkg/houstonClient"
|
"cybertron/pkg/houstonClient"
|
||||||
httpclient "cybertron/pkg/httpClient"
|
httpclient "cybertron/pkg/httpClient"
|
||||||
|
"cybertron/pkg/jobs"
|
||||||
"cybertron/pkg/kafka/producer"
|
"cybertron/pkg/kafka/producer"
|
||||||
"cybertron/pkg/log"
|
"cybertron/pkg/log"
|
||||||
"cybertron/pkg/mjolnirClient"
|
"cybertron/pkg/mjolnirClient"
|
||||||
@@ -77,9 +78,11 @@ func InitDependencies() *Dependencies {
|
|||||||
searchServiceClient := service.NewSearchService(logger, elasticSearch)
|
searchServiceClient := service.NewSearchService(logger, elasticSearch)
|
||||||
authService := service.NewAuthService(mjolnirClient)
|
authService := service.NewAuthService(mjolnirClient)
|
||||||
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)
|
houstonService := service.NewHoustonService(logger, dbClient, kafkaProducer, houstonClient)
|
||||||
|
|
||||||
services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService)
|
services := initServices(documentServiceClient, projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, authService)
|
||||||
handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService)
|
handlers := initHandlers(projectServiceClient, sourceMapServiceClient, releaseServiceClient, exceptionServiceClient, searchServiceClient, houstonService)
|
||||||
|
jobScheduler := jobs.NewJobScheduler(logger, elasticSearch, configs.GetJobSchedulerConfig())
|
||||||
|
jobScheduler.ScheduleEsCleanUpJob()
|
||||||
|
jobScheduler.Start()
|
||||||
|
|
||||||
return &Dependencies{
|
return &Dependencies{
|
||||||
Service: services,
|
Service: services,
|
||||||
|
|||||||
57
pkg/jobs/jobScheduler.go
Normal file
57
pkg/jobs/jobScheduler.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"cybertron/configs"
|
||||||
|
"cybertron/internal/client/elastic"
|
||||||
|
"cybertron/pkg/log"
|
||||||
|
"cybertron/pkg/utils"
|
||||||
|
"github.com/go-co-op/gocron/v2"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobScheduler struct {
|
||||||
|
logger *log.Logger
|
||||||
|
scheduler gocron.Scheduler
|
||||||
|
elasticSearchClient *elastic.ElasticSearchClient
|
||||||
|
config configs.JobSchedulerConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJobScheduler(logger *log.Logger, elasticSearchClient *elastic.ElasticSearchClient, jonSchedulerConfig *configs.JobSchedulerConfig) *JobScheduler {
|
||||||
|
s, err := gocron.NewScheduler()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Failed to start scheduler", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &JobScheduler{
|
||||||
|
logger: logger,
|
||||||
|
scheduler: s,
|
||||||
|
elasticSearchClient: elasticSearchClient,
|
||||||
|
config: *jonSchedulerConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *JobScheduler) ScheduleEsCleanUpJob() {
|
||||||
|
s.scheduler.NewJob(gocron.DurationJob(2*time.Second), gocron.NewTask(esCleanUpJob, s))
|
||||||
|
}
|
||||||
|
func esCleanUpJob(s *JobScheduler) {
|
||||||
|
|
||||||
|
now := time.Now().Unix()
|
||||||
|
// Subtract 7 days (7 days * 24 hours * 60 minutes * 60 seconds)
|
||||||
|
retentionDays := s.config.GetMaxRetentionDays()
|
||||||
|
|
||||||
|
if retentionDays <= 0 {
|
||||||
|
retentionDays = 7
|
||||||
|
}
|
||||||
|
sevenDaysAgo := now - (int64(retentionDays) * 24 * 60 * 60)
|
||||||
|
rangeQuery := utils.CreateRangeQueryForLteString("created_at", sevenDaysAgo)
|
||||||
|
search_query := utils.CreateSearchQuery(rangeQuery)
|
||||||
|
es_query := utils.CreateEsQuery(search_query)
|
||||||
|
println("final query %v", es_query)
|
||||||
|
s.elasticSearchClient.DeleteDocuments(es_query)
|
||||||
|
|
||||||
|
}
|
||||||
|
func (s *JobScheduler) Start() {
|
||||||
|
s.logger.Info("Starting job scheduler")
|
||||||
|
s.scheduler.Start()
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ const (
|
|||||||
TermQueryForBool = `{ "term": { "%s": %t } }`
|
TermQueryForBool = `{ "term": { "%s": %t } }`
|
||||||
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
|
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
|
||||||
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
|
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
|
||||||
|
RangeQueryLteString = `{ "range": { "%s": { "lte": "%d" } } }`
|
||||||
MustQuery = `"must": [ %s ] `
|
MustQuery = `"must": [ %s ] `
|
||||||
MustNotQuery = `"must_not": [ %s ]`
|
MustNotQuery = `"must_not": [ %s ]`
|
||||||
ShouldQuery = `"should": [ %s ] `
|
ShouldQuery = `"should": [ %s ] `
|
||||||
@@ -109,6 +110,10 @@ func CreateRangeQueryForGteString(key string, greaterThan string) string {
|
|||||||
return fmt.Sprintf(RangeQueryGteString, key, greaterThan)
|
return fmt.Sprintf(RangeQueryGteString, key, greaterThan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CreateRangeQueryForLteString(key string, lessThan int64) string {
|
||||||
|
return fmt.Sprintf(RangeQueryLteString, key, lessThan)
|
||||||
|
}
|
||||||
|
|
||||||
func CreateMustQuery(filters ...string) string {
|
func CreateMustQuery(filters ...string) string {
|
||||||
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
|
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user