initial commit

This commit is contained in:
2026-03-08 16:14:42 +05:30
parent a6c5792518
commit 9f90b62759
323 changed files with 23869 additions and 3 deletions

View File

@@ -0,0 +1,34 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/ingester"
"alfred/repository"
)
type AppFragmentsAccessLayer interface {
CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error
FetchUniqueFragments(key string, fragmentIngestionIndex string) ([]string, error)
}
type AppFragmentsAccessLayerImpl struct {
appFragmentsRepository repository.AppFragmentsRepository
}
func NewAppFragmentAccessLayer(appFragmentsRepository repository.AppFragmentsRepository) AppFragmentsAccessLayer {
return &AppFragmentsAccessLayerImpl{
appFragmentsRepository: appFragmentsRepository,
}
}
func (ral *AppFragmentsAccessLayerImpl) CreateFragment(fragmentData ingester.FragmentModel, message string, fragmentIngestionIndex string) error {
return ral.appFragmentsRepository.CreateFragment(fragmentData, message, fragmentIngestionIndex)
}
func (ral *AppFragmentsAccessLayerImpl) FetchUniqueFragments(key string, fragmentIngestionIndex string) ([]string, error) {
result, err := ral.appFragmentsRepository.FetchUniqueFragments(key, fragmentIngestionIndex)
if err != nil {
return nil, err
}
return mapper.MapESResponseToGetUniqueValues(result, key)
}

View File

@@ -0,0 +1,54 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/core/cruise"
"alfred/model/es"
"alfred/repository"
"alfred/utils"
)
type CruiseControlAccessLayer interface {
CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error
FetchPreviousAppVersion(cruiseControlIndex, appOs string) (string, error)
FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex string) (*es.ESResponse, error)
FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) ([]string, error)
}
type CruiseControlAccessLayerImpl struct {
cruiseControlRepository repository.CruiseControlRepository
}
func NewCruiseControlAccessLayer(cruiseControlRepository repository.CruiseControlRepository) CruiseControlAccessLayer {
return &CruiseControlAccessLayerImpl{
cruiseControlRepository: cruiseControlRepository,
}
}
func (ral *CruiseControlAccessLayerImpl) CreateCruiseControlConfig(cruiseControl *cruise.ControlConfig, cruiseControlIndex string) error {
return ral.cruiseControlRepository.CreateCruiseControlConfig(cruiseControl, cruiseControlIndex)
}
func (ral *CruiseControlAccessLayerImpl) FetchPreviousAppVersion(cruiseControlIndex, appOs string) (string, error) {
result, err := ral.cruiseControlRepository.FetchPreviousAppVersion(cruiseControlIndex, appOs)
if err != nil {
return utils.EMPTY, err
}
return mapper.MapCruiseResponseToString(result)
}
func (ral *CruiseControlAccessLayerImpl) FetchCruiseControlConfig(appVersion, appOs string, cruiseControlIndex string) (*es.ESResponse, error) {
result, err := ral.cruiseControlRepository.FetchCruiseControlConfig(appVersion, appOs, cruiseControlIndex)
if err != nil {
return nil, err
}
return mapper.MapESApiResponseToESResponse(result)
}
func (ral *CruiseControlAccessLayerImpl) FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs string) ([]string, error) {
result, err := ral.cruiseControlRepository.FetchAllCruiseControlAppVersions(cruiseControlIndex, appOs)
if err != nil {
return nil, err
}
return mapper.MapCruiseResponseToList(result)
}

View File

@@ -0,0 +1,35 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/core"
"alfred/repository"
)
type DeviceMetricsAccessLayer interface {
InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, index string) error
GetDeviceMetrics(startTime int64, endTime int64, snapshotPerSecond int64, client string, index []string) ([]core.DeviceMetricsEsResponse, error)
}
type DeviceMetricsAccessLayerImpl struct {
deviceMetricsRepository repository.DeviceMetricsRepository
}
func NewDeviceMetricskAccessLayer(deviceMetricsRepository repository.DeviceMetricsRepository) DeviceMetricsAccessLayer {
return &DeviceMetricsAccessLayerImpl{
deviceMetricsRepository: deviceMetricsRepository,
}
}
func (dm *DeviceMetricsAccessLayerImpl) InsertDeviceMetrics(deviceMetrics *core.DeviceMetricsModel, index string) error {
err := dm.deviceMetricsRepository.InsertDeviceMetrics(deviceMetrics, index)
return err
}
func (dm *DeviceMetricsAccessLayerImpl) GetDeviceMetrics(startTime int64, endTime int64, snapshotPerSecond int64, client string, index []string) ([]core.DeviceMetricsEsResponse, error) {
response, err := dm.deviceMetricsRepository.GetDeviceMetrics(startTime, endTime, snapshotPerSecond, client, index)
if err != nil {
return nil, err
}
return mapper.MapEsApiResponseToDeviceMetricsResponse(response)
}

