package repository

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"github.com/elastic/go-elasticsearch/v8/esapi"
	"go.uber.org/zap"

	"alfred/internal/metrics"
	"alfred/model/es"
	"alfred/model/ingester"
	"alfred/pkg/es7"
	"alfred/pkg/log"
	"alfred/utils"
)

// SessionsRepository is the data-access contract for session-upload documents
// stored in Elasticsearch. Fetch methods hand back the raw *esapi.Response; the
// caller is responsible for reading and closing its body.
type SessionsRepository interface {
	// UploadSession indexes one session-upload event and returns a status code
	// together with any error.
	UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error)
	// FetchSessionWithTimeRange searches several indices for sessions whose
	// client timestamp lies in the given range, filtered by app OS.
	FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error)
	// FetchSessionsWithSessionIds searches one index for documents matching any
	// of the session ids.
	FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error)
	// FetchSessionAndSessionDurationWithSessionIds searches by session ids and
	// adds per-session min/max timestamp aggregations.
	FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
	// FetchDeviceAttributesForMetrics runs the per-session device attribute
	// aggregation for a time range and snapshot rate.
	FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error)
	// UpdateSessionErrorEventsWithSessionId sets base_attributes.has_errors on
	// the documents of the given sessions.
	UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error
	// FetchSessionListFromSessionIds searches by session ids, collapsed on the
	// session id and paginated.
	FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
	// FetchSessionWithLabels searches several indices by app version, device id
	// and app OS, optionally bounded by a time range.
	FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error)
	// FetchUniqueKeys runs a terms aggregation over the given field.
	FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error)
}

// SessionsRepositoryImpl implements SessionsRepository on top of an
// es7.ElasticSearchClient.
type SessionsRepositoryImpl struct {
	esClient es7.ElasticSearchClient
}

// NewSessionsRepository returns a SessionsRepository that uses esClient for all
// Elasticsearch calls.
func NewSessionsRepository(esClient es7.ElasticSearchClient) SessionsRepository {
	return &SessionsRepositoryImpl{
		esClient: esClient,
	}
}

