Files
alfred-be/alfred/scheduler/scheduleCronForErrorEventsUpdate.go
2026-03-08 16:14:42 +05:30

75 lines
3.2 KiB
Go

package scheduler
import (
"alfred/config"
"alfred/internal/infra"
"alfred/internal/metrics"
"alfred/internal/shedlock"
"alfred/pkg/log"
"alfred/repositoryAccessLayer"
"alfred/utils"
"go.uber.org/zap"
)
func ScheduleCronForErrorEventsUpdate(repositoryAccessLayer *repositoryAccessLayer.RepositoryAccessLayer) {
schedule := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateCronSchedule
go func() {
_, err := infra.ScheduleJob(schedule, func() {
log.Info("Cron Initiated for error events update")
err := shedlock.Lock(utils.ERROR_EVENT_UPDATE_CRON, config.GetCoreConfig().ShedlockConfig.ErrorEventUpdateCronLockedUntil, repositoryAccessLayer)
if err != nil {
log.Error("Cron Already locked for error event update", zap.Error(err))
return
}
err = ErrorEventsUpdateHandler(utils.NAVI_USER_APP, *repositoryAccessLayer)
if err != nil {
metrics.ErrorEventsUpdateCronFailureCounter.WithLabelValues(utils.NAVI_USER_APP, err.Error()).Inc()
log.Error("Error while updating error events for "+utils.NAVI_USER_APP, zap.Error(err))
return
}
})
if err != nil {
log.Error("Error while scheduling cron for error events update "+utils.NAVI_USER_APP, zap.Error(err))
return
}
}()
//uncomment when cosmos onboard
//
//go func() {
// _, err := infra.ScheduleJob(schedule, func() {
// errorEventsUploadIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUploadIndexClientMap[utils.COSMOS]
// sessionUploadIndex := config.GetCoreConfig().ElasticSearchConfig.SessionUploadIndexClientMap[utils.COSMOS]
// errorEventsLastCronTimestampIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsLastCronTimestampIndex
// endTime := utils.GetCurrentTimeInMillis()
// startTime, err := repositoryAccessLayer.ErrorEventsAccessLayer.GetErrorEventsLastCronTimestamp(utils.COSMOS, errorEventsLastCronTimestampIndex)
// if err != nil {
// log.Error("Error while fetching last cron timestamp for error events "+utils.COSMOS, zap.Error(err))
// return
// }
// errorEventsUploadIndexList := helper.CreateSearchIndex(errorEventsUploadIndex, startTime, endTime)
// invalidSessionIds, err := repositoryAccessLayer.ErrorEventsAccessLayer.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime)
// if err != nil {
// log.Error("Error while fetching error events session id with time range "+utils.COSMOS, zap.Error(err))
// return
// }
// sessionUploadIndexList := helper.CreateSearchIndex(sessionUploadIndex, startTime, endTime)
// err = repositoryAccessLayer.SessionsAccessLayer.UpdateSessionErrorEventsWithSessionId(invalidSessionIds, sessionUploadIndexList)
// if err != nil {
// log.Error("Error while updating invalid sessions with session id "+utils.COSMOS, zap.Error(err))
// return
// }
// err = repositoryAccessLayer.ErrorEventsAccessLayer.UpdateErrorEventsLastCronTimestamp(endTime, utils.COSMOS, errorEventsLastCronTimestampIndex)
// if err != nil {
// log.Error("Error while updating error events last cron timestamp "+utils.COSMOS, zap.Error(err))
// return
// }
// })
// if err != nil {
// log.Error("Error while scheduling cron for error events update "+utils.COSMOS, zap.Error(err))
// return
// }
//}()
}