98 lines
4.6 KiB
Go
98 lines
4.6 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"alfred/cmd/core/app/helper"
|
|
"alfred/config"
|
|
"alfred/pkg/log"
|
|
"alfred/repositoryAccessLayer"
|
|
"alfred/utils"
|
|
"errors"
|
|
"go.uber.org/zap"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
func ErrorEventsUpdateHandler(clientName string, repositoryAccessLayer repositoryAccessLayer.RepositoryAccessLayer) error {
|
|
errorEventsUploadIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUploadIndexClientMap[clientName]
|
|
sessionUploadIndex := config.GetCoreConfig().ElasticSearchConfig.SessionUploadIndexClientMap[clientName]
|
|
errorEventsLastCronTimestampIndex := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsLastCronTimestampIndex
|
|
errorEventsUpdateCronDelayTimeInMinutes := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateCronDelayTimeInMinutes
|
|
sessionErrorEventsFilter := config.GetCoreConfig().SessionErrorEventsFilter
|
|
endTime := utils.GetCurrentTimeInMillis() - ((errorEventsUpdateCronDelayTimeInMinutes * time.Minute).Milliseconds())
|
|
var result interface{}
|
|
var err error
|
|
retryFuncWithResponseAndError := func() (interface{}, error) {
|
|
result, err = repositoryAccessLayer.ErrorEventsAccessLayer.GetErrorEventsLastCronTimestamp(clientName, errorEventsLastCronTimestampIndex)
|
|
return result, err
|
|
}
|
|
result, err = utils.RetryFunctionWithResponseAndError(retryFuncWithResponseAndError, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateMaxRetry, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateRetryBackOffInSeconds*time.Second)
|
|
if err != nil {
|
|
log.Error("Error while fetching last cron timestamp for error events "+clientName, zap.Error(err))
|
|
return err
|
|
}
|
|
startTime := result.(int64)
|
|
errorEventsUploadIndexList := helper.CreateSearchIndex(errorEventsUploadIndex, startTime, endTime)
|
|
retryFuncWithResponseAndError = func() (interface{}, error) {
|
|
result, err = repositoryAccessLayer.ErrorEventsAccessLayer.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime, sessionErrorEventsFilter)
|
|
return result, err
|
|
}
|
|
result, err = utils.RetryFunctionWithResponseAndError(retryFuncWithResponseAndError, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateMaxRetry, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateRetryBackOffInSeconds*time.Second)
|
|
if err != nil || result == nil {
|
|
log.Error("Error while fetching error events session id with time range "+clientName, zap.Error(err))
|
|
return err
|
|
}
|
|
invalidSessionIdsList := result.([]string)
|
|
|
|
batchSize := config.GetCoreConfig().ElasticSearchConfig.ErrorEventsUpdateBatchSize
|
|
var waitGroupForSessionUpdate sync.WaitGroup
|
|
var hasErrorMutex sync.Mutex
|
|
hasErrorInUpdate := false
|
|
maxConcurrency := config.GetCoreConfig().MaxUpdateSessionErrorEventsGofuncConcurrency
|
|
semaphore := make(chan struct{}, maxConcurrency)
|
|
|
|
for i := 0; i < len(invalidSessionIdsList); i += batchSize {
|
|
end := int(math.Min(float64(i+batchSize), float64(len(invalidSessionIdsList))))
|
|
invalidSessionIds := invalidSessionIdsList[i:end]
|
|
// Acquire a slot in the semaphore (blocks if the limit is reached)
|
|
semaphore <- struct{}{}
|
|
waitGroupForSessionUpdate.Add(1)
|
|
go func(invalidSessionIds []string) {
|
|
defer func() {
|
|
// Release the slot in the semaphore when the goroutine exits
|
|
<-semaphore
|
|
waitGroupForSessionUpdate.Done()
|
|
}()
|
|
retryFuncWithError := func() error {
|
|
err = repositoryAccessLayer.SessionsAccessLayer.UpdateSessionErrorEventsWithSessionId(invalidSessionIds, []string{sessionUploadIndex + "*"}, true)
|
|
return err
|
|
}
|
|
err = utils.RetryFunctionWithError(retryFuncWithError, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateMaxRetry, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateRetryBackOffInSeconds*time.Second)
|
|
if err != nil {
|
|
hasErrorMutex.Lock()
|
|
hasErrorInUpdate = true
|
|
log.Error("Error while updating invalid sessions with session id "+clientName, zap.Error(err))
|
|
hasErrorMutex.Unlock()
|
|
return
|
|
}
|
|
}(invalidSessionIds)
|
|
}
|
|
waitGroupForSessionUpdate.Wait()
|
|
close(semaphore)
|
|
|
|
if hasErrorInUpdate {
|
|
return errors.New("Error while updating invalid sessions with session id " + clientName)
|
|
}
|
|
|
|
retryFuncWithError := func() error {
|
|
err = repositoryAccessLayer.ErrorEventsAccessLayer.UpdateErrorEventsLastCronTimestamp(endTime, clientName, errorEventsLastCronTimestampIndex)
|
|
return err
|
|
}
|
|
err = utils.RetryFunctionWithError(retryFuncWithError, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateMaxRetry, config.GetCoreConfig().ElasticSearchConfig.ElasticSearchUpdateRetryBackOffInSeconds*time.Second)
|
|
if err != nil {
|
|
log.Error("Error while updating error events last cron timestamp "+clientName, zap.Error(err))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|