// UploadSession marshals the event to JSON and indexes it under the event id.
//
// The target index is sessionUploadIndex suffixed with "-<year>-<month>-<day>"
// taken from utils.GetCurrentDate. The returned int is 0 when marshalling
// fails, 500 when the request itself fails, and otherwise the status code of
// the Elasticsearch response. Every failure path increments the ingestion
// failure counter exactly once; success increments the success counter.
func (r *SessionsRepositoryImpl) UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error) {
	year, month, day := utils.GetCurrentDate()
	index := sessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)

	message, err := json.Marshal(alfredSessionRecordingEvent)
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		log.Error("error ingesting app session", zap.Error(err))
		return 0, err
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: alfredSessionRecordingEvent.SessionUploadEventAttributes.EventId,
		Body:       bytes.NewReader(message),
	}
	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	// Guard the close: the response may be nil when the request failed.
	if indexResponse != nil {
		defer indexResponse.Body.Close()
	}
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		log.Error("IndexResponse is nil for UploadSession", zap.Error(err))
		return 500, err
	}

	if indexResponse.IsError() {
		// Count the failure once; the previous flow incremented the counter
		// twice for an error status returned by Elasticsearch.
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		err = errors.New("create session upload data failed")
		log.Error("session upload data insert document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return indexResponse.StatusCode, err
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(sessionUploadIndex).Inc()
	return indexResponse.StatusCode, nil
}

// FetchSessionWithTimeRange builds the paginated time-range query and runs it
// against sessionUploadIndexList. On error the response is discarded and nil is
// returned with the error.
func (r *SessionsRepositoryImpl) FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error) {
	content := getAllPaginatedSessionsQuery(startTimestamp, endTimestamp, page, sortBy, appOs)
	result, err := r.esClient.FetchESDataMultipleIndex(sessionUploadIndexList, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchSessionsWithSessionIds runs the session-id lookup query against
// sessionUploadIndex. On error nil is returned with the error.
func (r *SessionsRepositoryImpl) FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error) {
	content := getSessionsFromSessionIdsQuery(sessionIds)
	result, err := r.esClient.FetchESData(sessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchSessionAndSessionDurationWithSessionIds runs the session-id query with
// per-session timestamp aggregations against sessionUploadIndex. On error nil
// is returned with the error.
func (r *SessionsRepositoryImpl) FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
	content := getSessionAndSessionDurationFromSessionIdQuery(sessionIds, page, sortBy)
	result, err := r.esClient.FetchESData(sessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchDeviceAttributesForMetrics runs the device-attribute aggregation query
// against sessionUploadIndex. A failure is logged and returned with a nil
// response.
func (r *SessionsRepositoryImpl) FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error) {
	content := getAggsOnSessionIdsForDeviceAttributes(startTimestamp, endTimestamp, snapshotPerSecond, page)
	result, err := r.esClient.FetchESData(sessionUploadIndex, content)
	if err != nil {
		log.Error("Fetching Device Metrics Failed:", zap.Error(err))
		return nil, err
	}
	return result, nil
}

// UpdateSessionErrorEventsWithSessionId sends the has_errors update query for
// the given session ids to sessionUploadIndexList. It returns the transport
// error, or a generic error when Elasticsearch answers with an error status.
func (r *SessionsRepositoryImpl) UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error {
	content := getUpdateSessionErrorEventsWithSessionIdQuery(sessionId, hasErrors)
	response, err := r.esClient.UpdateESData(sessionUploadIndexList, content)
	// Only schedule the close when there is a response: deferring
	// response.Body.Close() on a nil response would panic before the error
	// below could be reported.
	if response != nil {
		defer response.Body.Close()
	}
	if err != nil {
		log.Error("update session error events with session id failed", zap.Error(err), zap.String("query", content))
		return err
	}
	if response.IsError() {
		err = errors.New("update has_errors true in session failed")
		log.Error("update session error events with session id failed", zap.Error(err), zap.String("response", fmt.Sprintf("%v", response)), zap.String("query", content))
		return err
	}
	return nil
}

// FetchSessionListFromSessionIds runs the collapsed, paginated session-id query
// against sessionUploadIndex. On error nil is returned with the error.
func (r *SessionsRepositoryImpl) FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
	content := getSessionListFromSessionIdsQuery(sessionIds, page, sortBy)
	result, err := r.esClient.FetchESData(sessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchSessionWithLabels searches sessionUploadSearchIndexList by app version,
// device id and app OS. An endTimestamp of 0 selects the query variant without
// a time range; any other value adds the created_at range filter.
func (r *SessionsRepositoryImpl) FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error) {
	var content string
	if endTimestamp == 0 {
		content = getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion, deviceIds, page, appOs)
	} else {
		content = getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds, startTimestamp, endTimestamp, page, appOs)
	}
	result, err := r.esClient.FetchESDataMultipleIndex(sessionUploadSearchIndexList, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// FetchUniqueKeys runs a terms aggregation on key against sessionUploadIndex.
// On error nil is returned with the error.
func (r *SessionsRepositoryImpl) FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error) {
	content := getUniqueValuesQueryFromSession(key)
	result, err := r.esClient.FetchESData(sessionUploadIndex, content)
	if err != nil {
		return nil, err
	}
	return result, nil
}

// getAllPaginatedSessionsQuery builds a query that requires a client_ts range
// and an app_os match, sorted by sortBy, collapsed on session_id, and paged
// with from = PageSize*PageNumber and size = PageSize.
func getAllPaginatedSessionsQuery(startTimestamp int64, endTimestamp int64, page *es.Page, sortBy, appOs string) string {
	rangeQuery := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
	appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
	boolQuery := createBoolQuery(createMustQuery(rangeQuery, appOsMatchQuery))
	searchQuery := createSearchQuery(boolQuery)
	sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapseQuery := createCollapseQuery("base_attributes.session_id")
	return createEsQuery(
		searchQuery,
		sortQuery,
		collapseQuery,
		createFromQuery(page.PageSize*page.PageNumber),
		createSizeQuery(page.PageSize),
	)
}

// getSessionsFromSessionIdsQuery builds a should-match query over the session
// ids, sorted ascending by client_ts and sized to utils.SessionUpperLimit.
func getSessionsFromSessionIdsQuery(sessionIds []string) string {
	shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
	boolQuery := createBoolQuery(shouldQuery)
	searchQuery := createSearchQuery(boolQuery)
	sortQuery := createSortQuery("base_attributes.client_ts", "asc", "strict_date_optional_time_nanos")
	return createEsQuery(searchQuery, sortQuery, createSizeQuery(utils.SessionUpperLimit))
}

// getSessionAndSessionDurationFromSessionIdQuery builds a should-match query
// over the session ids plus a "buckets" terms aggregation on session_id (sized
// to PageSize) carrying min_timestamp, max_timestamp and max_client_timestamp
// sub-aggregations.
func getSessionAndSessionDurationFromSessionIdQuery(sessionIds []string, page *es.Page, sortBy string) string {
	shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
	boolQuery := createBoolQuery(shouldQuery)
	searchQuery := createSearchQuery(boolQuery)
	sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")

	termsAggregation := createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize))
	minTimeStampAggregation := buildAggregationQuery("min_timestamp", createMinAggregationQuery("base_attributes.session_time_stamp"))
	maxTimeStampAggregation := buildAggregationQuery("max_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
	maxClientTimestampAggregation := buildAggregationQuery("max_client_timestamp", createMaxAggregationQuery("base_attributes.client_ts"))
	innerAggregationQuery := createAggregationQuery(minTimeStampAggregation, maxTimeStampAggregation, maxClientTimestampAggregation)
	aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))

	return createEsQuery(searchQuery, sortQuery, aggregationQuery, createSizeQuery(utils.SessionUpperLimit))
}

