75 lines
3.2 KiB
Go
75 lines
3.2 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"alfred/config"
|
|
"alfred/internal/infra"
|
|
"alfred/internal/metrics"
|
|
"alfred/internal/shedlock"
|
|
"alfred/pkg/log"
|
|
"alfred/repositoryAccessLayer"
|
|
"alfred/utils"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func ScheduleCronForErrorEventsUpdate(repositoryAccessLayer *repositoryAccessLayer.RepositoryAccessLayer) {
|
|
schedule := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateCronSchedule
|
|
go func() {
|
|
_, err := infra.ScheduleJob(schedule, func() {
|
|
log.Info("Cron Initiated for error events update")
|
|
err := shedlock.Lock(utils.ERROR_EVENT_UPDATE_CRON, config.GetCoreConfig().ShedlockConfig.ErrorEventUpdateCronLockedUntil, repositoryAccessLayer)
|
|
if err != nil {
|
|
log.Error("Cron Already locked for error event update", zap.Error(err))
|
|
return
|
|
}
|
|
err = ErrorEventsUpdateHandler(utils.NAVI_USER_APP, *repositoryAccessLayer)
|
|
if err != nil {
|
|
metrics.ErrorEventsUpdateCronFailureCounter.WithLabelValues(utils.NAVI_USER_APP, err.Error()).Inc()
|
|
log.Error("Error while updating error events for "+utils.NAVI_USER_APP, zap.Error(err))
|
|
return
|
|
}
|
|
})
|
|
if err != nil {
|
|
log.Error("Error while scheduling cron for error events update "+utils.NAVI_USER_APP, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
}()
|
|
|
|
//uncomment when cosmos onboard
|
|
//
|
|
//go func() {
|
|
// _, err := infra.ScheduleJob(schedule, func() {
|
|
// errorEventsUploadIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUploadIndexClientMap[utils.COSMOS]
|
|
// sessionUploadIndex := config.GetCoreConfig().ElasticSearchConfig.SessionUploadIndexClientMap[utils.COSMOS]
|
|
// errorEventsLastCronTimestampIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsLastCronTimestampIndex
|
|
// endTime := utils.GetCurrentTimeInMillis()
|
|
// startTime, err := repositoryAccessLayer.ErrorEventsAccessLayer.GetErrorEventsLastCronTimestamp(utils.COSMOS, errorEventsLastCronTimestampIndex)
|
|
// if err != nil {
|
|
// log.Error("Error while fetching last cron timestamp for error events "+utils.COSMOS, zap.Error(err))
|
|
// return
|
|
// }
|
|
// errorEventsUploadIndexList := helper.CreateSearchIndex(errorEventsUploadIndex, startTime, endTime)
|
|
// invalidSessionIds, err := repositoryAccessLayer.ErrorEventsAccessLayer.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime)
|
|
// if err != nil {
|
|
// log.Error("Error while fetching error events session id with time range "+utils.COSMOS, zap.Error(err))
|
|
// return
|
|
// }
|
|
// sessionUploadIndexList := helper.CreateSearchIndex(sessionUploadIndex, startTime, endTime)
|
|
// err = repositoryAccessLayer.SessionsAccessLayer.UpdateSessionErrorEventsWithSessionId(invalidSessionIds, sessionUploadIndexList)
|
|
// if err != nil {
|
|
// log.Error("Error while updating invalid sessions with session id "+utils.COSMOS, zap.Error(err))
|
|
// return
|
|
// }
|
|
// err = repositoryAccessLayer.ErrorEventsAccessLayer.UpdateErrorEventsLastCronTimestamp(endTime, utils.COSMOS, errorEventsLastCronTimestampIndex)
|
|
// if err != nil {
|
|
// log.Error("Error while updating error events last cron timestamp "+utils.COSMOS, zap.Error(err))
|
|
// return
|
|
// }
|
|
// })
|
|
// if err != nil {
|
|
// log.Error("Error while scheduling cron for error events update "+utils.COSMOS, zap.Error(err))
|
|
// return
|
|
// }
|
|
//}()
|
|
}
|