114 lines
3.7 KiB
Go
114 lines
3.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/internal/metrics"
|
|
"alfred/model/core/cruise"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type CruiseControlRepository interface {
|
|
CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error
|
|
FetchPreviousAppVersion(cruiseControlIndex, appOs string) (*esapi.Response, error)
|
|
FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex string) (*esapi.Response, error)
|
|
FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) (*esapi.Response, error)
|
|
}
|
|
|
|
type CruiseControlRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewCruiseControlRepository(esClient es7.ElasticSearchClient) CruiseControlRepository {
|
|
return &CruiseControlRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (r *CruiseControlRepositoryImpl) CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error {
|
|
|
|
bdy, err := json.Marshal(cruiseControl)
|
|
if err != nil {
|
|
return fmt.Errorf("cruise control request conversion to byte failed: %w", err)
|
|
}
|
|
|
|
req := esapi.IndexRequest{
|
|
Index: cruiseControlIndex,
|
|
DocumentID: cruiseControl.OsConfig.AppVersion,
|
|
Body: strings.NewReader(string(bdy)),
|
|
Refresh: "true",
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), r.esClient.GetESClient())
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(cruiseControlIndex).Inc()
|
|
log.Error("cruise-control insert document in ES failed", zap.String("response", fmt.Sprintf("%v", indexResponse)), zap.Error(err))
|
|
return err
|
|
}
|
|
defer indexResponse.Body.Close()
|
|
metrics.ElasticSearchIngestionSuccessCounter.WithLabelValues(cruiseControlIndex).Inc()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *CruiseControlRepositoryImpl) FetchPreviousAppVersion(cruiseControlIndex, appOs string) (*esapi.Response, error) {
|
|
content := getPreviousAppVersionQuery(appOs)
|
|
result, err := r.esClient.FetchESData(cruiseControlIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *CruiseControlRepositoryImpl) FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex string) (*esapi.Response, error) {
|
|
content := getCruiseControlQuery(appVersion, appOs)
|
|
result, err := r.esClient.FetchESData(cruiseControlIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
func (r *CruiseControlRepositoryImpl) FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) (*esapi.Response, error) {
|
|
content := getAllCruiseControlAppVersionQuery(appOs)
|
|
result, err := r.esClient.FetchESData(cruiseControlIndex, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func getPreviousAppVersionQuery(appOs string) string {
|
|
query := createEsQuery(
|
|
createSearchQuery(createMatchQuery("type", appOs)),
|
|
createFieldsQuery("os_config.app_version"),
|
|
createSourceQuery(false),
|
|
createSortQuery("config_time", "desc", ""),
|
|
createSizeQuery(1),
|
|
)
|
|
return query
|
|
}
|
|
|
|
func getCruiseControlQuery(appVersion, appOs string) string {
|
|
var termQueries []string
|
|
termQueries = append(termQueries, createTermSubQuery("os_config.app_version.keyword", appVersion))
|
|
termQueries = append(termQueries, createTermSubQuery("type", appOs))
|
|
mustQuery := createMustQuery(termQueries...)
|
|
return createEsQuery(createSearchQuery(createBoolQuery(mustQuery)), createSizeQuery(100))
|
|
}
|
|
|
|
func getAllCruiseControlAppVersionQuery(appOs string) string {
|
|
query := createEsQuery(
|
|
createSearchQuery(createMatchQuery("type", appOs)),
|
|
createFieldsQuery("os_config.app_version"),
|
|
createSourceQuery(false),
|
|
createSortQuery("os_config.app_version.keyword", "desc", ""),
|
|
createSizeQuery(10000),
|
|
)
|
|
return query
|
|
}
|