// getAggsOnSessionIdsForDeviceAttributes builds an aggregation-only query
// (size 0) restricted to a snapshot_per_second value and a client_ts range.
// Results are aggregated per session_id inside a filter on end battery in the
// range passed to createRangeQuery (1, 101), with battery, memory and timestamp
// min/max sub-aggregations.
func getAggsOnSessionIdsForDeviceAttributes(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, page *es.Page) string {
	shouldQuery := createShouldQuery(createTermSubQueryForInt("base_attributes.snapshot_per_second", snapshotPerSecond))
	rangeQuery := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
	boolQuery := createBoolQuery(createMustQuery(createBoolQuery(shouldQuery), rangeQuery))
	searchQuery := createSearchQuery(boolQuery)

	termsAggregation := createTermsAggregationQueryWithoutSize("base_attributes.session_id")
	maxBeginningBatteryAggregation := buildAggregationQuery("max_beginning_battery", createMaxAggregationQuery("session_upload_event_attributes.beginning_device_attributes.battery"))
	minEndBatteryAggregation := buildAggregationQuery("min_end_battery", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.battery"))
	// Both memory aggregations read end_device_attributes.memory.
	minEndMemoryAggregation := buildAggregationQuery("min_end_memory", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
	maxEndMemoryAggregation := buildAggregationQuery("max_end_memory", createMaxAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
	maxEventEndTimestampAggregation := buildAggregationQuery("max_event_end_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
	minClientTimestampAggregation := buildAggregationQuery("min_client_timestamp", createMinAggregationQuery("base_attributes.client_ts"))
	innerAggregationQuery := createAggregationQuery(
		maxBeginningBatteryAggregation,
		minEndBatteryAggregation,
		minEndMemoryAggregation,
		maxEndMemoryAggregation,
		maxEventEndTimestampAggregation,
		minClientTimestampAggregation,
	)

	rangeQueryOnEndBattery := createRangeQuery("session_upload_event_attributes.end_device_attributes.battery", 1, 101)
	filterQuery := createCustomQuery("filter", rangeQueryOnEndBattery)
	aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))
	finalAggregationQuery := createAggregationQuery(buildAggregationQuery("filter", filterQuery, aggregationQuery))

	return createEsQuery(searchQuery, finalAggregationQuery, createFromQuery(page.PageSize*page.PageNumber), createSizeQuery(0))
}

// getUpdateSessionErrorEventsWithSessionIdQuery builds an update query that
// targets documents of the given sessions whose has_errors differs from
// hasErrors, with a script assigning has_errors = hasErrors.
func getUpdateSessionErrorEventsWithSessionIdQuery(sessionIds []string, hasErrors bool) string {
	shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
	mustNotQuery := createMustNotQuery(createTermSubQueryBool("base_attributes.has_errors", hasErrors))
	boolQuery := createBoolQuery(shouldQuery, mustNotQuery)
	searchQuery := createSearchQuery(boolQuery)
	scriptQuery := createScriptQuery(fmt.Sprintf("ctx._source.base_attributes.has_errors = %t", hasErrors))
	return createEsQuery(searchQuery, scriptQuery)
}

// getSessionListFromSessionIdsQuery builds a should-match query over the
// session ids, sorted by sortBy, collapsed on session_id, and paged with
// from = PageSize*PageNumber and size = PageSize.
func getSessionListFromSessionIdsQuery(sessionIds []string, page *es.Page, sortBy string) string {
	shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
	boolQuery := createBoolQuery(shouldQuery)
	searchQuery := createSearchQuery(boolQuery)
	sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapseQuery := createCollapseQuery("base_attributes.session_id")
	return createEsQuery(
		searchQuery,
		sortQuery,
		collapseQuery,
		createFromQuery(page.PageSize*page.PageNumber),
		createSizeQuery(page.PageSize),
	)
}

// getPaginatedSessionsFromLabelsQuery builds a query requiring a device_id
// should-group, an app_version_code should-group, a created_at range and an
// app_os match; sorted by created_at, collapsed on session_id, and paged.
func getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, appOs string) string {
	multipleShouldQuery := []string{
		createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...)),
	}
	appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
	shouldQuery := strings.Join(multipleShouldQuery, ",")
	rangeQuery := createRangeQuery("created_at", startTimestamp, endTimestamp)

	return createEsQuery(
		createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, rangeQuery, appOsMatchQuery))),
		createSortQuery("created_at", string(page.SortDirection), utils.EMPTY),
		createCollapseQuery("base_attributes.session_id"),
		createFromQuery(page.PageSize*page.PageNumber),
		createSizeQuery(page.PageSize),
	)
}

// getPaginatedSessionsFromLabelsQueryWithoutTimeRange is the variant of
// getPaginatedSessionsFromLabelsQuery that omits the created_at range filter.
func getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion []string, deviceIds []string, page *es.Page, appOs string) string {
	multipleShouldQuery := []string{
		createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...)),
		createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...)),
	}
	appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
	shouldQuery := strings.Join(multipleShouldQuery, ",")

	return createEsQuery(
		createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, appOsMatchQuery))),
		createSortQuery("created_at", string(page.SortDirection), utils.EMPTY),
		createCollapseQuery("base_attributes.session_id"),
		createFromQuery(page.PageSize*page.PageNumber),
		createSizeQuery(page.PageSize),
	)
}

// getUniqueValuesQueryFromSession builds an aggregation-only query (size 0,
// track_total_hits disabled) with a "buckets" terms aggregation on fieldName
// sized to utils.SessionUpperLimit.
func getUniqueValuesQueryFromSession(fieldName string) string {
	termsAggregation := createTermsAggregationQuery(fieldName, utils.SessionUpperLimit)
	aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation))
	return createEsQuery(aggregationQuery, createSizeQuery(0), `"track_total_hits": false`)
}