Files
alfred-be/alfred/repository/webSessionRepository.go

167 lines
7.3 KiB
Go
Raw Permalink Normal View History

2026-03-08 16:14:42 +05:30
package repository
import (
"alfred/api/request"
"alfred/config"
"alfred/internal/metrics"
"alfred/model/es"
"alfred/model/ingester"
"alfred/pkg/es7"
"alfred/pkg/log"
"alfred/utils"
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
// WebSessionsRepository is the storage contract for web session documents.
// WebSessionsRepositoryImpl in this file satisfies it using an Elasticsearch
// client. The fetch methods return the raw *esapi.Response; decoding it is left
// to the caller.
type WebSessionsRepository interface {
// UploadWebSession indexes one upload request. webSessionUploadIndex is the
// index name prefix; the implementation in this file appends the current date.
UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error
// FetchWebSessionsWithSessionId searches webSessionUploadIndex for documents
// whose base_attributes.session_id matches sessionId.
FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error)
// FetchAllWebSession runs a paginated, filtered search across all indices in
// webSessionUploadIndexList.
FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error)
// FetchWebSessionsWithDurationResponse searches webSessionUploadIndex for the
// given session IDs and requests min start / max end timestamp aggregations
// bucketed by session ID.
FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error)
}
// WebSessionsRepositoryImpl is the WebSessionsRepository implementation returned
// by NewWebSessionsRepository.
type WebSessionsRepositoryImpl struct {
// esClient is the Elasticsearch client wrapper used for both indexing and searching.
esClient es7.ElasticSearchClient
}
// NewWebSessionsRepository builds a WebSessionsRepository that sends its
// requests through the supplied Elasticsearch client.
func NewWebSessionsRepository(esClient es7.ElasticSearchClient) WebSessionsRepository {
	repo := new(WebSessionsRepositoryImpl)
	repo.esClient = esClient
	return repo
}
// UploadWebSession indexes a single web session upload request into Elasticsearch.
//
// The target index is webSessionUploadIndex followed by "-<year>-<month>-<day>",
// using the values returned by utils.GetCurrentDate. The document ID is the
// request's event ID, unless the request version is at least the configured
// MinWebVersionSupportingSingleDoc, in which case the session ID is used so that
// every upload of that session targets the same document ID.
//
// It returns an error when the request cannot be marshalled, when the index
// call itself fails, or when Elasticsearch answers with an error status. The
// ingestion success/failure counters are labelled with webSessionUploadIndex
// (the prefix, without the date suffix).
func (r *WebSessionsRepositoryImpl) UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error {
	year, month, day := utils.GetCurrentDate()
	index := webSessionUploadIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)

	message, err := json.Marshal(sessionUploadRequest)
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		log.Error("error ingesting web session", zap.Error(err))
		return err
	}

	documentId := sessionUploadRequest.SessionAttributes.EventId
	if sessionUploadRequest.BaseAttributes.Version >= config.GetCollectorConfig().ElasticSearchConfig.MinWebVersionSupportingSingleDoc {
		documentId = sessionUploadRequest.BaseAttributes.SessionId
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: documentId,
		Body:       bytes.NewReader(message),
	}
	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		log.Error("web session ingestion failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return err
	}
	defer indexResponse.Body.Close()

	// A nil err only means the HTTP round trip succeeded. Elasticsearch reports
	// rejected documents (4xx/5xx) through the response status, so it has to be
	// checked before the upload is counted as a success.
	if indexResponse.IsError() {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(webSessionUploadIndex).Inc()
		status := indexResponse.Status()
		log.Error("web session ingestion failed", zap.String("index", index), zap.String("response", indexResponse.String()))
		return fmt.Errorf("web session ingestion failed: index %q returned %s", index, status)
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(webSessionUploadIndex).Inc()
	return nil
}
// FetchWebSessionsWithSessionId runs the session-ID lookup query against
// webSessionUploadIndex and hands back the raw Elasticsearch response.
// On failure the response is discarded and only the error is returned.
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) (*esapi.Response, error) {
	query := getWebSessionFromSessionIdQuery(sessionId)

	response, fetchErr := r.esClient.FetchESData(webSessionUploadIndex, query)
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// FetchAllWebSession runs the paginated, filtered session query across every
// index in webSessionUploadIndexList and returns the raw Elasticsearch response.
// On failure the response is discarded and only the error is returned.
func (r *WebSessionsRepositoryImpl) FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) (*esapi.Response, error) {
	query := getAllPaginatedWebSessionsQuery(filters, page)

	response, fetchErr := r.esClient.FetchESDataMultipleIndex(webSessionUploadIndexList, query)
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// FetchWebSessionsWithDurationResponse runs the duration query (session-ID
// filter plus min/max timestamp aggregations) against webSessionUploadIndex and
// returns the raw Elasticsearch response.
// On failure the response is discarded and only the error is returned.
func (r *WebSessionsRepositoryImpl) FetchWebSessionsWithDurationResponse(webSessionIds []string, webSessionUploadIndex string, filters request.WebSessionFilters, page *es.Page) (*esapi.Response, error) {
	query := getWebSessionWithDurationQuery(webSessionIds, filters, page)

	response, fetchErr := r.esClient.FetchESData(webSessionUploadIndex, query)
	if fetchErr == nil {
		return response, nil
	}
	return nil, fetchErr
}
// getWebSessionFromSessionIdQuery builds the query body used to fetch the
// documents of one session: a term filter on base_attributes.session_id, sorted
// ascending by client timestamp and then by event ID, with the size set to
// utils.EsUpperLimit.
func getWebSessionFromSessionIdQuery(sessionId string) string {
	sessionFilter := createTermSubQuery("base_attributes.session_id", sessionId)

	// Timestamp is the primary sort key; event ID is the secondary key.
	sortFields := []string{
		createSortQueryFields("base_attributes.client_timestamp", "asc", "strict_date_optional_time_nanos"),
		createSortQueryFields("session_attribute.event_id", "asc", ""),
	}

	searchClause := createSearchQuery(createBoolQuery(createShouldQuery(sessionFilter)))
	sortClause := createMultiSortQuery(sortFields...)
	sizeClause := createSizeQuery(utils.EsUpperLimit)

	return createEsQuery(searchClause, sortClause, sizeClause)
}
// getAllPaginatedWebSessionsQuery builds the query body for the session listing.
//
// Each filter list from getFilterList becomes its own bool/should term clause;
// those clauses and a client-timestamp range over
// [filters.StartTimestamp, filters.EndTimestamp] are combined under a single
// must clause. Results are sorted by filters.SortBy in page.SortDirection,
// collapsed on base_attributes.session_id, and paged with
// from = PageSize*PageNumber and size = PageSize.
func getAllPaginatedWebSessionsQuery(filters request.WebSessionFilters, page *es.Page) string {
	projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber := getFilterList(filters)

	// Field/value pairs in the order the clauses appear in the final query.
	termFilters := []struct {
		field  string
		values []string
	}{
		{"base_attributes.project_name", projectName},
		{"base_attributes.metadata.agentId", agentId},
		{"base_attributes.metadata.ticketId", ticketId},
		{"base_attributes.metadata.emailId", emailId},
		{"base_attributes.metadata.phoneNumber", phoneNumber},
		{"base_attributes.session_id", sessionId},
		{"base_attributes.device_id", deviceId},
	}

	shouldClauses := make([]string, 0, len(termFilters))
	for _, tf := range termFilters {
		shouldClauses = append(shouldClauses, createBoolShouldQuery(createTermSubQuery(tf.field, tf.values...)))
	}
	joinedShould := strings.Join(shouldClauses, ",")

	timeRange := createRangeQuery("base_attributes.client_timestamp", filters.StartTimestamp, filters.EndTimestamp)

	searchClause := createSearchQuery(createBoolQuery(createMustQuery(joinedShould, timeRange)))
	sortClause := createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapseClause := createCollapseQuery("base_attributes.session_id")
	fromClause := createFromQuery(page.PageSize * page.PageNumber)
	sizeClause := createSizeQuery(page.PageSize)

	return createEsQuery(searchClause, sortClause, collapseClause, fromClause, sizeClause)
}
// getWebSessionWithDurationQuery builds the query body used to look up the
// given sessions together with their time span.
//
// The search part is a bool/should term filter on base_attributes.session_id,
// sorted by filters.SortBy in page.SortDirection and collapsed on the session
// ID. The aggregation part is a terms aggregation named "buckets" on the
// session ID (sized by page.PageSize) carrying two sub-aggregations:
// "min_start_timestamp" and "max_end_timestamp". The size is utils.EsUpperLimit.
func getWebSessionWithDurationQuery(webSessionIds []string, filters request.WebSessionFilters, page *es.Page) string {
	sessionFilter := createBoolShouldQuery(createTermSubQuery("base_attributes.session_id", webSessionIds...))
	sortClause := createSortQuery(filters.SortBy, string(page.SortDirection), "strict_date_optional_time_nanos")
	collapseClause := createCollapseQuery("base_attributes.session_id")

	// Per-session metrics nested under the terms buckets.
	minStart := buildAggregationQuery("min_start_timestamp", createMinAggregationQuery("session_attribute.start_timestamp"))
	maxEnd := buildAggregationQuery("max_end_timestamp", createMaxAggregationQuery("session_attribute.end_timestamp"))

	bucketAggregation := buildAggregationQuery(
		"buckets",
		createTermsAggregationQuery("base_attributes.session_id", int(page.PageSize)),
		createAggregationQuery(minStart, maxEnd),
	)
	aggregationClause := createAggregationQuery(bucketAggregation)

	return createEsQuery(
		createSearchQuery(sessionFilter),
		sortClause,
		collapseClause,
		aggregationClause,
		createSizeQuery(utils.EsUpperLimit),
	)
}
// getFilterList converts the comma-separated filter strings on filters into
// slices, one per filter. A filter equal to utils.EMPTY yields a nil slice.
// DeviceId is already a slice and is passed through when it has at least one
// element.
//
// NOTE(review): phoneNumber is never assigned here, so it is always returned
// nil and the phone-number clause built by the caller never carries values.
// This looks like an omission — confirm which field of
// request.WebSessionFilters should populate it.
func getFilterList(filters request.WebSessionFilters) (projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber []string) {
	// splitList leaves unset filters as nil rather than [""].
	splitList := func(raw string) []string {
		if raw == utils.EMPTY {
			return nil
		}
		return strings.Split(raw, utils.COMMA)
	}

	projectName = splitList(filters.ProjectName)
	agentId = splitList(filters.AgentId)
	ticketId = splitList(filters.TicketId)
	sessionId = splitList(filters.SessionId)
	emailId = splitList(filters.EmailId)

	if len(filters.DeviceId) > 0 {
		deviceId = filters.DeviceId
	}

	return projectName, agentId, ticketId, sessionId, deviceId, emailId, phoneNumber
}