Files
alfred-be/alfred/repository/errorEventsRepository.go
2026-03-08 16:14:42 +05:30

179 lines
7.3 KiB
Go

package repository
import (
"alfred/internal/metrics"
"alfred/pkg/es7"
"alfred/pkg/log"
"alfred/utils"
"context"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
// ErrorEventsRepository is the persistence contract for error events stored in
// Elasticsearch: ingesting events, querying them by session or arbitrary
// key/value filters, marking them inactive, and tracking the timestamp of the
// last cron run.
//
// Methods that return *esapi.Response hand the raw response back without
// closing its body.
type ErrorEventsRepository interface {
// UploadErrorEvents indexes message into a date-suffixed index derived from
// errorsIngestionIndex and returns the response status code (500 when the
// request itself fails).
UploadErrorEvents(message string, errorsIngestionIndex string) (int, error)
// FetchSessionIdFromSessionErrorEventsWithTimeRange queries the given indices
// for error events whose error timestamp lies between startTime and endTime
// and whose error name is in allowedErrorEventsFilter, aggregated by session id.
FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, allowedErrorEventsFilter []string) (*esapi.Response, error)
// FetchErrorEvents queries errorEventsUploadIndex for error events that
// belong to sessionId.
FetchErrorEvents(sessionId string, errorEventsUploadIndex string) (*esapi.Response, error)
// GetErrorEventsLastCronTimestamp runs a match-all query against
// errorEventsTimeIndex.
GetErrorEventsLastCronTimestamp(errorEventsTimeIndex string) (*esapi.Response, error)
// UpdateErrorEventsLastCronTimestamp sets the field named clientName to
// timestamp on the documents of errorEventsTimeIndex via a scripted update.
UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error
// FetchSessionErrorEventsWithKeyValue queries errorEventsIndex for error
// events matching every field -> allowed-values entry of keyValueMap.
FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsIndex string) (*esapi.Response, error)
// UpdateErrorEventsInActiveBulk sets error_event.is_active to false for the
// given documents in one bulk request; errorEventsindexList[i] is the index
// holding errorEventsdocIdList[i].
UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error
}
// ErrorEventsRepositoryImpl is the Elasticsearch-backed implementation of
// ErrorEventsRepository.
type ErrorEventsRepositoryImpl struct {
// esClient is the project's Elasticsearch client wrapper through which every
// request of this repository is issued.
esClient es7.ElasticSearchClient
}
// NewErrorEventsRepository returns an ErrorEventsRepository that issues all of
// its requests through the supplied Elasticsearch client wrapper.
func NewErrorEventsRepository(esClient es7.ElasticSearchClient) ErrorEventsRepository {
	repo := new(ErrorEventsRepositoryImpl)
	repo.esClient = esClient
	return repo
}
// UploadErrorEvents indexes message as a single document into the daily index
// "<errorsIngestionIndex>-<year>-<month>-<day>", where the date comes from
// utils.GetCurrentDate.
//
// Returns:
//   - the response status code and a nil error when Elasticsearch accepts the
//     document;
//   - the response status code and a generic error when Elasticsearch answers
//     with an error status;
//   - 500 and the underlying error when the request itself fails.
//
// Exactly one ingestion metric (success or failure), labelled with
// errorsIngestionIndex, is recorded per call.
func (r *ErrorEventsRepositoryImpl) UploadErrorEvents(message string, errorsIngestionIndex string) (int, error) {
	year, month, day := utils.GetCurrentDate()
	index := errorsIngestionIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)
	req := esapi.IndexRequest{
		Index: index,
		Body:  strings.NewReader(message),
	}

	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	if indexResponse != nil {
		// Always release the body so the underlying connection can be reused.
		defer indexResponse.Body.Close()
	}
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(errorsIngestionIndex).Inc()
		log.Error("IndexResponse is nil for UploadErrorEvents", zap.Error(err))
		return 500, err
	}

	if indexResponse.IsError() {
		// Count the failure once; the previous two-step error check incremented
		// the failure counter twice for a single rejected document.
		err = errors.New("create error events upload data failed")
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(errorsIngestionIndex).Inc()
		log.Error("error-events data insert document upload in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return indexResponse.StatusCode, err
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(errorsIngestionIndex).Inc()
	return indexResponse.StatusCode, nil
}
// FetchSessionIdFromSessionErrorEventsWithTimeRange runs the session-id
// aggregation query for the [startTime, endTime] window and the given error
// name filter against every index in errorEventsUploadIndexList.
//
// On failure the error is returned with a nil response. The response body is
// not closed here.
func (r *ErrorEventsRepositoryImpl) FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, sessionErrorEventsFilter []string) (*esapi.Response, error) {
	query := getSessionIdFromSessionErrorEventsWithTimeRangeQuery(startTime, endTime, sessionErrorEventsFilter)

	response, fetchErr := r.esClient.FetchESDataMultipleIndex(errorEventsUploadIndexList, query)
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// FetchErrorEvents queries errorEventsUploadIndex for the error events that
// carry the given session id.
//
// On failure the error is returned with a nil response. The response body is
// not closed here.
func (r *ErrorEventsRepositoryImpl) FetchErrorEvents(sessionId string, errorEventsUploadIndex string) (*esapi.Response, error) {
	response, fetchErr := r.esClient.FetchESData(errorEventsUploadIndex, getErrorEventsQuery(sessionId))
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// GetErrorEventsLastCronTimestamp runs a match-all query against
// errorEventsTimeIndex and returns the raw response.
//
// On failure the error is returned with a nil response. The response body is
// not closed here.
func (r *ErrorEventsRepositoryImpl) GetErrorEventsLastCronTimestamp(errorEventsTimeIndex string) (*esapi.Response, error) {
	query := getErrorEventsLastCronTimestampQuery()

	response, fetchErr := r.esClient.FetchESData(errorEventsTimeIndex, query)
	switch {
	case fetchErr != nil:
		return nil, fetchErr
	default:
		return response, nil
	}
}
// UpdateErrorEventsLastCronTimestamp sets the field named clientName to
// timestamp on the documents of errorEventsTimeIndex through a scripted update.
//
// It returns the underlying error when the request fails, a generic error when
// Elasticsearch answers with an error status (or no response is available),
// and nil otherwise.
func (r *ErrorEventsRepositoryImpl) UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error {
	content := getUpdateErrorEventsLastCronTimestampQuery(timestamp, clientName)
	response, err := r.esClient.UpdateESData([]string{errorEventsTimeIndex}, content)
	if response != nil {
		// Guarded: the response may be nil when the request itself failed, and
		// dereferencing it unconditionally would panic.
		defer response.Body.Close()
	}
	if err != nil {
		// Surface the real cause instead of masking it with a generic message.
		return err
	}
	if response == nil || response.IsError() {
		return errors.New("update error events last cron timestamp failed")
	}
	return nil
}
// FetchSessionErrorEventsWithKeyValue queries errorEventsUploadIndex for error
// events matching every field -> allowed-values entry of keyValueMap.
//
// On failure the error is returned with a nil response. The response body is
// not closed here.
func (r *ErrorEventsRepositoryImpl) FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsUploadIndex string) (*esapi.Response, error) {
	query := getSessionErrorEventsFromSessionIdQuery(keyValueMap)

	response, fetchErr := r.esClient.FetchESData(errorEventsUploadIndex, query)
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// UpdateErrorEventsInActiveBulk sets error_event.is_active to false for every
// document in errorEventsdocIdList using a single bulk request.
// errorEventsindexList[i] must name the index that holds
// errorEventsdocIdList[i].
//
// It returns an error when the index list is shorter than the document id
// list, when the request fails, or when Elasticsearch answers with an error
// status (or no response is available); otherwise nil.
func (r *ErrorEventsRepositoryImpl) UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error {
	// The query builder indexes errorEventsindexList by document position, so a
	// shorter index list would panic there; reject it up front instead.
	if len(errorEventsindexList) < len(errorEventsdocIdList) {
		return fmt.Errorf("update error events in active bulk failed: %d document ids but only %d indices",
			len(errorEventsdocIdList), len(errorEventsindexList))
	}

	content := getUpdateErrorEventsInActiveBulkQuery(errorEventsdocIdList, errorEventsindexList)
	result, err := r.esClient.UpdateESDataBulk(content, errorEventsindexList)
	if result != nil {
		// Registered before any return so the body is released on error paths
		// too; guarded because the result may be nil when the request failed.
		defer result.Body.Close()
	}
	if err != nil {
		// Surface the real cause instead of masking it with a generic message.
		return err
	}
	if result == nil || result.IsError() {
		return errors.New("update error events in active bulk failed")
	}
	return nil
}
// getSessionIdFromSessionErrorEventsWithTimeRangeQuery assembles the query body
// used to collect session ids of error events:
//   - must: error_event.error_timestamp within startTime..endTime and
//     error_event.error_name in sessionErrorEventsFilter;
//   - must_not: error_event.is_active == false;
//   - size 0, with a terms aggregation named "buckets" on
//     error_attributes.session_id limited by utils.SessionUpperLimit.
func getSessionIdFromSessionErrorEventsWithTimeRangeQuery(startTime int64, endTime int64, sessionErrorEventsFilter []string) string {
	mustClauses := []string{
		createRangeQuery("error_event.error_timestamp", startTime, endTime),
		createTermsSubQuery("error_event.error_name", sessionErrorEventsFilter),
	}
	inactiveClause := createTermSubQueryBool("error_event.is_active", false)

	search := createSearchQuery(
		createBoolQuery(createMustQuery(mustClauses...), createMustNotQuery(inactiveClause)),
	)

	sessionTerms := createTermsAggregationQuery("error_attributes.session_id", utils.SessionUpperLimit)
	aggregation := createAggregationQuery(buildAggregationQuery("buckets", sessionTerms))

	return createEsQuery(createSizeQuery(0), search, aggregation)
}
func getErrorEventsQuery(sessionId string) string {
mustQuery := createMustQuery(createTermSubQuery("error_attributes.session_id", sessionId))
boolQuery := createBoolQuery(mustQuery)
return createEsQuery(createSearchQuery(boolQuery))
}
func getErrorEventsLastCronTimestampQuery() string {
return createEsQuery(createSearchQuery(createMatchAllQuery()))
}
func getUpdateErrorEventsLastCronTimestampQuery(timestamp int64, clientName string) string {
source := fmt.Sprintf(`ctx._source.%s = %dL`, clientName, timestamp)
scriptQuery := createScriptQuery(source)
searchQuery := createSearchQuery(createMatchAllQuery())
return createEsQuery(searchQuery, scriptQuery)
}
// getSessionErrorEventsFromSessionIdQuery assembles a search query in which
// every field -> values entry of keyValueMap becomes a terms clause under
// must, documents with error_event.is_active == false are excluded through
// must_not, and the result size is set to utils.EsUpperLimit.
//
// The clauses follow Go's map iteration order, so their order in the generated
// query is not stable between calls.
func getSessionErrorEventsFromSessionIdQuery(keyValueMap map[string][]string) string {
	var termsClauses []string
	for field, allowedValues := range keyValueMap {
		termsClauses = append(termsClauses, createTermsSubQuery(field, allowedValues))
	}

	inactiveClause := createTermSubQueryBool("error_event.is_active", false)
	boolQuery := createBoolQuery(createMustQuery(termsClauses...), createMustNotQuery(inactiveClause))

	return createEsQuery(createSearchQuery(boolQuery), createSizeQuery(utils.EsUpperLimit))
}
func getUpdateErrorEventsInActiveBulkQuery(errorEventsdocIdList, errorEventsindexList []string) string {
var updateQuery []string
for sliceIndex, docId := range errorEventsdocIdList {
bulkUpdateQueryBool := createBulkUpdateQuery(errorEventsindexList[sliceIndex], docId)
scriptQuery := createEsQuery(createScriptQuery("ctx._source.error_event.is_active = " + fmt.Sprintf("%t", false)))
updateQuery = append(updateQuery, strings.Join([]string{bulkUpdateQueryBool, scriptQuery}, utils.NEWLINE))
}
return strings.Join(updateQuery, utils.NEWLINE) + utils.NEWLINE
}