167 lines
7.3 KiB
Go
167 lines
7.3 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/api/request"
|
|
"alfred/config"
|
|
"alfred/internal/metrics"
|
|
"alfred/model/es"
|
|
"alfred/model/ingester"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"alfred/utils"
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type WebSessionsRepository interface {
|
|
UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error
|
|
FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error)
|
|
FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error)
|
|
FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error)
|
|
}
|
|
|
|
type WebSessionsRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewWebSessionsRepository(esClient es7.ElasticSearchClient) WebSessionsRepository {
|
|
return &WebSessionsRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (r *WebSessionsRepositoryImpl) UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error {
|
|
year, month, day := utils.GetCurrentDate()
|
|
|
|
index := webSessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)
|
|
message, err := json.Marshal(sessionUploadRequest)
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
|
|
log.Error("error ingesting web session", zap.Error(err))
|
|
return err
|
|
}
|
|
documentId := sessionUploadRequest.SessionAttributes.EventId
|
|
if sessionUploadRequest.BaseAttributes.Version >= config.GetCollectorConfig().ElasticSearchConfig.MinWebVersionSupportingSingleDoc {
|
|
documentId = sessionUploadRequest.BaseAttributes.SessionId
|
|
}
|
|
req := esapi.IndexRequest{
|
|
Index: index,
|
|
DocumentID: documentId,
|
|
Body: bytes.NewReader(message),
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
|
|
log.Error("web session ingestion failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
|
|
return err
|
|
}
|
|
defer indexResponse.Body.Close()
|
|
|
|
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(webSessionUploadIndex).Inc()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error) {
|
|
content := getWebSessionFromSessionIdQuery(sessionId)
|
|
result, err := r.esClient.FetchESData(webSessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *WebSessionsRepositoryImpl) FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error) {
|
|
content := getAllPaginatedWebSessionsQuery(filters, page)
|
|
result, err := r.esClient.FetchESDataMultipleIndex(webSessionUploadIndexList, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error) {
|
|
content := getWebSessionWithDurationQuery(webSessionIds, filters, page)
|
|
result, err := r.esClient.FetchESData(webSessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func getWebSessionFromSessionIdQuery(sessionId string) string {
|
|
termFilter := createTermSubQuery("base_attributes.session_id", sessionId)
|
|
var sortQueryFields []string
|
|
sortQueryFields = append(sortQueryFields, createSortQueryFields("base_attributes.client_timestamp", "asc", "strict_date_optional_time_nanos"))
|
|
sortQueryFields = append(sortQueryFields, createSortQueryFields("session_attribute.event_id", "asc", ""))
|
|
query := createEsQuery(
|
|
createSearchQuery(createBoolQuery(createShouldQuery(termFilter))),
|
|
createMultiSortQuery(sortQueryFields...),
|
|
createSizeQuery(utils.EsUpperLimit),
|
|
)
|
|
return query
|
|
}
|
|
|
|
func getAllPaginatedWebSessionsQuery(filters request.WebSessionFilters, page *es.Page) string {
|
|
projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber := getFilterList(filters)
|
|
multipleShouldQuery := []string{createBoolShouldQuery(createTermSubQuery("base_attributes.project_name", projectName...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.agentId", agentId...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.ticketId", ticketId...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.emailId", emailId...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.phoneNumber", phoneNumber...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.session_id", sessionId...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceId...))}
|
|
shouldQuery := strings.Join(multipleShouldQuery, ",")
|
|
rangeQuery := createRangeQuery("base_attributes.client_timestamp", filters.StartTimestamp, filters.EndTimestamp)
|
|
|
|
query := createEsQuery(
|
|
createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, rangeQuery))),
|
|
createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos"),
|
|
createCollapseQuery("base_attributes.session_id"),
|
|
createFromQuery(page.PageSize*page.PageNumber),
|
|
createSizeQuery(page.PageSize),
|
|
)
|
|
return query
|
|
}
|
|
|
|
func getWebSessionWithDurationQuery(webSessionIds []string, filters request.WebSessionFilters, page *es.Page) string {
|
|
boolshouldQuery := createBoolShouldQuery(createTermSubQuery("base_attributes.session_id", webSessionIds...))
|
|
sortQuery := createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
|
|
collapseQuery := createCollapseQuery("base_attributes.session_id")
|
|
minTimestampAggregationQuery := buildAggregationQuery("min_start_timestamp", createMinAggregationQuery("session_attribute.start_timestamp"))
|
|
maxTimestampAggregationQuery := buildAggregationQuery("max_end_timestamp", createMaxAggregationQuery("session_attribute.end_timestamp"))
|
|
termsAggregation := createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize))
|
|
innerAggregationQuery := createAggregationQuery(minTimestampAggregationQuery, maxTimestampAggregationQuery)
|
|
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))
|
|
return createEsQuery(createSearchQuery(boolshouldQuery), sortQuery, collapseQuery, aggregationQuery, createSizeQuery(utils.EsUpperLimit))
|
|
}
|
|
|
|
func getFilterList(filters request.WebSessionFilters) (projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber []string) {
|
|
if filters.ProjectName != utils.EMPTY {
|
|
projectName = strings.Split(filters.ProjectName, utils.COMMA)
|
|
}
|
|
if filters.AgentId != utils.EMPTY {
|
|
agentId = strings.Split(filters.AgentId, utils.COMMA)
|
|
}
|
|
if filters.TicketId != utils.EMPTY {
|
|
ticketId = strings.Split(filters.TicketId, utils.COMMA)
|
|
}
|
|
if filters.SessionId != utils.EMPTY {
|
|
sessionId = strings.Split(filters.SessionId, utils.COMMA)
|
|
}
|
|
if len(filters.DeviceId) != 0 {
|
|
deviceId = filters.DeviceId
|
|
}
|
|
if filters.EmailId != utils.EMPTY {
|
|
emailId = strings.Split(filters.EmailId, utils.COMMA)
|
|
}
|
|
return projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber
|
|
}
|