View File

@@ -0,0 +1,79 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/es"
"alfred/repository"
"errors"
)
type ErrorEventsAccessLayer interface {
UploadErrorEvents(message string, errorsIngestionIndex string) (int, error)
FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, sessionErrorEventsFilter []string) ([]string, error)
FetchErrorEvents(sessionId string, errorEventsUploadIndex string) ([]es.ErrorEventsResponse, error)
GetErrorEventsLastCronTimestamp(clientName string, errorEventsTimeIndex string) (int64, error)
UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error
FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsIndex string) ([]es.ErrorEventsResponse, error)
UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error
}
type ErrorEventsAccessLayerImpl struct {
errorEventsRepository repository.ErrorEventsRepository
}
func NewErrorEventsAccessLayer(errorEventsRepository repository.ErrorEventsRepository) ErrorEventsAccessLayer {
return &ErrorEventsAccessLayerImpl{
errorEventsRepository: errorEventsRepository,
}
}
func (ral *ErrorEventsAccessLayerImpl) UploadErrorEvents(message string, errorsIngestionIndex string) (int, error) {
return ral.errorEventsRepository.UploadErrorEvents(message, errorsIngestionIndex)
}
func (ral *ErrorEventsAccessLayerImpl) FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList []string, startTime, endTime int64, sessionErrorEventsFilter []string) ([]string, error) {
result, err := ral.errorEventsRepository.FetchSessionIdFromSessionErrorEventsWithTimeRange(errorEventsUploadIndexList, startTime, endTime, sessionErrorEventsFilter)
if result.IsError() {
defer result.Body.Close()
err = errors.New("error while fetching session error events session id with time range")
}
if err != nil {
return nil, err
}
return mapper.MapEsResponseAndGetSessionIds(result)
}
func (ral *ErrorEventsAccessLayerImpl) FetchErrorEvents(sessionId string, errorEventsUploadIndex string) ([]es.ErrorEventsResponse, error) {
result, err := ral.errorEventsRepository.FetchErrorEvents(sessionId, errorEventsUploadIndex)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToErrorEventsResponse(result)
}
func (ral *ErrorEventsAccessLayerImpl) FetchSessionErrorEventsWithKeyValue(keyValueMap map[string][]string, errorEventsIndex string) ([]es.ErrorEventsResponse, error) {
result, err := ral.errorEventsRepository.FetchSessionErrorEventsWithKeyValue(keyValueMap, errorEventsIndex)
if result.IsError() {
defer result.Body.Close()
err = errors.New("error while fetching session error events with key value")
}
if err != nil {
return nil, err
}
return mapper.MapEsResponseToErrorEventsResponse(result)
}
func (ral *ErrorEventsAccessLayerImpl) GetErrorEventsLastCronTimestamp(clientName string, errorEventsTimeIndex string) (int64, error) {
result, err := ral.errorEventsRepository.GetErrorEventsLastCronTimestamp(errorEventsTimeIndex)
if err != nil {
return 0, err
}
return mapper.MapEsResponseToTimeStamp(clientName, result)
}
func (ral *ErrorEventsAccessLayerImpl) UpdateErrorEventsLastCronTimestamp(timestamp int64, clientName string, errorEventsTimeIndex string) error {
return ral.errorEventsRepository.UpdateErrorEventsLastCronTimestamp(timestamp, clientName, errorEventsTimeIndex)
}
func (ral *ErrorEventsAccessLayerImpl) UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList []string) error {
return ral.errorEventsRepository.UpdateErrorEventsInActiveBulk(errorEventsdocIdList, errorEventsindexList)
}

