Files
alfred-be/alfred/repository/deviceMetricsRepository.go
2026-03-08 16:14:42 +05:30

77 lines
2.6 KiB
Go

package repository
import (
"alfred/internal/metrics"
"alfred/model/core"
"alfred/pkg/es7"
"alfred/pkg/log"
"alfred/utils"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
"strings"
)
// DeviceMetricsRepository is the persistence contract for device metrics
// documents stored in Elasticsearch.
type DeviceMetricsRepository interface {
	// InsertDeviceMetrics stores a single device metrics document under the
	// given index name.
	InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, index string) error

	// GetDeviceMetrics searches the given indices for device metrics matching
	// the client and snapshot rate within the startTime/endTime window and
	// returns the raw Elasticsearch response.
	GetDeviceMetrics(startTime, endTime, snapshotPerSecond int64, client string, index []string) (*esapi.Response, error)
}
// DeviceMetricsRepositoryImpl is the DeviceMetricsRepository implementation
// returned by NewDeviceMetricsRepository.
type DeviceMetricsRepositoryImpl struct {
// esClient is the Elasticsearch client wrapper used for both indexing and
// search requests.
esClient es7.ElasticSearchClient
}
// NewDeviceMetricsRepository wires the supplied Elasticsearch client into a
// DeviceMetricsRepositoryImpl and hands it back as a DeviceMetricsRepository.
func NewDeviceMetricsRepository(esClient es7.ElasticSearchClient) DeviceMetricsRepository {
	repo := new(DeviceMetricsRepositoryImpl)
	repo.esClient = esClient
	return repo
}
// InsertDeviceMetrics serializes deviceMetrics to JSON and indexes it into
// "<deviceIndex>-<year>-<month>-<day>", where the date components come from
// utils.GetCurrentDate and are formatted without zero padding.
//
// It returns the JSON marshalling error, the transport error from the index
// request, or an error describing a non-success Elasticsearch response.
// The failure and success counters are labelled with deviceIndex (the name
// without the date suffix).
func (dm *DeviceMetricsRepositoryImpl) InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, deviceIndex string) error {
	year, month, day := utils.GetCurrentDate()
	index := deviceIndex + fmt.Sprintf("-%d-%d-%d", year, month, day)

	body, err := json.Marshal(deviceMetrics)
	if err != nil {
		metrics.ElasticSearchIngestionFailureCounter.WithLabelValues(deviceIndex).Inc()
		log.Error("Pushing to Device Metrics Index Failed", zap.Any("Body:", body), zap.Error(err))
		return err
	}

	req := esapi.IndexRequest{
		Index: index,
		Body:  strings.NewReader(string(body)),
	}
	indexResponse, err := req.Do(context.Background(), dm.esClient.GetESClient())
	if err != nil {
		log.Error("Insert For Device Metrics Failed", zap.Error(err))
		metrics.DeviceMetricsUpdateFailureCounter.WithLabelValues(deviceIndex).Inc()
		return err
	}
	defer indexResponse.Body.Close()

	// A nil error from Do only means an HTTP response was received; a 4xx/5xx
	// status (mapping conflict, rejected write, ...) still has to be treated
	// as a failed insert instead of being counted as a success.
	if indexResponse.IsError() {
		err = fmt.Errorf("indexing device metrics into %q: %s", index, indexResponse.String())
		log.Error("Insert For Device Metrics Failed", zap.Error(err))
		metrics.DeviceMetricsUpdateFailureCounter.WithLabelValues(deviceIndex).Inc()
		return err
	}

	metrics.DeviceMetricsUpdateSuccessCounter.WithLabelValues(deviceIndex).Inc()
	return nil
}
// GetDeviceMetrics builds the device metrics search query for the given time
// window, snapshot rate and client, runs it against the supplied indices via
// the Elasticsearch client, and returns the raw response.
//
// On failure the response is always nil and the client's error is returned
// unchanged.
func (dm *DeviceMetricsRepositoryImpl) GetDeviceMetrics(startTime int64, endTime int64, snapshotPerSecond int64, client string, index []string) (*esapi.Response, error) {
	query := getDeviceMetricsQuery(startTime, endTime, snapshotPerSecond, client)

	res, fetchErr := dm.esClient.FetchESDataMultipleIndex(index, query)
	if fetchErr == nil {
		return res, nil
	}
	return nil, fetchErr
}
func getDeviceMetricsQuery(startTime int64, endTime int64, snapshotPerSecond int64, client string) string {
mustQuery := createMustQuery(createTermSubQueryForInt("device_metrics.snapshot_per_second", snapshotPerSecond), createTermSubQuery("device_metrics.client", client))
rangeQuery := createRangeQuery("device_metrics.created_at", startTime, endTime)
boolQuery := createBoolQuery(createMustQuery(createBoolQuery(mustQuery), rangeQuery))
searchQuery := createSearchQuery(boolQuery)
return createEsQuery(searchQuery)
}