Files
alfred-be/alfred/repository/sessionRepository.go
2026-03-08 16:14:42 +05:30

277 lines
15 KiB
Go

package repository
import (
"alfred/internal/metrics"
"alfred/model/es"
"alfred/model/ingester"
"alfred/pkg/es7"
"alfred/pkg/log"
"alfred/utils"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
// SessionsRepository is the persistence contract for session upload events
// stored in Elasticsearch. The Fetch* methods hand back the raw
// *esapi.Response; decoding (and closing) the response body is left to the
// caller.
type SessionsRepository interface {
// UploadSession indexes one session upload event into a date-suffixed
// index derived from sessionUploadIndex and returns a status code
// alongside any error.
UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error)
// FetchSessionWithTimeRange searches the given indices for sessions in
// the base_attributes.client_ts range that match appOs, collapsed on
// base_attributes.session_id and paginated/sorted according to page and
// sortBy.
FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error)
// FetchSessionsWithSessionIds searches a single index for documents
// whose base_attributes.session_id is one of sessionIds, sorted by
// base_attributes.client_ts ascending.
FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error)
// FetchSessionAndSessionDurationWithSessionIds searches by session ids
// and additionally requests per-session min/max timestamp aggregations.
FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
// FetchDeviceAttributesForMetrics runs an aggregation-only query
// (size 0) over documents with the given snapshot_per_second inside the
// client_ts range, bucketed per session id with battery, memory and
// timestamp min/max aggregations.
FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error)
// UpdateSessionErrorEventsWithSessionId sets
// base_attributes.has_errors to hasErrors, through a script, on the
// documents of the given sessions across sessionUploadIndexList.
UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error
// FetchSessionListFromSessionIds searches a single index by session
// ids, collapsed on base_attributes.session_id and paginated/sorted
// according to page and sortBy.
FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
// FetchSessionWithLabels searches the given indices by device ids, app
// version codes and appOs. The created_at range filter is applied only
// when endTimestamp is non-zero.
FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error)
// FetchUniqueKeys runs a terms aggregation on key (no hits requested)
// against sessionUploadIndex.
FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error)
}
// SessionsRepositoryImpl is the SessionsRepository implementation returned by
// NewSessionsRepository; every operation goes through esClient.
type SessionsRepositoryImpl struct {
// esClient is the Elasticsearch client wrapper used for all reads and writes.
esClient es7.ElasticSearchClient
}
// NewSessionsRepository builds a SessionsRepository that issues all of its
// requests through the supplied Elasticsearch client.
func NewSessionsRepository(esClient es7.ElasticSearchClient) SessionsRepository {
	repo := new(SessionsRepositoryImpl)
	repo.esClient = esClient
	return repo
}
// UploadSession indexes a single session upload event into Elasticsearch.
//
// The target index is sessionUploadIndex suffixed with the current date as
// "-<year>-<month>-<day>", and the document id is the event's EventId.
//
// Returns:
//   - (0, err) when the event cannot be marshalled to JSON;
//   - (500, err) when the index request itself fails;
//   - (status, err) when Elasticsearch answers with an error status;
//   - (status, nil) on success.
//
// Exactly one ingestion counter (success or failure, labelled with
// sessionUploadIndex) is incremented per call.
func (r *SessionsRepositoryImpl) UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error) {
	year, month, day := utils.GetCurrentDate()
	index := sessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)

	message, err := json.Marshal(alfredSessionRecordingEvent)
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		log.Error("error ingesting app session", zap.Error(err))
		return 0, err
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: alfredSessionRecordingEvent.SessionUploadEventAttributes.EventId,
		Body:       bytes.NewReader(message),
	}
	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	// Register the close before inspecting err so the body is released on
	// every path that produced a response.
	if indexResponse != nil {
		defer indexResponse.Body.Close()
	}
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		log.Error("IndexResponse is nil for UploadSession", zap.Error(err))
		return 500, err
	}

	// Elasticsearch answered with an error status. This is a single failure
	// path so the failure counter is bumped once, not twice.
	if indexResponse.IsError() {
		err = errors.New("create session upload data failed")
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
		log.Error("session upload data insert document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return indexResponse.StatusCode, err
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(sessionUploadIndex).Inc()
	return indexResponse.StatusCode, nil
}
// FetchSessionWithTimeRange runs the paginated time-range session query
// against every index in sessionUploadIndexList and returns the raw
// Elasticsearch response. The response body is not closed here.
func (r *SessionsRepositoryImpl) FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error) {
	query := getAllPaginatedSessionsQuery(startTimestamp, endTimestamp, page, sortBy, appOs)
	resp, err := r.esClient.FetchESDataMultipleIndex(sessionUploadIndexList, query)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// FetchSessionsWithSessionIds looks up the documents belonging to the given
// session ids in sessionUploadIndex and returns the raw Elasticsearch
// response. The response body is not closed here.
func (r *SessionsRepositoryImpl) FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error) {
	query := getSessionsFromSessionIdsQuery(sessionIds)
	resp, err := r.esClient.FetchESData(sessionUploadIndex, query)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// FetchSessionAndSessionDurationWithSessionIds fetches the documents for the
// given session ids together with per-session timestamp aggregations and
// returns the raw Elasticsearch response. The response body is not closed
// here.
func (r *SessionsRepositoryImpl) FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
	query := getSessionAndSessionDurationFromSessionIdQuery(sessionIds, page, sortBy)
	resp, err := r.esClient.FetchESData(sessionUploadIndex, query)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// FetchDeviceAttributesForMetrics runs the device-attribute aggregation query
// against sessionUploadIndex and returns the raw Elasticsearch response.
// A failed fetch is logged before the error is returned. The response body is
// not closed here.
func (r *SessionsRepositoryImpl) FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error) {
	query := getAggsOnSessionIdsForDeviceAttributes(startTimestamp, endTimestamp, snapshotPerSecond, page)
	resp, err := r.esClient.FetchESData(sessionUploadIndex, query)
	if err != nil {
		log.Error("Fetching Device Metrics Failed:", zap.Error(err))
		return nil, err
	}
	return resp, nil
}
// UpdateSessionErrorEventsWithSessionId sets base_attributes.has_errors to
// hasErrors on the documents of the given sessions across
// sessionUploadIndexList.
//
// It returns the client error when the update request fails, a generic error
// when Elasticsearch answers with an error status, and nil otherwise. Both
// failure modes are logged together with the query that was sent.
func (r *SessionsRepositoryImpl) UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error {
	content := getUpdateSessionErrorEventsWithSessionIdQuery(sessionId, hasErrors)
	response, err := r.esClient.UpdateESData(sessionUploadIndexList, content)
	// The response may be nil when the request fails, so only schedule the
	// close once we know there is a body to release; an unconditional defer
	// would panic on the error path.
	if response != nil {
		defer response.Body.Close()
	}
	if err != nil {
		log.Error("update session error events with session id failed", zap.Error(err), zap.String("query", content))
		return err
	}
	if response.IsError() {
		err = errors.New("update has_errors true in session failed")
		log.Error("update session error events with session id failed", zap.Error(err), zap.String("response", fmt.Sprintf("%v", response)), zap.String("query", content))
		return err
	}
	return nil
}
// FetchSessionListFromSessionIds runs the collapsed, paginated session-id
// query against sessionUploadIndex and returns the raw Elasticsearch
// response. The response body is not closed here.
func (r *SessionsRepositoryImpl) FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
	query := getSessionListFromSessionIdsQuery(sessionIds, page, sortBy)
	resp, err := r.esClient.FetchESData(sessionUploadIndex, query)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// FetchSessionWithLabels searches sessionUploadSearchIndexList by device ids,