View File

@@ -0,0 +1,95 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/es"
"alfred/pkg/log"
"alfred/repository"
"go.uber.org/zap"
)
type EventsAccessLayer interface {
CreateEventIngester(message string, eventIngestionIndex string) (int, error)
FetchEventsWithLabels(labels, appName, screenName, fragmentName, vertical, appVersion []string, deviceIds []string, snapshotPerSecond []int64, startTime, endTime int64, page *es.Page, eventIngestionIndexList []string, phoneNumber []string, customerId []string, codePushVersion []string, agentEmailId []string, appOs string) ([]es.EventResponse, error)
FetchEventsFromSessionId(sessionId string, page *es.Page, eventIngestionIndex string, withAggregation bool) ([]es.EventResponse, *int64, error)
FetchUniqueKeys(key string, eventIngestionIndex string) ([]string, error)
FetchAllEventsFromSession(sessionId string, page *es.Page, eventIngestionIndex string) ([]es.EventResponse, *int64, error)
FetchEventsFromSession(sessionId string, page *es.Page, eventIngestionIndex, indexName string) ([]es.EventResponse, *int64, error)
FetchZipsFromSession(sessionId string, eventIngestionIndex string, indexName string) ([]string, error)
FetchUniqueKeysWithFilters(key, eventIngestionIndex, minAppVersion string) ([]string, error)
}
type EventsAccessLayerImpl struct {
eventsRepository repository.EventsRepository
}
func NewEventsAccessLayer(eventsRepository repository.EventsRepository) EventsAccessLayer {
return &EventsAccessLayerImpl{
eventsRepository: eventsRepository,
}
}
func (ral *EventsAccessLayerImpl) CreateEventIngester(message string, eventIngestionIndex string) (int, error) {
return ral.eventsRepository.CreateEventIngester(message, eventIngestionIndex)
}
func (ral *EventsAccessLayerImpl) FetchEventsWithLabels(labels, appName, screenName, fragmentName, vertical, appVersion []string, deviceIds []string, snapshotPerSecond []int64, startTime, endTime int64, page *es.Page, eventIngestionIndexList []string, phoneNumber []string, customerId []string, codePushVersion []string, agentEmailId []string, appOs string) ([]es.EventResponse, error) {
result, err := ral.eventsRepository.FetchEventsWithLabels(labels, appName, screenName, fragmentName, vertical, appVersion, deviceIds, snapshotPerSecond, startTime, endTime, page, eventIngestionIndexList, phoneNumber, customerId, codePushVersion, agentEmailId, appOs)
if err != nil {
return nil, err
}
eventsResponse, _, err := mapper.MapEsResponseToEventResponse(result)
if err != nil {
log.Error("elasticsearch search response mapping failed", zap.Error(err))
return nil, err
}
return eventsResponse, nil
}
func (ral *EventsAccessLayerImpl) FetchEventsFromSessionId(sessionId string, page *es.Page, eventIngestionIndex string, withAggregation bool) ([]es.EventResponse, *int64, error) {
result, err := ral.eventsRepository.FetchEventsFromSessionId(sessionId, eventIngestionIndex, withAggregation)
if err != nil {
return nil, nil, err
}
return mapper.MapEsResponseToEventResponse(result)
}
func (ral *EventsAccessLayerImpl) FetchUniqueKeys(key string, eventIngestionIndex string) ([]string, error) {
result, err := ral.eventsRepository.FetchUniqueKeys(key, eventIngestionIndex)
if err != nil {
return nil, err
}
return mapper.MapESResponseToGetUniqueValues(result, key)
}
func (ral *EventsAccessLayerImpl) FetchUniqueKeysWithFilters(key, eventIngestionIndex, minAppVersion string) ([]string, error) {
result, err := ral.eventsRepository.FetchUniqueKeysWithFilters(key, eventIngestionIndex, minAppVersion)
if err != nil {
return nil, err
}
return mapper.MapESResponseToGetUniqueValues(result, key)
}
func (ral *EventsAccessLayerImpl) FetchAllEventsFromSession(sessionId string, page *es.Page, eventIngestionIndex string) ([]es.EventResponse, *int64, error) {
result, err := ral.eventsRepository.FetchAllEventsFromSession(sessionId, eventIngestionIndex)
if err != nil {
return nil, nil, err
}
return mapper.MapEsResponseToEventResponse(result)
}
func (ral *EventsAccessLayerImpl) FetchEventsFromSession(sessionId string, page *es.Page, eventIngestionIndex, indexName string) ([]es.EventResponse, *int64, error) {
result, err := ral.eventsRepository.FetchEventsFromSession(sessionId, eventIngestionIndex, indexName)
if err != nil {
return nil, nil, err
}
return mapper.MapEsResponseToEventResponse(result)
}
func (ral *EventsAccessLayerImpl) FetchZipsFromSession(sessionId string, eventIngestionIndex string, indexName string) ([]string, error) {
result, err := ral.eventsRepository.FetchZipsFromSession(sessionId, eventIngestionIndex, indexName)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToZipResponse(result)
}

