64 lines
2.3 KiB
Go
64 lines
2.3 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/internal/metrics"
|
|
"alfred/model/ingester"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"alfred/utils"
|
|
"context"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type AppFragmentsRepository interface {
|
|
CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error
|
|
FetchUniqueFragments(key string, fragmentIngestionIndex string) (*esapi.Response, error)
|
|
}
|
|
|
|
type AppFragmentsRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewAppFragmentsRepository(esClient es7.ElasticSearchClient) AppFragmentsRepository {
|
|
return &AppFragmentsRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (r *AppFragmentsRepositoryImpl) CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error {
|
|
req := esapi.IndexRequest{
|
|
Index: fragmentIngestionIndex,
|
|
DocumentID: utils.FRAGMENT_NAME + utils.HYPHEN + fragmentData.FragmentAttributes.FragmentName + utils.UNDERSCORE + utils.SCREEN_NAME + utils.HYPHEN + fragmentData.FragmentAttributes.ScreenName + utils.UNDERSCORE + utils.VERTICAL + utils.HYPHEN + fragmentData.FragmentAttributes.Vertical,
|
|
Body: strings.NewReader(message),
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(fragmentIngestionIndex).Inc()
|
|
log.Error("fragment insertion document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
|
|
return err
|
|
}
|
|
defer indexResponse.Body.Close()
|
|
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(fragmentIngestionIndex).Inc()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *AppFragmentsRepositoryImpl) FetchUniqueFragments(key string, fragmentIngestionIndex string) (*esapi.Response, error) {
|
|
content := getUniqueValues(key)
|
|
result, err := r.esClient.FetchESData(fragmentIngestionIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func getUniqueValues(key string) string {
|
|
termsAggregationQuery := createTermsAggregationQuery(key, utils.EsUpperLimit)
|
|
aggregationQuery := createAggregationQuery(buildAggregationQuery("buckets", termsAggregationQuery))
|
|
return createEsQuery(aggregationQuery, createSizeQuery(0), createTrackTotalHitsQuery(false))
|
|
}
|