Files
alfred-be/alfred/scheduler/errorEventsHandler.go
2026-03-08 16:14:42 +05:30

98 lines
4.6 KiB
Go

package scheduler
import (
"alfred/cmd/core/app/helper"
"alfred/config"
"alfred/pkg/log"
"alfred/repositoryAccessLayer"
"alfred/utils"
"errors"
"go.uber.org/zap"
"math"
"sync"
"time"
)
// ErrorEventsUpdateHandler runs one pass of the error-events update cron for
// clientName.
//
// Steps, each repository call going through the utils retry helpers with the
// configured ElasticSearch retry count and back-off:
//  1. Read the last cron timestamp for the client (used as the window start).
//  2. Fetch session ids from the error-events indices for the window between
//     that timestamp and "now minus the configured cron delay".
//  3. Send those session ids, in batches of ErrorEventsUpdateBatchSize, to
//     UpdateSessionErrorEventsWithSessionId, running at most
//     MaxUpdateSessionErrorEventsGofuncConcurrency batches at a time.
//  4. Store the window end as the new last cron timestamp.
//
// The timestamp is only advanced when every batch succeeded, so a failed run
// leaves the window start untouched for the next invocation.
//
// It returns a non-nil error when a repository call fails after retries, when
// a fetched value has an unexpected type, when the batch size is not positive
// while there are sessions to update, or when any batch update fails.
func ErrorEventsUpdateHandler(clientName string, repositoryAccessLayer repositoryAccessLayer.RepositoryAccessLayer) error {
	errorEventsUploadIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUploadIndexClientMap[clientName]
	sessionUploadIndex := config.GetCoreConfig().ElasticSearchConfig.SessionUploadIndexClientMap[clientName]
	errorEventsLastCronTimestampIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsLastCronTimestampIndex
	errorEventsUpdateCronDelayTimeInMinutes := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateCronDelayTimeInMinutes
	sessionErrorEventsFilter := config.GetCoreConfig().SessionErrorEventsFilter

	// Retry settings are shared by every repository call below.
	maxRetry := config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateMaxRetry
	retryBackOff := config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateRetryBackOffInSeconds * time.Second

	// The window ends "delay" minutes in the past rather than at the current time.
	endTime := utils.GetCurrentTimeInMillis() - ((errorEventsUpdateCronDelayTimeInMinutes * time.Minute).Milliseconds())

	var result interface{}
	var err error

	result, err = utils.RetryFunctionWithResponseAndError(func() (interface{}, error) {
		return repositoryAccessLayer.ErrorEventsAccessLayer.GetErrorEventsLastCronTimestamp(clientName, errorEventsLastCronTimestampIndex)
	}, maxRetry, retryBackOff)
	if err != nil {
		log.Error("Error while fetching last cron timestamp for error events "+clientName, zap.Error(err))
		return err
	}
	// Checked assertion: a nil or non-int64 result becomes an error instead of a panic.
	startTime, ok := result.(int64)
	if !ok {
		err = errors.New("unexpected type for error events last cron timestamp " + clientName)
		log.Error("Error while fetching last cron timestamp for error events "+clientName, zap.Error(err))
		return err
	}

	errorEventsUploadIndexList := helper.CreateSearchIndex(errorEventsUploadIndex, startTime, endTime)
	result, err = utils.RetryFunctionWithResponseAndError(func() (interface{}, error) {
		return repositoryAccessLayer.ErrorEventsAccessLayer.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime, sessionErrorEventsFilter)
	}, maxRetry, retryBackOff)
	if err != nil || result == nil {
		log.Error("Error while fetching error events session id with time range "+clientName, zap.Error(err))
		// NOTE(review): when result is nil and err is nil this returns nil, so the
		// caller sees success while the cron timestamp is left unchanged. Kept
		// as-is for backward compatibility — confirm this is intended.
		return err
	}
	invalidSessionIdsList, ok := result.([]string)
	if !ok {
		err = errors.New("unexpected type for error events session id list " + clientName)
		log.Error("Error while fetching error events session id with time range "+clientName, zap.Error(err))
		return err
	}

	batchSize := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateBatchSize
	// A zero batch size would make the loop below spin forever and a negative
	// one would produce invalid slice bounds, so reject it when there is work.
	if batchSize <= 0 && len(invalidSessionIdsList) > 0 {
		err = errors.New("invalid error events update batch size for " + clientName)
		log.Error("Error while updating invalid sessions with session id "+clientName, zap.Error(err))
		return err
	}

	maxConcurrency := config.GetCoreConfig().MaxUpdateSessionErrorEventsGofuncConcurrency
	// An unbuffered semaphore would block the first acquire forever (nobody is
	// receiving yet); fall back to running one batch at a time.
	if maxConcurrency <= 0 {
		maxConcurrency = 1
	}
	semaphore := make(chan struct{}, maxConcurrency)

	var waitGroupForSessionUpdate sync.WaitGroup
	var hasErrorMutex sync.Mutex
	hasErrorInUpdate := false

	for i := 0; i < len(invalidSessionIdsList); i += batchSize {
		// Clamp so the final batch does not run past the end of the list.
		end := int(math.Min(float64(i+batchSize), float64(len(invalidSessionIdsList))))
		invalidSessionIds := invalidSessionIdsList[i:end]

		// Acquire a slot in the semaphore (blocks if the limit is reached).
		semaphore <- struct{}{}
		waitGroupForSessionUpdate.Add(1)
		go func(invalidSessionIds []string) {
			defer func() {
				// Release the slot in the semaphore when the goroutine exits.
				<-semaphore
				waitGroupForSessionUpdate.Done()
			}()

			// Goroutine-local error: writing the function-level err from several
			// goroutines at once would be a data race.
			updateErr := utils.RetryFunctionWithError(func() error {
				return repositoryAccessLayer.SessionsAccessLayer.UpdateSessionErrorEventsWithSessionId(invalidSessionIds, []string{sessionUploadIndex + "*"}, true)
			}, maxRetry, retryBackOff)
			if updateErr != nil {
				hasErrorMutex.Lock()
				hasErrorInUpdate = true
				log.Error("Error while updating invalid sessions with session id "+clientName, zap.Error(updateErr))
				hasErrorMutex.Unlock()
			}
		}(invalidSessionIds)
	}
	// Wait() orders every goroutine's write to hasErrorInUpdate before the read below.
	waitGroupForSessionUpdate.Wait()

	if hasErrorInUpdate {
		return errors.New("Error while updating invalid sessions with session id " + clientName)
	}

	err = utils.RetryFunctionWithError(func() error {
		return repositoryAccessLayer.ErrorEventsAccessLayer.UpdateErrorEventsLastCronTimestamp(endTime, clientName, errorEventsLastCronTimestampIndex)
	}, maxRetry, retryBackOff)
	if err != nil {
		log.Error("Error while updating error events last cron timestamp "+clientName, zap.Error(err))
		return err
	}
	return nil
}