View File

@@ -0,0 +1,31 @@
package repositoryAccessLayer
import (
"alfred/repository"
)
type RepositoryAccessLayer struct {
SessionsAccessLayer SessionsAccessLayer
WebSessionsAccessLayer WebSessionsAccessLayer
EventsAccessLayer EventsAccessLayer
CruiseControlAccessLayer CruiseControlAccessLayer
AppFragmentsAccessLayer AppFragmentsAccessLayer
ErrorEventsAccessLayer ErrorEventsAccessLayer
ShedlockAccessLayer ShedlockAccessLayer
VideoGenerationStatusAccessLayer VideoGenerationStatusAccessLayer
DeviceMetricsAccessLayer DeviceMetricsAccessLayer
}
func InitRepositoryAccessLayer(repositories *repository.Repository) *RepositoryAccessLayer {
return &RepositoryAccessLayer{
SessionsAccessLayer: NewSessionsAccessLayer(repositories.SessionsRepository),
WebSessionsAccessLayer: NewWebSessionsAccessLayer(repositories.WebSessionsRepository),
EventsAccessLayer: NewEventsAccessLayer(repositories.EventsRepository),
CruiseControlAccessLayer: NewCruiseControlAccessLayer(repositories.CruiseControlRepository),
AppFragmentsAccessLayer: NewAppFragmentAccessLayer(repositories.AppFragmentRepository),
ErrorEventsAccessLayer: NewErrorEventsAccessLayer(repositories.ErrorEventsRepository),
ShedlockAccessLayer: NewShedlockAccessLayer(repositories.ShedlockRepository),
VideoGenerationStatusAccessLayer: NewVideoGenerationStatusAccessLayer(repositories.VideoGenerationStatusRepository),
DeviceMetricsAccessLayer: NewDeviceMetricskAccessLayer(repositories.DeviceMetricsRepository),
}
}

View File