// app version codes and appOs. An endTimestamp of zero selects the query
// variant without a time-range filter; any other value applies the range
// built from startTimestamp and endTimestamp. The response body is not closed
// here.
func (r *SessionsRepositoryImpl) FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error) {
	var query string
	switch endTimestamp {
	case 0:
		query = getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion, deviceIds, page, appOs)
	default:
		query = getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds, startTimestamp, endTimestamp, page, appOs)
	}

	resp, err := r.esClient.FetchESDataMultipleIndex(sessionUploadSearchIndexList, query)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// FetchUniqueKeys runs a terms aggregation over the field named by key in
// sessionUploadIndex and returns the raw Elasticsearch response. The response
// body is not closed here.
func (r *SessionsRepositoryImpl) FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error) {
	query := getUniqueValuesQueryFromSession(key)
	resp, fetchErr := r.esClient.FetchESData(sessionUploadIndex, query)
	if fetchErr != nil {
		return nil, fetchErr
	}
	return resp, nil
}
// getAllPaginatedSessionsQuery assembles the search body for listing sessions
// in a time window: a must-clause combining a base_attributes.client_ts range
// with a base_attributes.app_os match, sorted by sortBy in the page's
// direction, collapsed on base_attributes.session_id, and windowed with
// from = PageSize*PageNumber and size = PageSize.
func getAllPaginatedSessionsQuery(startTimestamp int64, endTimestamp int64, page *es.Page, sortBy, appOs string) string {
	timeWindow := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
	osMatch := createMatchQuery("base_attributes.app_os", appOs)
	search := createSearchQuery(createBoolQuery(createMustQuery(timeWindow, osMatch)))
	sorting := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapse := createCollapseQuery("base_attributes.session_id")

	offset := page.PageSize * page.PageNumber
	return createEsQuery(search, sorting, collapse, createFromQuery(offset), createSizeQuery(page.PageSize))
}
func getSessionsFromSessionIdsQuery(sessionIds []string) string {
shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
boolQuery := createBoolQuery(shouldQuery)
searchQuery := createSearchQuery(boolQuery)
sortQuery := createSortQuery("base_attributes.client_ts", "asc", "strict_date_optional_time_nanos")
return createEsQuery(searchQuery, sortQuery, createSizeQuery(utils.SessionUpperLimit))
}
// getSessionAndSessionDurationFromSessionIdQuery assembles the search body
// that selects documents by session id and, per session-id bucket (bucket
// count taken from page.PageSize), requests:
//   - min_timestamp: min of base_attributes.session_time_stamp
//   - max_timestamp: max of base_attributes.event_end_time_stamp
//   - max_client_timestamp: max of base_attributes.client_ts
//
// Hits are sorted by sortBy in the page's direction and the result size is
// set to utils.SessionUpperLimit.
func getSessionAndSessionDurationFromSessionIdQuery(sessionIds []string, page *es.Page, sortBy string) string {
	idTerms := createTermSubQuery("base_attributes.session_id", sessionIds...)
	search := createSearchQuery(createBoolQuery(createShouldQuery(idTerms)))
	sorting := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")

	perSession := createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize))
	minTs := buildAggregationQuery("min_timestamp", createMinAggregationQuery("base_attributes.session_time_stamp"))
	maxTs := buildAggregationQuery("max_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
	maxClientTs := buildAggregationQuery("max_client_timestamp", createMaxAggregationQuery("base_attributes.client_ts"))
	subAggs := createAggregationQuery(minTs, maxTs, maxClientTs)
	aggs := createAggregationQuery(buildAggregationQuery("buckets", perSession, subAggs))

	return createEsQuery(search, sorting, aggs, createSizeQuery(utils.SessionUpperLimit))
}
// getAggsOnSessionIdsForDeviceAttributes assembles an aggregation-only search
// body (size 0) for device metrics.
//
// Documents are selected by base_attributes.snapshot_per_second and a
// base_attributes.client_ts range. Inside a "filter" aggregation that applies
// a range built with bounds 1 and 101 on the end-of-session battery field,
// documents are bucketed per base_attributes.session_id and each bucket
// carries min/max aggregations over battery, memory, event end timestamp and
// client timestamp.
func getAggsOnSessionIdsForDeviceAttributes(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, page *es.Page) string {
	snapshotFilter := createShouldQuery(createTermSubQueryForInt("base_attributes.snapshot_per_second", snapshotPerSecond))
	timeWindow := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
	search := createSearchQuery(createBoolQuery(createMustQuery(createBoolQuery(snapshotFilter), timeWindow)))

	perSession := createTermsAggregationQueryWithoutSize("base_attributes.session_id")
	maxStartBattery := buildAggregationQuery("max_beginning_battery", createMaxAggregationQuery("session_upload_event_attributes.beginning_device_attributes.battery"))
	minEndBattery := buildAggregationQuery("min_end_battery", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.battery"))
	// Both memory aggregations read the end-of-session memory field.
	minEndMemory := buildAggregationQuery("min_end_memory", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
	maxEndMemory := buildAggregationQuery("max_end_memory", createMaxAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
	maxEventEndTs := buildAggregationQuery("max_event_end_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
	minClientTs := buildAggregationQuery("min_client_timestamp", createMinAggregationQuery("base_attributes.client_ts"))
	subAggs := createAggregationQuery(maxStartBattery, minEndBattery, minEndMemory, maxEndMemory, maxEventEndTs, minClientTs)

	batteryFilter := createCustomQuery("filter", createRangeQuery("session_upload_event_attributes.end_device_attributes.battery", 1, 101))
	bucketAggs := createAggregationQuery(buildAggregationQuery("buckets", perSession, subAggs))
	filteredAggs := createAggregationQuery(buildAggregationQuery("filter", batteryFilter, bucketAggs))

	offset := page.PageSize * page.PageNumber
	return createEsQuery(search, filteredAggs, createFromQuery(offset), createSizeQuery(0))
}
// getUpdateSessionErrorEventsWithSessionIdQuery assembles the update body that
// targets documents of the given sessions, combined with a must-not term on
// base_attributes.has_errors == hasErrors, and carries a script assigning
// hasErrors to ctx._source.base_attributes.has_errors.
func getUpdateSessionErrorEventsWithSessionIdQuery(sessionIds []string, hasErrors bool) string {
	sessionClause := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
	alreadySetClause := createMustNotQuery(createTermSubQueryBool("base_attributes.has_errors", hasErrors))
	search := createSearchQuery(createBoolQuery(sessionClause, alreadySetClause))
	script := createScriptQuery(fmt.Sprintf("ctx._source.base_attributes.has_errors = %t", hasErrors))
	return createEsQuery(search, script)
}
// getSessionListFromSessionIdsQuery assembles the search body that selects
// documents by session id, sorted by sortBy in the page's direction,
// collapsed on base_attributes.session_id, and windowed with
// from = PageSize*PageNumber and size = PageSize.
func getSessionListFromSessionIdsQuery(sessionIds []string, page *es.Page, sortBy string) string {
	idTerms := createTermSubQuery("base_attributes.session_id", sessionIds...)
	search := createSearchQuery(createBoolQuery(createShouldQuery(idTerms)))
	sorting := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapse := createCollapseQuery("base_attributes.session_id")

	offset := page.PageSize * page.PageNumber
	return createEsQuery(search, sorting, collapse, createFromQuery(offset), createSizeQuery(page.PageSize))
}
// getPaginatedSessionsFromLabelsQuery assembles the label search body with a
// time filter: a must-clause combining the device-id clause, the
// app-version-code clause, a created_at range and a base_attributes.app_os
// match. Results are sorted by created_at in the page's direction, collapsed
// on base_attributes.session_id, and windowed with from = PageSize*PageNumber
// and size = PageSize.
func getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, appOs string) string {
	deviceClause := createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...))
	versionClause := createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...))
	osMatch := createMatchQuery("base_attributes.app_os", appOs)
	labelClauses := strings.Join([]string{deviceClause, versionClause}, ",")
	createdWindow := createRangeQuery("created_at", startTimestamp, endTimestamp)

	search := createSearchQuery(createBoolQuery(createMustQuery(labelClauses, createdWindow, osMatch)))
	sorting := createSortQuery("created_at", string(page.SortDirection), utils.EMPTY)
	collapse := createCollapseQuery("base_attributes.session_id")
	from := createFromQuery(page.PageSize * page.PageNumber)
	size := createSizeQuery(page.PageSize)

	return createEsQuery(search, sorting, collapse, from, size)
}
// getPaginatedSessionsFromLabelsQueryWithoutTimeRange assembles the label
// search body without any time filter: a must-clause combining the device-id
// clause, the app-version-code clause and a base_attributes.app_os match.
// Results are sorted by created_at in the page's direction, collapsed on
// base_attributes.session_id, and windowed with from = PageSize*PageNumber
// and size = PageSize.
func getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion []string, deviceIds []string, page *es.Page, appOs string) string {
	deviceClause := createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...))
	versionClause := createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...))
	osMatch := createMatchQuery("base_attributes.app_os", appOs)
	labelClauses := strings.Join([]string{deviceClause, versionClause}, ",")

	search := createSearchQuery(createBoolQuery(createMustQuery(labelClauses, osMatch)))
	sorting := createSortQuery("created_at", string(page.SortDirection), utils.EMPTY)
	collapse := createCollapseQuery("base_attributes.session_id")
	from := createFromQuery(page.PageSize * page.PageNumber)
	size := createSizeQuery(page.PageSize)

	return createEsQuery(search, sorting, collapse, from, size)
}
// getUniqueValuesQueryFromSession assembles an aggregation-only body (size 0,
// track_total_hits disabled) holding a single "buckets" terms aggregation on
// fieldName, sized with utils.SessionUpperLimit.
func getUniqueValuesQueryFromSession(fieldName string) string {
	valueTerms := createTermsAggregationQuery(fieldName, utils.SessionUpperLimit)
	aggs := createAggregationQuery(buildAggregationQuery("buckets", valueTerms))
	noHits := createSizeQuery(0)
	return createEsQuery(aggs, noHits, `"track_total_hits": false`)
}