77 lines
2.6 KiB
Go
77 lines
2.6 KiB
Go
package repository
|
|
|
|
import (
|
|
"alfred/internal/metrics"
|
|
"alfred/model/core"
|
|
"alfred/pkg/es7"
|
|
"alfred/pkg/log"
|
|
"alfred/utils"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.uber.org/zap"
|
|
"strings"
|
|
)
|
|
|
|
type DeviceMetricsRepository interface {
|
|
InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, index string) error
|
|
GetDeviceMetrics(startTime int64, endTime int64, snapshotPerSecond int64, client string, index []string) (*esapi.Response, error)
|
|
}
|
|
|
|
type DeviceMetricsRepositoryImpl struct {
|
|
esClient es7.ElasticSearchClient
|
|
}
|
|
|
|
func NewDeviceMetricsRepository(esClient es7.ElasticSearchClient) DeviceMetricsRepository {
|
|
return &DeviceMetricsRepositoryImpl{
|
|
esClient: esClient,
|
|
}
|
|
}
|
|
|
|
func (dm *DeviceMetricsRepositoryImpl) InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, deviceIndex string) error {
|
|
|
|
year, month, day := utils.GetCurrentDate()
|
|
index := deviceIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)
|
|
|
|
body, err := json.Marshal(deviceMetrics)
|
|
if err != nil {
|
|
metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(deviceIndex).Inc()
|
|
log.Error("Pushing to Device Metrics Index Failed", zap.Any("Body:", body), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
req := esapi.IndexRequest{
|
|
Index: index,
|
|
Body: strings.NewReader(string(body)),
|
|
}
|
|
|
|
indexResponse, err := req.Do(context.Background(), dm.esClient.GetESClient())
|
|
if err != nil {
|
|
log.Error("Insert For Device Metrics Failed", zap.Error(err))
|
|
metrics.DeviceMetricsUpdateFailureCounter.WithLabelValues(deviceIndex).Inc()
|
|
return err
|
|
}
|
|
defer indexResponse.Body.Close()
|
|
metrics.DeviceMetricsUpdateSuccessCounter.WithLabelValues(deviceIndex).Inc()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dm *DeviceMetricsRepositoryImpl) GetDeviceMetrics(startTime int64, endTime int64, snapshotPerSecond int64, client string, index []string) (*esapi.Response, error) {
|
|
content := getDeviceMetricsQuery(startTime, endTime, snapshotPerSecond, client)
|
|
response, err := dm.esClient.FetchESDataMultipleIndex(index, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
func getDeviceMetricsQuery(startTime int64, endTime int64, snapshotPerSecond int64, client string) string {
|
|
mustQuery := createMustQuery(createTermSubQueryForInt("device_metrics.snapshot_per_second", snapshotPerSecond), createTermSubQuery("device_metrics.client", client))
|
|
rangeQuery := createRangeQuery("device_metrics.created_at", startTime, endTime)
|
|
boolQuery := createBoolQuery(createMustQuery(createBoolQuery(mustQuery), rangeQuery))
|
|
searchQuery := createSearchQuery(boolQuery)
|
|
return createEsQuery(searchQuery)
|
|
}
|