@@ -0,0 +1,126 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/es"
"alfred/model/ingester"
"alfred/pkg/log"
"alfred/repository"
"go.uber.org/zap"
)
type SessionsAccessLayer interface {
UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error)
FetchSessionsWithSessionIds(sessionIds []string, page *es.Page, sessionUploadIndex string) ([]es.SessionResponse, error)
FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) ([]es.SessionResponse, error)
FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sessionUploadIndex, sortBy, appOs string) ([]es.SessionResponse, error)
FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*es.SessionEsResponseForDeviceAttributes, error)
UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error
FetchSessionWithSessionDuration(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) ([]es.SessionResponse, error)
FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp, endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) ([]es.SessionResponse, error)
FetchUniqueKeys(key string, sessionUploadIndex string) ([]string, error)
}
type SessionsAccessLayerImpl struct {
sessionsRepository repository.SessionsRepository
}
func NewSessionsAccessLayer(sessionsRepository repository.SessionsRepository) SessionsAccessLayer {
return &SessionsAccessLayerImpl{
sessionsRepository: sessionsRepository,
}
}
func (ral *SessionsAccessLayerImpl) UploadSession(alfredSessionRecordingEvent ingester.SessionUploadRequest, sessionUploadIndex string) (int, error) {
return ral.sessionsRepository.UploadSession(alfredSessionRecordingEvent, sessionUploadIndex)
}
func (ral *SessionsAccessLayerImpl) FetchSessionsWithSessionIds(sessionIds []string, page *es.Page, sessionUploadIndex string) ([]es.SessionResponse, error) {
result, err := ral.sessionsRepository.FetchSessionsWithSessionIds(sessionIds, sessionUploadIndex)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToSessionResponse(result)
}
func (ral *SessionsAccessLayerImpl) FetchSessionAndSessionDurationWithSessionIds(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) ([]es.SessionResponse, error) {
result, err := ral.sessionsRepository.FetchSessionAndSessionDurationWithSessionIds(sessionIds, sessionUploadIndex, page, sortBy)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToSessionWithDurationResponse(result)
}
func (ral *SessionsAccessLayerImpl) FetchSessionWithTimeRange(startTimestamp, endTimestamp int64, page *es.Page, sessionUploadIndexList []string, sessionUploadIndex, sortBy, appOs string) ([]es.SessionResponse, error) {
result, err := ral.sessionsRepository.FetchSessionWithTimeRange(startTimestamp, endTimestamp, page, sessionUploadIndexList, sortBy, appOs)
if err != nil {
return nil, err
}
sessionResponse, err := mapper.MapEsResponseToSessionResponseForAllSessions(result)
if err != nil {
return nil, err
}
sessions := getUniqueSessions(sessionResponse)
sessionWithDurationResult, err := ral.sessionsRepository.FetchSessionAndSessionDurationWithSessionIds(sessions, sessionUploadIndex, page, sortBy)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToSessionWithDurationResponse(sessionWithDurationResult)
}
func (ral *SessionsAccessLayerImpl) FetchDeviceAttributesForMetrics(startTimestamp int64, endTimestamp int64, snapshotPerSecond int64, sessionUploadIndex string, page *es.Page) (*es.SessionEsResponseForDeviceAttributes, error) {
result, err := ral.sessionsRepository.FetchDeviceAttributesForMetrics(startTimestamp, endTimestamp, snapshotPerSecond, sessionUploadIndex, page)
if err != nil {
log.Error("Fetching Device Metrics Failed:", zap.Error(err))
return nil, err
}
return mapper.MapEsApiResponseToSessionDeviceAttributesResponse(result)
}
func (ral *SessionsAccessLayerImpl) UpdateSessionErrorEventsWithSessionId(sessionId []string, sessionUploadIndexList []string, hasErrors bool) error {
return ral.sessionsRepository.UpdateSessionErrorEventsWithSessionId(sessionId, sessionUploadIndexList, hasErrors)
}
func (ral *SessionsAccessLayerImpl) FetchSessionWithSessionDuration(sessionIds []string, sessionUploadIndex string, page *es.Page, sortBy string) ([]es.SessionResponse, error) {
result, err := ral.sessionsRepository.FetchSessionListFromSessionIds(sessionIds, sessionUploadIndex, page, sortBy)
if err != nil {
return nil, err
}
sessionResponse, err := mapper.MapEsResponseToSessionResponseForAllSessions(result)
if err != nil {
return nil, err
}
sessions := getUniqueSessions(sessionResponse)
sessionWithDurationResult, err := ral.sessionsRepository.FetchSessionAndSessionDurationWithSessionIds(sessions, sessionUploadIndex, page, sortBy)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToSessionWithDurationResponse(sessionWithDurationResult)
}
func (ral *SessionsAccessLayerImpl) FetchSessionWithLabels(appVersion []string, deviceIds []string, startTimestamp,
endTimestamp int64, page *es.Page, sessionUploadSearchIndexList []string, appOs string) ([]es.SessionResponse, error) {
result, err := ral.sessionsRepository.FetchSessionWithLabels(appVersion, deviceIds, startTimestamp, endTimestamp,
page, sessionUploadSearchIndexList, appOs)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToSessionResponse(result)
}
func (ral *SessionsAccessLayerImpl) FetchUniqueKeys(key string, sessionUploadIndex string) ([]string, error) {
result, err := ral.sessionsRepository.FetchUniqueKeys(key, sessionUploadIndex)
if err != nil {
return nil, err
}
return mapper.MapESResponseToGetUniqueValues(result, key)
}
func getUniqueSessions(stringSlice []es.SessionResponse) []string {
var uniqueIds []string
for _, entry := range stringSlice {
sessionId := entry.Source.BaseAttributes.SessionId
uniqueIds = append(uniqueIds, sessionId)
}
return uniqueIds
}

