Files
alfred-be/alfred/repository/cruiseControlRepository.go
2026-03-08 16:14:42 +05:30

114 lines
3.7 KiB
Go

package repository
import (
"alfred/internal/metrics"
"alfred/model/core/cruise"
"alfred/pkg/es7"
"alfred/pkg/log"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
// CruiseControlRepository is the persistence contract for cruise-control
// configuration documents. Every method takes the name of the index to
// operate on, so one repository value can serve several indices.
type CruiseControlRepository interface {
	// CreateCruiseControlConfig stores cruiseControl in cruiseControlIndex.
	CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error

	// FetchPreviousAppVersion runs the "previous app version" query for appOs
	// against cruiseControlIndex and returns the raw search response.
	FetchPreviousAppVersion(cruiseControlIndex, appOs string) (*esapi.Response, error)

	// FetchCruiseControlConfig runs the config lookup query for the given
	// appVersion and appOs and returns the raw search response.
	FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex string) (*esapi.Response, error)

	// FetchAllCruiseControlAppVersions runs the "all app versions" query for
	// appOs against cruiseControlIndex and returns the raw search response.
	FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) (*esapi.Response, error)
}
// CruiseControlRepositoryImpl implements CruiseControlRepository by issuing
// its requests through the wrapped es7.ElasticSearchClient.
type CruiseControlRepositoryImpl struct {
	// esClient is the client wrapper used for both indexing and searching.
	esClient es7.ElasticSearchClient
}
// NewCruiseControlRepository returns a CruiseControlRepository that sends all
// of its requests through esClient.
func NewCruiseControlRepository(esClient es7.ElasticSearchClient) CruiseControlRepository {
	repo := new(CruiseControlRepositoryImpl)
	repo.esClient = esClient
	return repo
}
// CreateCruiseControlConfig serialises cruiseControl to JSON and indexes it
// into cruiseControlIndex, using the config's OsConfig.AppVersion as the
// document ID. The request is sent with Refresh set to "true".
//
// It returns an error when cruiseControl is nil, when JSON marshalling fails,
// when the request cannot be performed, or when Elasticsearch answers with an
// error status. The ingestion failure counter is incremented for the last two
// cases and the success counter only when the document was accepted.
func (r *CruiseControlRepositoryImpl) CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error {
	// json.Marshal happily encodes a nil pointer as "null"; without this guard
	// the DocumentID field access below would panic instead of failing cleanly.
	if cruiseControl == nil {
		return fmt.Errorf("cruise control config is nil")
	}

	bdy, err := json.Marshal(cruiseControl)
	if err != nil {
		return fmt.Errorf("cruise control request conversion to byte failed: %w", err)
	}

	req := esapi.IndexRequest{
		Index:      cruiseControlIndex,
		DocumentID: cruiseControl.OsConfig.AppVersion,
		Body:       strings.NewReader(string(bdy)),
		Refresh:    "true",
	}

	indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(cruiseControlIndex).Inc()
		log.Error("cruise-control insert document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
		return err
	}
	defer indexResponse.Body.Close()

	// A nil err only means the HTTP round trip completed. A 4xx/5xx reply
	// (mapping conflict, missing index, rejected write, ...) still arrives
	// here, so it has to be detected explicitly or it is counted as a success.
	if indexResponse.IsError() {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(cruiseControlIndex).Inc()
		log.Error("cruise-control insert document in ES failed", zap.String("response", indexResponse.String()))
		return fmt.Errorf("cruise control insert document in ES failed with status: %s", indexResponse.Status())
	}

	metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(cruiseControlIndex).Inc()
	return nil
}
// FetchPreviousAppVersion executes the query built by
// getPreviousAppVersionQuery for appOs against cruiseControlIndex.
//
// On success the raw search response is returned for the caller to decode.
// On failure the response is dropped and only the error is returned.
func (r *CruiseControlRepositoryImpl) FetchPreviousAppVersion(cruiseControlIndex, appOs string) (*esapi.Response, error) {
	resp, fetchErr := r.esClient.FetchESData(cruiseControlIndex, getPreviousAppVersionQuery(appOs))
	if fetchErr == nil {
		return resp, nil
	}
	return nil, fetchErr
}
// FetchCruiseControlConfig executes the query built by getCruiseControlQuery
// for the given appVersion and appOs against cruiseControlIndex.
//
// On success the raw search response is returned for the caller to decode.
// On failure the response is dropped and only the error is returned.
func (r *CruiseControlRepositoryImpl) FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex string) (*esapi.Response, error) {
	resp, fetchErr := r.esClient.FetchESData(cruiseControlIndex, getCruiseControlQuery(appVersion, appOs))
	if fetchErr == nil {
		return resp, nil
	}
	return nil, fetchErr
}
// FetchAllCruiseControlAppVersions executes the query built by
// getAllCruiseControlAppVersionQuery for appOs against cruiseControlIndex.
//
// On success the raw search response is returned for the caller to decode.
// On failure the response is dropped and only the error is returned.
func (r *CruiseControlRepositoryImpl) FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) (*esapi.Response, error) {
	resp, fetchErr := r.esClient.FetchESData(cruiseControlIndex, getAllCruiseControlAppVersionQuery(appOs))
	if fetchErr == nil {
		return resp, nil
	}
	return nil, fetchErr
}
func getPreviousAppVersionQuery(appOs string) string {
query := createEsQuery(
createSearchQuery(createMatchQuery("type", appOs)),
createFieldsQuery("os_config.app_version"),
createSourceQuery(false),
createSortQuery("config_time", "desc", ""),
createSizeQuery(1),
)
return query
}
// getCruiseControlQuery assembles the query string used by
// FetchCruiseControlConfig: a bool query whose must clause holds two term
// sub-queries ("os_config.app_version.keyword" against appVersion and "type"
// against appOs), combined with a size of 100.
func getCruiseControlQuery(appVersion, appOs string) string {
	versionTerm := createTermSubQuery("os_config.app_version.keyword", appVersion)
	osTerm := createTermSubQuery("type", appOs)

	must := createMustQuery(versionTerm, osTerm)
	search := createSearchQuery(createBoolQuery(must))

	return createEsQuery(search, createSizeQuery(100))
}
func getAllCruiseControlAppVersionQuery(appOs string) string {
query := createEsQuery(
createSearchQuery(createMatchQuery("type", appOs)),
createFieldsQuery("os_config.app_version"),
createSourceQuery(false),
createSortQuery("os_config.app_version.keyword", "desc", ""),
createSizeQuery(10000),
)
return query
}