277 lines
15 KiB
Go
277 lines
15 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/internal/metrics"
|
|
"alfred/model/es"
|
|
"alfred/model/ingester"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"alfred/utils"
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type SessionsRepository interface {
|
|
UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error)
|
|
FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error)
|
|
FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error)
|
|
FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
|
|
FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error)
|
|
UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error
|
|
FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error)
|
|
FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error)
|
|
FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error)
|
|
}
|
|
|
|
type SessionsRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewSessionsRepository(esClient es7.ElasticSearchClient) SessionsRepository {
|
|
return &SessionsRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error) {
|
|
year, month, day := utils.GetCurrentDate()
|
|
index := sessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)
|
|
message, err := json.Marshal(alfredSessionRecordingEvent)
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
|
|
log.Error("error ingesting app session", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
|
|
req := esapi.IndexRequest{
|
|
Index: index,
|
|
DocumentID: alfredSessionRecordingEvent.SessionUploadEventAttributes.EventId,
|
|
Body: bytes.NewReader(message),
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
|
|
if indexResponse != nil {
|
|
defer indexResponse.Body.Close()
|
|
}
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
|
|
log.Error("IndexResponse is nil for UploadSession", zap.Error(err))
|
|
return 500, err
|
|
}
|
|
|
|
if indexResponse.IsError() {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
|
|
err = errors.New("create session upload data failed")
|
|
}
|
|
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(sessionUploadIndex).Inc()
|
|
log.Error("session upload data insert document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
|
|
return indexResponse.StatusCode, err
|
|
}
|
|
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(sessionUploadIndex).Inc()
|
|
return indexResponse.StatusCode, nil
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sortBy, appOs string) (*esapi.Response, error) {
|
|
content := getAllPaginatedSessionsQuery(startTimestamp, endTimestamp, page, sortBy, appOs)
|
|
result, err := r.esClient.FetchESDataMultipleIndex(sessionUploadIndexList, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchSessionsWithSessionIds(sessionIds []string, sessionUploadIndex string) (*esapi.Response, error) {
|
|
content := getSessionsFromSessionIdsQuery(sessionIds)
|
|
result, err := r.esClient.FetchESData(sessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
|
|
content := getSessionAndSessionDurationFromSessionIdQuery(sessionIds, page, sortBy)
|
|
result, err := r.esClient.FetchESData(sessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*esapi.Response, error) {
|
|
content := getAggsOnSessionIdsForDeviceAttributes(startTimestamp, endTimestamp, snapshotPerSecond, page)
|
|
result, err := r.esClient.FetchESData(sessionUploadIndex, content)
|
|
if err != nil {
|
|
log.Error("Fetching Device Metrics Failed:", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error {
|
|
content := getUpdateSessionErrorEventsWithSessionIdQuery(sessionId, hasErrors)
|
|
response, err := r.esClient.UpdateESData(sessionUploadIndexList, content)
|
|
defer response.Body.Close()
|
|
if err != nil {
|
|
log.Error("update session error events with session id failed", zap.Error(err), zap.String("query", content))
|
|
return err
|
|
}
|
|
if response.IsError() {
|
|
err = errors.New("update has_errors true in session failed")
|
|
log.Error("update session error events with session id failed", zap.Error(err), zap.String("response", fmt.Sprintf("%v", response)), zap.String("query", content))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchSessionListFromSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) (*esapi.Response, error) {
|
|
content := getSessionListFromSessionIdsQuery(sessionIds, page, sortBy)
|
|
result, err := r.esClient.FetchESData(sessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) (*esapi.Response, error) {
|
|
var content string
|
|
if endTimestamp == 0 {
|
|
content = getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion, deviceIds, page, appOs)
|
|
} else {
|
|
content = getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds, startTimestamp, endTimestamp, page, appOs)
|
|
}
|
|
result, err := r.esClient.FetchESDataMultipleIndex(sessionUploadSearchIndexList, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (r *SessionsRepositoryImpl) FetchUniqueKeys(key string, sessionUploadIndex string) (*esapi.Response, error) {
|
|
content := getUniqueValuesQueryFromSession(key)
|
|
result, err := r.esClient.FetchESData(sessionUploadIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func getAllPaginatedSessionsQuery(startTimestamp int64, endTimestamp int64, page *es.Page, sortBy, appOs string) string {
|
|
rangeQuery := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
|
|
appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
|
|
boolQuery := createBoolQuery(createMustQuery(rangeQuery, appOsMatchQuery))
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
|
|
collapseQuery := createCollapseQuery("base_attributes.session_id")
|
|
return createEsQuery(searchQuery, sortQuery, collapseQuery, createFromQuery(page.PageSize*page.PageNumber), createSizeQuery(page.PageSize))
|
|
}
|
|
|
|
func getSessionsFromSessionIdsQuery(sessionIds []string) string {
|
|
shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
|
|
boolQuery := createBoolQuery(shouldQuery)
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
sortQuery := createSortQuery("base_attributes.client_ts", "asc", "strict_date_optional_time_nanos")
|
|
return createEsQuery(searchQuery, sortQuery, createSizeQuery(utils.SessionUpperLimit))
|
|
}
|
|
|
|
func getSessionAndSessionDurationFromSessionIdQuery(sessionIds []string, page *es.Page, sortBy string) string {
|
|
shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
|
|
boolQuery := createBoolQuery(shouldQuery)
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
|
|
termsAggregation := createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize))
|
|
minTimeStampAggregation := buildAggregationQuery("min_timestamp", createMinAggregationQuery("base_attributes.session_time_stamp"))
|
|
maxTimeStampAggregation := buildAggregationQuery("max_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
|
|
maxClientTimestampAggregation := buildAggregationQuery("max_client_timestamp", createMaxAggregationQuery("base_attributes.client_ts"))
|
|
innerAggregationQuery := createAggregationQuery(minTimeStampAggregation, maxTimeStampAggregation, maxClientTimestampAggregation)
|
|
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))
|
|
return createEsQuery(searchQuery, sortQuery, aggregationQuery, createSizeQuery(utils.SessionUpperLimit))
|
|
}
|
|
|
|
func getAggsOnSessionIdsForDeviceAttributes(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, page *es.Page) string {
|
|
shouldQuery := createShouldQuery(createTermSubQueryForInt("base_attributes.snapshot_per_second", snapshotPerSecond))
|
|
rangeQuery := createRangeQuery("base_attributes.client_ts", startTimestamp, endTimestamp)
|
|
boolQuery := createBoolQuery(createMustQuery(createBoolQuery(shouldQuery), rangeQuery))
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
termsAggregation := createTermsAggregationQueryWithoutSize("base_attributes.session_id")
|
|
maxBeginningBatteryAggregation := buildAggregationQuery("max_beginning_battery", createMaxAggregationQuery("session_upload_event_attributes.beginning_device_attributes.battery"))
|
|
minEndBatteryAggregation := buildAggregationQuery("min_end_battery", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.battery"))
|
|
minBeginningMemoryAggregation := buildAggregationQuery("min_end_memory", createMinAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
|
|
maxEndMemoryAggregation := buildAggregationQuery("max_end_memory", createMaxAggregationQuery("session_upload_event_attributes.end_device_attributes.memory"))
|
|
maxEventEndTimestampAggregation := buildAggregationQuery("max_event_end_timestamp", createMaxAggregationQuery("base_attributes.event_end_time_stamp"))
|
|
minClientTimestampAggregation := buildAggregationQuery("min_client_timestamp", createMinAggregationQuery("base_attributes.client_ts"))
|
|
innerAggregationQuery := createAggregationQuery(maxBeginningBatteryAggregation, minEndBatteryAggregation, minBeginningMemoryAggregation, maxEndMemoryAggregation, maxEventEndTimestampAggregation, minClientTimestampAggregation)
|
|
rangeQueryOnEndBattery := createRangeQuery("session_upload_event_attributes.end_device_attributes.battery", 1, 101)
|
|
filterQuery := createCustomQuery("filter", rangeQueryOnEndBattery)
|
|
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation, innerAggregationQuery))
|
|
finalAggregationQuery := createAggregationQuery(buildAggregationQuery("filter", filterQuery, aggregationQuery))
|
|
return createEsQuery(searchQuery, finalAggregationQuery, createFromQuery(page.PageSize*page.PageNumber), createSizeQuery(0))
|
|
|
|
}
|
|
|
|
func getUpdateSessionErrorEventsWithSessionIdQuery(sessionIds []string, hasErrors bool) string {
|
|
shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
|
|
mustNotQuery := createMustNotQuery(createTermSubQueryBool("base_attributes.has_errors", hasErrors))
|
|
boolQuery := createBoolQuery(shouldQuery, mustNotQuery)
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
scriptQuery := createScriptQuery("ctx._source.base_attributes.has_errors = " + fmt.Sprintf("%t", hasErrors))
|
|
return createEsQuery(searchQuery, scriptQuery)
|
|
}
|
|
|
|
func getSessionListFromSessionIdsQuery(sessionIds []string, page *es.Page, sortBy string) string {
|
|
shouldQuery := createShouldQuery(createTermSubQuery("base_attributes.session_id", sessionIds...))
|
|
boolQuery := createBoolQuery(shouldQuery)
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
sortQuery := createSortQuery(sortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
|
|
collapseQuery := createCollapseQuery("base_attributes.session_id")
|
|
return createEsQuery(searchQuery, sortQuery, collapseQuery, createFromQuery(page.PageSize*page.PageNumber), createSizeQuery(page.PageSize))
|
|
}
|
|
|
|
func getPaginatedSessionsFromLabelsQuery(appVersion, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, appOs string) string {
|
|
multipleShouldQuery := []string{
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...))}
|
|
appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
|
|
shouldQuery := strings.Join(multipleShouldQuery, ",")
|
|
rangeQuery := createRangeQuery("created_at", startTimestamp, endTimestamp)
|
|
query := createEsQuery(
|
|
createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, rangeQuery, appOsMatchQuery))),
|
|
createSortQuery("created_at", string(page.SortDirection), utils.EMPTY),
|
|
createCollapseQuery("base_attributes.session_id"),
|
|
createFromQuery(page.PageSize*page.PageNumber),
|
|
createSizeQuery(page.PageSize),
|
|
)
|
|
return query
|
|
}
|
|
|
|
func getPaginatedSessionsFromLabelsQueryWithoutTimeRange(appVersion []string, deviceIds []string, page *es.Page, appOs string) string {
|
|
multipleShouldQuery := []string{
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.device_id", deviceIds...)),
|
|
createBoolShouldQuery(createTermSubQuery("base_attributes.app_version_code", appVersion...))}
|
|
appOsMatchQuery := createMatchQuery("base_attributes.app_os", appOs)
|
|
shouldQuery := strings.Join(multipleShouldQuery, ",")
|
|
query := createEsQuery(
|
|
createSearchQuery(createBoolQuery(createMustQuery(shouldQuery, appOsMatchQuery))),
|
|
createSortQuery("created_at", string(page.SortDirection), utils.EMPTY),
|
|
createCollapseQuery("base_attributes.session_id"),
|
|
createFromQuery(page.PageSize*page.PageNumber),
|
|
createSizeQuery(page.PageSize),
|
|
)
|
|
return query
|
|
}
|
|
|
|
func getUniqueValuesQueryFromSession(fieldName string) string {
|
|
termsAggregation := createTermsAggregationQuery(fieldName, utils.SessionUpperLimit)
|
|
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregation))
|
|
return createEsQuery(aggregationQuery, createSizeQuery(0), `"track_total_hits": false`)
|
|
|
|
}
|