View File

@@ -0,0 +1,46 @@
package repositoryAccessLayer
import (
"alfred/mapper"
"alfred/model/common"
"alfred/model/es"
"alfred/repository"
)
type ShedlockAccessLayer interface {
InsertShedlockForCronWithOptimisticControl(shedLock *common.ShedLock, primaryTerm *int, seqNo *int) (int, error)
InsertShedlockForCron(shedLock *common.ShedLock) error
GetShedLockStatus(cronName string) (*es.ShedlockResponse, int, error)
}
type ShedLockAccessLayerImpl struct {
shedlockRepository repository.ShedlockRepository
}
func NewShedlockAccessLayer(shedlockRepository repository.ShedlockRepository) ShedlockAccessLayer {
return &ShedLockAccessLayerImpl{
shedlockRepository: shedlockRepository,
}
}
func (sl *ShedLockAccessLayerImpl) InsertShedlockForCron(shedLock *common.ShedLock) error {
err := sl.shedlockRepository.InsertShedlockForCron(shedLock)
return err
}
func (sl *ShedLockAccessLayerImpl) InsertShedlockForCronWithOptimisticControl(shedLock *common.ShedLock, primaryTerm *int, seqNo *int) (int, error) {
status, err := sl.shedlockRepository.InsertShedlockForCronWithOptimisticControl(shedLock, primaryTerm, seqNo)
return status, err
}
func (sl *ShedLockAccessLayerImpl) GetShedLockStatus(cronName string) (*es.ShedlockResponse, int, error) {
result, err := sl.shedlockRepository.GetShedLockStatus(cronName)
if err != nil {
return nil, 0, err
}
mappedResult := mapper.MapEsapiResponseToShedlockResponse(result)
return mappedResult, result.StatusCode, nil
}

View File

