package repository

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"alfred/api/request"
	"alfred/config"
	"alfred/internal/metrics"
	"alfred/model/es"
	"alfred/model/ingester"
	"alfred/pkg/es7"
	"alfred/pkg/log"
	"alfred/utils"

	"github.com/elastic/go-elasticsearch/v8/esapi"
	"go.uber.org/zap"
)

// WebSessionsRepository reads and writes web session documents through an
// Elasticsearch client.
type WebSessionsRepository interface {
	// UploadWebSession indexes a single upload request into the dated index
	// derived from webSessionUploadIndex.
	UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error
	// FetchWebSessionsWithSessionId returns the raw search response for the
	// documents whose session id matches sessionId.
	FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error)
	// FetchAllWebSession returns the raw search response for one page of
	// sessions matching filters, searched across webSessionUploadIndexList.
	FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error)
	// FetchWebSessionsWithDurationResponse returns the raw search response for
	// the given session ids together with min/max timestamp aggregations.
	FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error)
}

// WebSessionsRepositoryImpl is the WebSessionsRepository implementation backed
// by an es7.ElasticSearchClient.
type WebSessionsRepositoryImpl struct {
	esClient es7.ElasticSearchClient
}

// NewWebSessionsRepository returns a WebSessionsRepository that uses esClient
// for every operation.
func NewWebSessionsRepository(esClient es7.ElasticSearchClient) WebSessionsRepository {
	return &WebSessionsRepositoryImpl{
		esClient: esClient,
	}
}

// UploadWebSession serialises sessionUploadRequest to JSON and indexes it into
// "<webSessionUploadIndex>-<year>-<month>-<day>", where the date comes from
// utils.GetCurrentDate and is formatted without zero padding.
//
// The document id is the request's event id, unless the request's version is
// at least the configured MinWebVersionSupportingSingleDoc, in which case the
// session id is used instead.
//
// It returns an error when marshalling fails, when the request cannot be sent,
// or when Elasticsearch answers with an error status. The ingestion failure or
// success counter, labelled with webSessionUploadIndex, is incremented to match.
func (r *WebSessionsRepositoryImpl) UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error {
	year, month, day := utils.GetCurrentDate()
	index := webSessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)

	message, err := json.Marshal(sessionUploadRequest)
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		log.Error("error ingesting web session", zap.Error(err))
		return err
	}

	documentId := sessionUploadRequest.SessionAttributes.EventId
	if sessionUploadRequest.BaseAttributes.Version >= config.GetCollectorConfig().ElasticSearchConfig.MinWebVersionSupportingSingleDoc {
		documentId = sessionUploadRequest.BaseAttributes.SessionId
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: documentId,
		Body:       bytes.NewReader(message),
	}

	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		log.Error("web session ingestion failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return err
	}
	defer indexResponse.Body.Close()

	// Do only reports transport-level failures through err; a 4xx/5xx answer
	// from Elasticsearch arrives as a regular response and has to be detected
	// explicitly, otherwise a rejected document would be counted as ingested.
	if indexResponse.IsError() {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		err = fmt.Errorf("indexing web session into %q: elasticsearch responded with %s", index, indexResponse.Status())
		log.Error("web session ingestion failed", zap.String("response", indexResponse.String()), zap.Error(err))
		return err
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(webSessionUploadIndex).Inc()
	return nil
}

