package scheduler import ( "alfred/config" "alfred/internal/infra" "alfred/internal/metrics" "alfred/internal/shedlock" "alfred/pkg/log" "alfred/repositoryAccessLayer" "alfred/utils" "go.uber.org/zap" ) func ScheduleCronForErrorEventsUpdate(repositoryAccessLayer *repositoryAccessLayer.RepositoryAccessLayer) { schedule := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateCronSchedule go func() { _, err := infra.ScheduleJob(schedule, func() { log.Info("Cron Initiated for error events update") err := shedlock.Lock(utils.ERROR_EVENT_UPDATE_CRON, config.GetCoreConfig().ShedlockConfig.ErrorEventUpdateCronLockedUntil, repositoryAccessLayer) if err != nil { log.Error("Cron Already locked for error event update", zap.Error(err)) return } err = ErrorEventsUpdateHandler(utils.NAVI_USER_APP, *repositoryAccessLayer) if err != nil { metrics.ErrorEventsUpdateCronFailureCounter.WithLabelValues(utils.NAVI_USER_APP, err.Error()).Inc() log.Error("Error while updating error events for "+utils.NAVI_USER_APP, zap.Error(err)) return } }) if err != nil { log.Error("Error while scheduling cron for error events update "+utils.NAVI_USER_APP, zap.Error(err)) return } }() //uncomment when cosmos onboard // //go func() { // _, err := infra.ScheduleJob(schedule, func() { // errorEventsUploadIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUploadIndexClientMap[utils.COSMOS] // sessionUploadIndex := config.GetCoreConfig().ElasticSearchConfig.SessionUploadIndexClientMap[utils.COSMOS] // errorEventsLastCronTimestampIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsLastCronTimestampIndex // endTime := utils.GetCurrentTimeInMillis() // startTime, err := repositoryAccessLayer.ErrorEventsAccessLayer.GetErrorEventsLastCronTimestamp(utils.COSMOS, errorEventsLastCronTimestampIndex) // if err != nil { // log.Error("Error while fetching last cron timestamp for error events "+utils.COSMOS, zap.Error(err)) // return // } // errorEventsUploadIndexList := helper.CreateSearchIndex(errorEventsUploadIndex, startTime, endTime) // invalidSessionIds, err := repositoryAccessLayer.ErrorEventsAccessLayer.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime) // if err != nil { // log.Error("Error while fetching error events session id with time range "+utils.COSMOS, zap.Error(err)) // return // } // sessionUploadIndexList := helper.CreateSearchIndex(sessionUploadIndex, startTime, endTime) // err = repositoryAccessLayer.SessionsAccessLayer.UpdateSessionErrorEventsWithSessionId(invalidSessionIds, sessionUploadIndexList) // if err != nil { // log.Error("Error while updating invalid sessions with session id "+utils.COSMOS, zap.Error(err)) // return // } // err = repositoryAccessLayer.ErrorEventsAccessLayer.UpdateErrorEventsLastCronTimestamp(endTime, utils.COSMOS, errorEventsLastCronTimestampIndex) // if err != nil { // log.Error("Error while updating error events last cron timestamp "+utils.COSMOS, zap.Error(err)) // return // } // }) // if err != nil { // log.Error("Error while scheduling cron for error events update "+utils.COSMOS, zap.Error(err)) // return // } //}() }