@@ -0,0 +1,45 @@
package repositoryAccessLayer
import (
"alfred/model/core"
"alfred/repository"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
type VideoGenerationStatusAccessLayer interface {
CreateVideoGenerationStatus(videoGenerationStatus core.VideoFragmentStatusModel, message string, videoGenerationStatusIndex string) error
FetchVideoGenerationStatus(sessionId, videoGenerationStatusIndex string) (*esapi.Response, error)
UpdateFragmentVideoGenerationStatus(sessionId, videoGenerationStatusIndex, eventId, status, processedZipName string, currentFragment int64) (int, error)
UpdateVideoGenerationStatus(sessionId, videoGenerationStatusIndex, status string, videoGeneratedTillNow int) (int, error)
UpdateVideoDimensions(sessionId, videoGenerationStatusIndex string, width, height int) error
}
type VideoGenerationStatusAccessLayerImpl struct {
videoGenerationStatusRepository repository.VideoGenerationStatusRepository
}
func NewVideoGenerationStatusAccessLayer(videoGenerationStatusRepository repository.VideoGenerationStatusRepository) VideoGenerationStatusAccessLayer {
return &VideoGenerationStatusAccessLayerImpl{
videoGenerationStatusRepository: videoGenerationStatusRepository,
}
}
func (r *VideoGenerationStatusAccessLayerImpl) CreateVideoGenerationStatus(fragmentData core.VideoFragmentStatusModel, message string, videoGenerationStatusIndex string) error {
return r.videoGenerationStatusRepository.CreateVideoGenerationStatus(fragmentData, message, videoGenerationStatusIndex)
}
func (r *VideoGenerationStatusAccessLayerImpl) FetchVideoGenerationStatus(sessionId, videoGenerationStatusIndex string) (*esapi.Response, error) {
return r.videoGenerationStatusRepository.FetchVideoGenerationStatus(sessionId, videoGenerationStatusIndex)
}
func (r *VideoGenerationStatusAccessLayerImpl) UpdateFragmentVideoGenerationStatus(sessionId, videoGenerationStatusIndex, eventId, status, processedZipName string, currentFragment int64) (int, error) {
return r.videoGenerationStatusRepository.UpdateFragmentVideoGenerationStatus(sessionId, videoGenerationStatusIndex, eventId, status, processedZipName, currentFragment)
}
func (r *VideoGenerationStatusAccessLayerImpl) UpdateVideoGenerationStatus(sessionId, videoGenerationStatusIndex, status string, videoGeneratedTillNow int) (int, error) {
return r.videoGenerationStatusRepository.UpdateVideoGenerationStatus(sessionId, videoGenerationStatusIndex, status, videoGeneratedTillNow)
}
func (r *VideoGenerationStatusAccessLayerImpl) UpdateVideoDimensions(sessionId, videoGenerationStatusIndex string, width, height int) error {
return r.videoGenerationStatusRepository.UpdateVideoDimensions(sessionId, videoGenerationStatusIndex, width, height)
}

View File

@@ -0,0 +1,78 @@
package repositoryAccessLayer
import (
"alfred/api/request"
"alfred/mapper"
"alfred/model/es"
"alfred/model/ingester"
"alfred/pkg/log"
"alfred/repository"
"go.uber.org/zap"
)
type WebSessionsAccessLayer interface {
UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error
FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) ([]es.WebSessionResponse, error)
FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) ([]es.WebSessionResponse, error)
FetchAllWebSessionWithDuration(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndex string, webSessionUploadIndexList []string) ([]es.WebSessionResponse, error)
}
type WebSessionsAccessLayerImpl struct {
webSessionsRepository repository.WebSessionsRepository
}
func NewWebSessionsAccessLayer(webSessionsAccessLayer repository.WebSessionsRepository) WebSessionsAccessLayer {
return &WebSessionsAccessLayerImpl{
webSessionsRepository: webSessionsAccessLayer,
}
}
func (ral *WebSessionsAccessLayerImpl) UploadWebSession(sessionUploadRequest ingester.WebSessionUploadRequest, webSessionUploadIndex string) error {
return ral.webSessionsRepository.UploadWebSession(sessionUploadRequest, webSessionUploadIndex)
}
func (ral *WebSessionsAccessLayerImpl) FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex string) ([]es.WebSessionResponse, error) {
result, err := ral.webSessionsRepository.FetchWebSessionsWithSessionId(sessionId, webSessionUploadIndex)
if err != nil {
return nil, err
}
return mapper.MapEsResponseToWebSessionResponse(result, &es.Page{})
}
func (ral *WebSessionsAccessLayerImpl) FetchAllWebSession(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndexList []string) ([]es.WebSessionResponse, error) {
result, err := ral.webSessionsRepository.FetchAllWebSession(filters, page, webSessionUploadIndexList)
if err != nil {
log.Error("error while fetching all web session in FetchAllWebSession", zap.Error(err))
return nil, err
}
return mapper.MapEsResponseToWebSessionResponse(result, page)
}
func (ral *WebSessionsAccessLayerImpl) FetchAllWebSessionWithDuration(filters request.WebSessionFilters, page *es.Page, webSessionUploadIndex string, webSessionUploadIndexList []string) ([]es.WebSessionResponse, error) {
result, err := ral.webSessionsRepository.FetchAllWebSession(filters, page, webSessionUploadIndexList)
if err != nil {
log.Error("error while fetching all web session in FetchAllWebSessionWithDuration", zap.Error(err))
return nil, err
}
webSessionResponse, err := mapper.MapEsResponseToWebSessionResponse(result, page)
if err != nil {
log.Error("error while mapping es response to web session response in FetchAllWebSessionWithDuration", zap.Error(err))
return nil, err
}
uniqueWebSessions := getUniqueWebSessions(webSessionResponse)
webSessionWithDurationResult, err := ral.webSessionsRepository.FetchWebSessionsWithDurationResponse(uniqueWebSessions, webSessionUploadIndex, filters, page)
if err != nil {
log.Error("error while fetching web session with duration in FetchAllWebSessionWithDuration", zap.Error(err))
return nil, err
}
return mapper.MapEsResponseToWebSessionResponseWithDuration(webSessionWithDurationResult, page)
}
func getUniqueWebSessions(stringSlice []es.WebSessionResponse) []string {
var uniqueIds []string
for _, entry := range stringSlice {
sessionId := entry.Source.WebBaseAttributes.SessionId
uniqueIds = append(uniqueIds, sessionId)
}
return uniqueIds
}