// FetchWebSessionsWithSessionId runs the session-id lookup query against
// webSessionUploadIndex via esClient.FetchESData and returns the raw response.
// On error the response is nil.
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error) {
	content := getWebSessionFromSessionIdQuery(sessionId)
	result, err := r.esClient.FetchESData(webSessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchAllWebSession runs the paginated, filtered session query against every
// index in webSessionUploadIndexList via esClient.FetchESDataMultipleIndex and
// returns the raw response. On error the response is nil.
func (r *WebSessionsRepositoryImpl) FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error) {
	content := getAllPaginatedWebSessionsQuery(filters, page)
	result, err := r.esClient.FetchESDataMultipleIndex(webSessionUploadIndexList, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchWebSessionsWithDurationResponse runs the duration query for
// webSessionIds against webSessionUploadIndex via esClient.FetchESData and
// returns the raw response. On error the response is nil.
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error) {
	content := getWebSessionWithDurationQuery(webSessionIds, filters, page)
	result, err := r.esClient.FetchESData(webSessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// getWebSessionFromSessionIdQuery builds the search body that matches
// base_attributes.session_id against sessionId, sorts ascending by
// base_attributes.client_timestamp and then session_attribute.event_id, and
// sets the size to utils.EsUpperLimit.
func getWebSessionFromSessionIdQuery(sessionId string) string {
	termFilter := createTermSubQuery("base_attributes.session_id", sessionId)

	sortQueryFields := []string{
		createSortQueryFields("base_attributes.client_timestamp", "asc", "strict_date_optional_time_nanos"),
		createSortQueryFields("session_attribute.event_id", "asc", ""),
	}

	return createEsQuery(
		createSearchQuery(createBoolQuery(createShouldQuery(termFilter))),
		createMultiSortQuery(sortQueryFields...),
		createSizeQuery(utils.EsUpperLimit),
	)
}

// getAllPaginatedWebSessionsQuery builds the search body for one page of
// sessions. Each filter list from getFilterList becomes its own bool/should
// clause; all of those clauses plus a client_timestamp range between
// filters.StartTimestamp and filters.EndTimestamp are placed under a single
// must. Results are sorted by filters.SortBy in page.SortDirection, collapsed
// on base_attributes.session_id, and windowed with
// from = page.PageSize*page.PageNumber and size = page.PageSize.
func getAllPaginatedWebSessionsQuery(filters request.WebSessionFilters, page *es.Page) string {
	projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber := getFilterList(filters)

	multipleShouldQuery := []string{
		createBoolShouldQuery(createTermSubQuery("base_attributes.project_name", projectName...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.agentId", agentId...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.ticketId", ticketId...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.emailId", emailId...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.metadata.phoneNumber", phoneNumber...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.session_id", sessionId...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceId...)),
	}
	shouldQuery := strings.Join(multipleShouldQuery, ",")
	rangeQuery := createRangeQuery("base_attributes.client_timestamp", filters.StartTimestamp, filters.EndTimestamp)

	return createEsQuery(
		createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, rangeQuery))),
		createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos"),
		createCollapseQuery("base_attributes.session_id"),
		createFromQuery(page.PageSize*page.PageNumber),
		createSizeQuery(page.PageSize),
	)
}

// getWebSessionWithDurationQuery builds the search body that matches
// base_attributes.session_id against webSessionIds, sorted by filters.SortBy
// in page.SortDirection and collapsed on the session id. It also requests a
// terms aggregation named "buckets" over the session id (sized by
// page.PageSize) with two sub-aggregations: "min_start_timestamp" over
// session_attribute.start_timestamp and "max_end_timestamp" over
// session_attribute.end_timestamp. The hit size is utils.EsUpperLimit.
func getWebSessionWithDurationQuery(webSessionIds []string, filters request.WebSessionFilters, page *es.Page) string {
	boolshouldQuery := createBoolShouldQuery(createTermSubQuery("base_attributes.session_id", webSessionIds...))
	sortQuery := createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapseQuery := createCollapseQuery("base_attributes.session_id")

	minTimestampAggregationQuery := buildAggregationQuery("min_start_timestamp", createMinAggregationQuery("session_attribute.start_timestamp"))
	maxTimestampAggregationQuery := buildAggregationQuery("max_end_timestamp", createMaxAggregationQuery("session_attribute.end_timestamp"))
	termsAggregation := createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize))
	innerAggregationQuery := createAggregationQuery(minTimestampAggregationQuery, maxTimestampAggregationQuery)
	aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))

	return createEsQuery(
		createSearchQuery(boolshouldQuery),
		sortQuery,
		collapseQuery,
		aggregationQuery,
		createSizeQuery(utils.EsUpperLimit),
	)
}

// getFilterList converts the filters into per-field value lists. The string
// filters (project name, agent id, ticket id, session id, email id) are split
// on utils.COMMA when they are not utils.EMPTY; DeviceId is already a list and
// is passed through when non-empty. A filter that is not set yields a nil
// slice.
//
// NOTE(review): phoneNumber is never assigned here, so it is always returned
// nil and the phoneNumber term filter built by getAllPaginatedWebSessionsQuery
// never receives values. Confirm whether request.WebSessionFilters carries a
// phone number that should be split here like the other string filters.
func getFilterList(filters request.WebSessionFilters) (projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber []string) {
	if filters.ProjectName != utils.EMPTY {
		projectName = strings.Split(filters.ProjectName, utils.COMMA)
	}
	if filters.AgentId != utils.EMPTY {
		agentId = strings.Split(filters.AgentId, utils.COMMA)
	}
	if filters.TicketId != utils.EMPTY {
		ticketId = strings.Split(filters.TicketId, utils.COMMA)
	}
	if filters.SessionId != utils.EMPTY {
		sessionId = strings.Split(filters.SessionId, utils.COMMA)
	}
	if len(filters.DeviceId) != 0 {
		deviceId = filters.DeviceId
	}
	if filters.EmailId != utils.EMPTY {
		emailId = strings.Split(filters.EmailId, utils.COMMA)
	}
	return projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber
}