Files
alfred-be/alfred/repository/appFragmentsRepository.go
2026-03-08 16:14:42 +05:30

64 lines
2.3 KiB
Go

package repository
import (
"alfred/internal/metrics"
"alfred/model/ingester"
"alfred/pkg/es7"
"alfred/pkg/log"
"alfred/utils"
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
type AppFragmentsRepository interface {
CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error
FetchUniqueFragments(key string, fragmentIngestionIndex string) (*esapi.Response, error)
}
type AppFragmentsRepositoryImpl struct {
esClient es7.ElasticSearchClient
}
func NewAppFragmentsRepository(esClient es7.ElasticSearchClient) AppFragmentsRepository {
return &AppFragmentsRepositoryImpl{
esClient: esClient,
}
}
func (r *AppFragmentsRepositoryImpl) CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error {
req := esapi.IndexRequest{
Index: fragmentIngestionIndex,
DocumentID: utils.FRAGMENT_NAME + utils.HYPHEN + fragmentData.FragmentAttributes.FragmentName + utils.UNDERSCORE + utils.SCREEN_NAME + utils.HYPHEN + fragmentData.FragmentAttributes.ScreenName + utils.UNDERSCORE + utils.VERTICAL + utils.HYPHEN + fragmentData.FragmentAttributes.Vertical,
Body: strings.NewReader(message),
}
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
if err != nil {
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(fragmentIngestionIndex).Inc()
log.Error("fragment insertion document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
return err
}
defer indexResponse.Body.Close()
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(fragmentIngestionIndex).Inc()
return nil
}
func (r *AppFragmentsRepositoryImpl) FetchUniqueFragments(key string, fragmentIngestionIndex string) (*esapi.Response, error) {
content := getUniqueValues(key)
result, err := r.esClient.FetchESData(fragmentIngestionIndex, content)
if err != nil {
return nil, err
}
return result, nil
}
func getUniqueValues(key string) string {
termsAggregationQuery := createTermsAggregationQuery(key, utils.EsUpperLimit)
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregationQuery))
return createEsQuery(aggregationQuery, createSizeQuery(0), createTrackTotalHitsQuery(false))
}