179 lines
7.3 KiB
Go
179 lines
7.3 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/internal/metrics"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"alfred/utils"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type ErrorEventsRepository interface {
|
|
UploadErrorEvents(message string, errorsIngestionIndex string) (int, error)
|
|
FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, allowedErrorEventsFilter []string) (*esapi.Response, error)
|
|
FetchErrorEvents(sessionId string, errorEventsUploadIndex string) (*esapi.Response, error)
|
|
GetErrorEventsLastCronTimestamp(errorEventsTimeIndex string) (*esapi.Response, error)
|
|
UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error
|
|
FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsIndex string) (*esapi.Response, error)
|
|
UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error
|
|
}
|
|
|
|
type ErrorEventsRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewErrorEventsRepository(esClient es7.ElasticSearchClient) ErrorEventsRepository {
|
|
return &ErrorEventsRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) UploadErrorEvents(message string, errorsIngestionIndex string) (int, error) {
|
|
year, month, day := utils.GetCurrentDate()
|
|
index := errorsIngestionIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)
|
|
|
|
req := esapi.IndexRequest{
|
|
Index: index,
|
|
Body: strings.NewReader(message),
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
|
|
if indexResponse != nil {
|
|
defer indexResponse.Body.Close()
|
|
}
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(errorsIngestionIndex).Inc()
|
|
log.Error("IndexResponse is nil for UploadErrorEvents", zap.Error(err))
|
|
return 500, err
|
|
}
|
|
|
|
if indexResponse.IsError() {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(errorsIngestionIndex).Inc()
|
|
err = errors.New("create error events upload data failed")
|
|
}
|
|
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(errorsIngestionIndex).Inc()
|
|
log.Error("error-events data insert document upload in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
|
|
return indexResponse.StatusCode, err
|
|
}
|
|
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(errorsIngestionIndex).Inc()
|
|
return indexResponse.StatusCode, nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, sessionErrorEventsFilter []string) (*esapi.Response, error) {
|
|
content := getSessionIdFromSessionErrorEventsWithTimeRangeQuery(startTime, endTime, sessionErrorEventsFilter)
|
|
result, err := r.esClient.FetchESDataMultipleIndex(errorEventsUploadIndexList, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) FetchErrorEvents(sessionId string, errorEventsUploadIndex string) (*esapi.Response, error) {
|
|
content := getErrorEventsQuery(sessionId)
|
|
result, err := r.esClient.FetchESData(errorEventsUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) GetErrorEventsLastCronTimestamp(errorEventsTimeIndex string) (*esapi.Response, error) {
|
|
content := getErrorEventsLastCronTimestampQuery()
|
|
result, err := r.esClient.FetchESData(errorEventsTimeIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error {
|
|
content := getUpdateErrorEventsLastCronTimestampQuery(timestamp, clientName)
|
|
response, err := r.esClient.UpdateESData([]string{errorEventsTimeIndex}, content)
|
|
defer response.Body.Close()
|
|
if response.IsError() {
|
|
err = errors.New("update error events last cron timestamp failed")
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsUploadIndex string) (*esapi.Response, error) {
|
|
content := getSessionErrorEventsFromSessionIdQuery(keyValueMap)
|
|
result, err := r.esClient.FetchESData(errorEventsUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *ErrorEventsRepositoryImpl) UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error {
|
|
content := getUpdateErrorEventsInActiveBulkQuery(errorEventsdocIdList, errorEventsindexList)
|
|
result, err := r.esClient.UpdateESDataBulk(content, errorEventsindexList)
|
|
if result.IsError() {
|
|
err = errors.New("update error events in active bulk failed")
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer result.Body.Close()
|
|
return nil
|
|
}
|
|
|
|
func getSessionIdFromSessionErrorEventsWithTimeRangeQuery(startTime int64, endTime int64, sessionErrorEventsFilter []string) string {
|
|
var queryList []string
|
|
queryList = append(queryList, createRangeQuery("error_event.error_timestamp", startTime, endTime))
|
|
queryList = append(queryList, createTermsSubQuery("error_event.error_name", sessionErrorEventsFilter))
|
|
mustNotQuery := createMustNotQuery(createTermSubQueryBool("error_event.is_active", false))
|
|
boolQuery := createBoolQuery(createMustQuery(queryList...), mustNotQuery)
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
aggregationQuery := buildAggregationQuery("buckets", createTermsAggregationQuery("error_attributes.session_id", utils.SessionUpperLimit))
|
|
return createEsQuery(createSizeQuery(0), searchQuery, createAggregationQuery(aggregationQuery))
|
|
}
|
|
|
|
func getErrorEventsQuery(sessionId string) string {
|
|
mustQuery := createMustQuery(createTermSubQuery("error_attributes.session_id", sessionId))
|
|
boolQuery := createBoolQuery(mustQuery)
|
|
return createEsQuery(createSearchQuery(boolQuery))
|
|
}
|
|
|
|
func getErrorEventsLastCronTimestampQuery() string {
|
|
return createEsQuery(createSearchQuery(createMatchAllQuery()))
|
|
}
|
|
|
|
func getUpdateErrorEventsLastCronTimestampQuery(timestamp int64, clientName string) string {
|
|
source := fmt.Sprintf(`ctx._source.%s = %dL`, clientName, timestamp)
|
|
scriptQuery := createScriptQuery(source)
|
|
searchQuery := createSearchQuery(createMatchAllQuery())
|
|
return createEsQuery(searchQuery, scriptQuery)
|
|
}
|
|
|
|
func getSessionErrorEventsFromSessionIdQuery(keyValueMap map[string][]string) string {
|
|
var queryList []string
|
|
for key, value := range keyValueMap {
|
|
queryList = append(queryList, createTermsSubQuery(key, value))
|
|
}
|
|
mustQuery := createMustQuery(queryList...)
|
|
mustNotQuery := createMustNotQuery(createTermSubQueryBool("error_event.is_active", false))
|
|
searchQuery := createSearchQuery(createBoolQuery(mustQuery, mustNotQuery))
|
|
return createEsQuery(searchQuery, createSizeQuery(utils.EsUpperLimit))
|
|
}
|
|
|
|
func getUpdateErrorEventsInActiveBulkQuery(errorEventsdocIdList, errorEventsindexList []string) string {
|
|
var updateQuery []string
|
|
for sliceIndex, docId := range errorEventsdocIdList {
|
|
bulkUpdateQueryBool := createBulkUpdateQuery(errorEventsindexList[sliceIndex], docId)
|
|
scriptQuery := createEsQuery(createScriptQuery("ctx._source.error_event.is_active = " + fmt.Sprintf("%t", false)))
|
|
updateQuery = append(updateQuery, strings.Join([]string{bulkUpdateQueryBool, scriptQuery}, utils.NEWLINE))
|
|
}
|
|
return strings.Join(updateQuery, utils.NEWLINE) + utils.NEWLINE
|
|
}
|