Merge pull request #16 from navi-ppl/TP-55555/error-log-consumer

Tp 55555/fix es query
This commit is contained in:
Varnit Goyal
2024-09-11 15:41:10 +05:30
committed by GitHub
7 changed files with 444 additions and 34 deletions

View File

@@ -67,17 +67,18 @@ DocumentService:
generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN
elastic: elastic:
addresses: https://localhost:9200 addresses: https://office-server.tail3fba9.ts.net:9200
username: elastic username: elastic
password: 9457611267 password: 9457611267
index: cybertron index: cybertron
api_key: U3NUSmFKRUJYaHF5bTJkOUozUU06Z1ZDTE9hZm1RUnlXeHRNY21yeGxfQQ== api_key: SFQ1aHhwRUJWRVNkVUlrRWZMYWk6ZFpDTnpWYXdTNDYwdGt1QzlaRV9YQQ==
aws: aws:
region: ap-south-1 region: ap-south-1
bucket: navi-cd955a63c4476df0f00c1cea0e4a40d1 bucket: navi-cd955a63c4476df0f00c1cea0e4a40d1
#mjolnir config
mjolnir: mjolnir:
service.url: https://qa-mjolnir-service.np.navi-ppl.in service.url: https://qa-mjolnir-service.np.navi-ppl.in
realm.id: ZicSxsvBwE realm.id: ZicSxsvBwE

View File

@@ -6,10 +6,9 @@ import (
"cybertron/configs" "cybertron/configs"
"encoding/json" "encoding/json"
elasticsearch8 "github.com/elastic/go-elasticsearch/v8" elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"log" "log"
"net/http" "net/http"
"strings"
) )
type ElasticSearchClient struct { type ElasticSearchClient struct {
@@ -45,16 +44,15 @@ func (el *ElasticSearchClient) IndexDocument(document interface{}) {
} }
} }
func (el *ElasticSearchClient) SearchDocuments(query *types.Query, fields []string) ([]map[string]interface{}, error) { func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, int64, error) {
res, err := el.client.Search(). res, err := el.client.Search().
Index(el.Config.Index). Index(el.Config.Index).Raw(strings.NewReader(searchRequest)).
Request(&search.Request{
Query: query,
}).Source_(fields).
Do(context.TODO()) Do(context.TODO())
if err != nil { if err != nil {
log.Println("Error getting response: %s", err) log.Println("Error getting response: %s", err)
return nil, nil, 0, err
} }
var results []map[string]interface{} var results []map[string]interface{}
for _, hit := range res.Hits.Hits { for _, hit := range res.Hits.Hits {
var doc map[string]interface{} var doc map[string]interface{}
@@ -67,8 +65,16 @@ func (el *ElasticSearchClient) SearchDocuments(query *types.Query, fields []stri
} }
results = append(results, doc) results = append(results, doc)
} }
aggregations := make(map[string]interface{})
if res.Aggregations != nil {
for aggName, agg := range res.Aggregations {
aggregations[aggName] = agg
}
}
return results, nil log.Printf("%s", aggregations)
return results, aggregations, res.Hits.Total.Value, nil
} }
func (el *ElasticSearchClient) GetDocument(documentID string) (interface{}, error) { func (el *ElasticSearchClient) GetDocument(documentID string) (interface{}, error) {

View File

@@ -1,7 +1,6 @@
package transport package transport
import ( import (
"cybertron/internal/transport/middleware"
"cybertron/internal/transport/router" "cybertron/internal/transport/router"
"fmt" "fmt"
"github.com/gin-contrib/cors" "github.com/gin-contrib/cors"
@@ -49,7 +48,7 @@ func (s *Server) Start() {
AllowCredentials: true, AllowCredentials: true,
MaxAge: 24 * time.Hour, MaxAge: 24 * time.Hour,
})) }))
s.gin.Use(middleware.PermissionMiddleware(s.dependencies.Service.AuthService)) //s.gin.Use(middleware.PermissionMiddleware(s.dependencies.Service.AuthService))
s.router() s.router()
port := configs.GetPort() port := configs.GetPort()

View File

@@ -0,0 +1,17 @@
package utils
import (
"github.com/gin-gonic/gin"
"net/http"
)
func ErrorResponse(c *gin.Context, errMsg string) {
if errMsg == "" {
errMsg = "Something went wrong"
}
c.JSON(http.StatusInternalServerError, gin.H{
"status": "error",
"message": errMsg,
})
}

View File

@@ -0,0 +1,258 @@
package utils
import (
"fmt"
"strings"
)
const (
TermQuery = `{ "term": { "%s": "%s" } }`
TermQueryForInt = `{ "term": { "%s": %d } }`
TermQueryForBool = `{ "term": { "%s": %t } }`
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
MustQuery = `"must": [ %s ] `
MustNotQuery = `"must_not": [ %s ]`
ShouldQuery = `"should": [ %s ] `
BoolQuery = `{ "bool":{ %s } }`
SortQuery = `"sort": [ { "%s": { "order": "%s" } } ]`
CollapseQuery = `"collapse": { "field": "%s" }`
MatchAllQuery = `{ "match_all": {} }`
FromQuery = `"from": %d`
SizeQuery = `"size": %d`
SearchQuery = `"query": %s`
FieldsQuery = `"fields": [ "%s" ]`
EsQuery = "{ %s }"
sourceQuery = `"_source": %t`
AggregationQuery = `"aggs": { %s }`
AggregationQueryFormat = `"%s": { %s }` // aggregation name, aggregation query
TermsAggregationQuery = `"terms": { "field": "%s", "size": %d }`
MinAggregationQuery = `"min": { "field": "%s" }`
MaxAggregationQuery = `"max": { "field": "%s" }`
CardinalityAggregationQuery = `"cardinality": { "field": "%s" }`
FilterAggregationQuery = `"filter": %s`
TrackTotalHitsQuery = `"track_total_hits": %t`
ScriptQuery = `"script": { "source": "%s" , "lang": "painless" }`
TermsAggregationQueryWithOrder = `"terms": { "field": "%s", "size": %d, "order" : { "%s" : "%s" } }`
TermsAggregationQueryWithoutSize = `"terms": { "field": "%s" }`
CustomQuery = `"%s": %s`
BulkUpdateQuery = `{ "update": {"_index": "%s", "_id": "%s" } }`
TermsSubQuery = `{ "terms": { "%s": [ %s ] } }`
MultiSortQuery = `"sort":[ %s ]`
SortQueryFields = `{ "%s": { "order": "%s" } }`
MatchQuery = `{ "match": { "%s": "%s" } }`
SearchAfter = `"search_after": [%s]`
CompositeAggsQuery = `"composite": {"size" : %d, "sources" : [{"%s": {%s}}] }`
CompositeAggsQueryWithAfterKey = `"composite": {"size" : %d, "after": {"%s": "%s"}, "sources" : [{"%s": {%s}}] }`
TopHitsAggsQuery = `"top_hits":{"size" : %d, "_source": {"includes" : [ %s ]}}`
ValueCountAggsQuery = `"value_count" : {"field": "%s"}`
)
const (
COLON = ":"
HYPHEN = "-"
EMPTY = ""
UNDERSCORE = "_"
NEWLINE = "\n"
FORWARD_SLASH = "/"
ASTERISK = "*"
DOT = "."
COMMA = ","
SEMICOLON = ";"
AMPERSAND = "&"
)
func CreateCustomQuery(key string, query string) string {
return fmt.Sprintf(CustomQuery, key, query)
}
func CreateTermSubQuery(key string, ids ...string) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQuery, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateTermSubQueryForInt(key string, ids ...int64) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQueryForInt, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateTermSubQueryBool(key string, ids ...bool) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQueryForBool, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateRangeQuery(key string, greaterThan int64, lessThan int64) string {
return fmt.Sprintf(RangeQuery, key, greaterThan, lessThan)
}
func CreateRangeQueryForGteString(key string, greaterThan string) string {
return fmt.Sprintf(RangeQueryGteString, key, greaterThan)
}
func CreateMustQuery(filters ...string) string {
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
}
func CreateMatchAllQuery() string {
return MatchAllQuery
}
func CreateMatchQuery(key string, value string) string {
return fmt.Sprintf(MatchQuery, key, value)
}
func CreateShouldQuery(filters ...string) string {
return fmt.Sprintf(ShouldQuery, strings.Join(filters, ","))
}
func CreateBoolQuery(filters ...string) string {
return fmt.Sprintf(BoolQuery, strings.Join(filters, ","))
}
func CreateSearchQuery(filters ...string) string {
return fmt.Sprintf(SearchQuery, strings.Join(filters, ","))
}
func CreateFieldsQuery(fields ...string) string {
return fmt.Sprintf(FieldsQuery, strings.Join(fields, ","))
}
func CreateSortQuery(key string, order string, format string) string {
if format != EMPTY {
order += fmt.Sprintf(`", "format": "%s`, format)
}
return fmt.Sprintf(SortQuery, key, order)
}
func CreateCollapseQuery(key string) string {
return fmt.Sprintf(CollapseQuery, key)
}
func CreateCardinalityAggregationQuery(field string) string {
return fmt.Sprintf(CardinalityAggregationQuery, field)
}
func CreateFromQuery(from int64) string {
return fmt.Sprintf(FromQuery, from)
}
func CreateSizeQuery(size int64) string {
return fmt.Sprintf(SizeQuery, size)
}
func CreateEsQuery(query ...string) string {
return fmt.Sprintf(EsQuery, strings.Join(query, ","))
}
func CreateSourceQuery(source bool) string {
return fmt.Sprintf(sourceQuery, source)
}
func CreateFilterAggregationQuery(value ...string) string {
return fmt.Sprintf(FilterAggregationQuery, strings.Join(value, ","))
}
func CreateAggregationQuery(aggregations ...string) string {
return fmt.Sprintf(AggregationQuery, strings.Join(aggregations, ","))
}
func CreateMinAggregationQuery(field string) string {
return fmt.Sprintf(MinAggregationQuery, field)
}
func CreateMaxAggregationQuery(field string) string {
return fmt.Sprintf(MaxAggregationQuery, field)
}
func CreateTermsAggregationQuery(field string, size int) string {
return fmt.Sprintf(TermsAggregationQuery, field, size)
}
func CreateTermsAggregationQueryWithoutSize(field string) string {
return fmt.Sprintf(TermsAggregationQueryWithoutSize, field)
}
func BuildAggregationQuery(aggregationName string, aggregationQueries ...string) string {
return fmt.Sprintf(AggregationQueryFormat, aggregationName, strings.Join(aggregationQueries, ","))
}
func CreateTrackTotalHitsQuery(trackTotalHits bool) string {
return fmt.Sprintf(TrackTotalHitsQuery, trackTotalHits)
}
func CreateBoolShouldQuery(queries ...string) string {
return fmt.Sprintf(CreateBoolQuery(CreateShouldQuery(strings.Join(queries, ","))))
}
func CreateScriptQuery(script string) string {
return fmt.Sprintf(ScriptQuery, script)
}
func CreateMustNotQuery(filters ...string) string {
return fmt.Sprintf(MustNotQuery, strings.Join(filters, ","))
}
func CreateTermsAggregationQueryWithOrder(field string, size int, filter, order string) string {
return fmt.Sprintf(TermsAggregationQueryWithOrder, field, size, filter, order)
}
func CreateBulkUpdateQuery(index, docId string) string {
return fmt.Sprintf(BulkUpdateQuery, index, docId)
}
func CreateTermsSubQuery(key string, ids []string) string {
var idsList []string
for _, id := range ids {
idsList = append(idsList, fmt.Sprintf(`"%s"`, id))
}
return fmt.Sprintf(TermsSubQuery, key, strings.Join(idsList, ","))
}
func CreateSortQueryFields(key string, order string, format string) string {
if format != EMPTY {
order += fmt.Sprintf(`", "format": "%s`, format)
}
return fmt.Sprintf(SortQueryFields, key, order)
}
func CreateMultiSortQuery(sortQueries ...string) string {
return fmt.Sprintf(MultiSortQuery, strings.Join(sortQueries, ","))
}
func CreateSearchAfterQuery(searchQuery ...string) string {
return fmt.Sprintf(SearchAfter, strings.Join(searchQuery, ","))
}
func CreateCompositeAggsQuery(size int, key string, afterKey string, afterKeyValue string, sources ...string) string {
if afterKeyValue != EMPTY {
println("in after key query")
return fmt.Sprintf(CompositeAggsQueryWithAfterKey, size, afterKey, afterKeyValue, key, strings.Join(sources, ","))
}
return fmt.Sprintf(CompositeAggsQuery, size, key, strings.Join(sources, ","))
}
func CreateTopHitsAggsQuery(size int, fields []string) string {
var fieldsList []string
for _, field := range fields {
fieldsList = append(fieldsList, fmt.Sprintf(`"%s"`, field))
}
return fmt.Sprintf(TopHitsAggsQuery, size, strings.Join(fieldsList, ", "))
}
func CreateValueCountAggsQuery(field string) string {
return fmt.Sprintf(ValueCountAggsQuery, field)
}

View File

@@ -36,6 +36,9 @@ type ExceptionValue struct {
Stacktrace Stacktrace `json:"stacktrace"` Stacktrace Stacktrace `json:"stacktrace"`
ProjectId string `json:"project_id,omitempty"` ProjectId string `json:"project_id,omitempty"`
ReleaseId string `json:"release_id,omitempty"` ReleaseId string `json:"release_id,omitempty"`
Breadcrumbs interface{} `json:"breadcrumbs,omitempty"`
Extras interface{} `json:"extras,omitempty"`
Request interface{} `json:"request,omitempty"`
} }
type Exception struct { type Exception struct {
@@ -43,6 +46,9 @@ type Exception struct {
} }
type Payload struct { type Payload struct {
Exception Exception `json:"exception"` Exception Exception `json:"exception"`
Breadcrumbs interface{} `json:"breadcrumbs"`
Request interface{} `json:"request"`
Extra interface{} `json:"extra"`
} }
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService { func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService {
@@ -72,7 +78,11 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
for _, errorItem := range jsonData.Exception.Values { for _, errorItem := range jsonData.Exception.Values {
errorItem.ProjectId = projectID errorItem.ProjectId = projectID
//todo update release id
errorItem.ReleaseId = "release-1" errorItem.ReleaseId = "release-1"
errorItem.Breadcrumbs = jsonData.Breadcrumbs
errorItem.Extras = jsonData.Extra
errorItem.Request = jsonData.Request
err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance) err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance)
if err != nil { if err != nil {

View File

@@ -3,9 +3,10 @@ package service
import ( import (
"cybertron/internal/client/elastic" "cybertron/internal/client/elastic"
"cybertron/pkg/log" "cybertron/pkg/log"
"github.com/elastic/go-elasticsearch/v8/typedapi/types" "cybertron/pkg/utils"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "net/http"
"strconv"
) )
type SearchService struct { type SearchService struct {
@@ -25,25 +26,143 @@ func (s *SearchService) Search(c *gin.Context) {
} }
func (s *SearchService) GetErrorDetails(c *gin.Context) { func (s *SearchService) GetErrorDetails(c *gin.Context) {
documentId := c.Query("document_id") error_hash := c.Query("error_hash")
response, _ := s.elasticSearchClient.GetDocument(documentId) from := c.DefaultQuery("from", "0")
c.JSON(http.StatusOK, response)
fromInNumber, err := strconv.ParseInt(from, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, "from should be a number")
return
}
term_query := utils.CreateTermSubQuery("error_hash", error_hash)
search_query := utils.CreateSearchQuery(term_query)
size_query := utils.CreateSizeQuery(1)
sort_query := utils.CreateSortQuery("created_at", "asc", "")
after_query := utils.CreateFromQuery(fromInNumber)
es_query := utils.CreateEsQuery(search_query, size_query, sort_query, after_query)
println(es_query)
// searchRequest := `
//{
// "size": 1,
// "query": {
// "term": {
// "error_hash": {
// "value": "%s"
// }
// }
// },
// "sort": [
// { "created_at": { "order": "asc" } }
// ],
// "search_after": ["1724732743"]
//}
//
// `
searchRequestformatted := es_query
fields := []string{"error", "significant_stack", "title"}
response, _, total, err := s.elasticSearchClient.SearchDocuments(searchRequestformatted, fields)
if err != nil {
utils.ErrorResponse(c, "Failed to search please try again later")
}
c.JSON(http.StatusOK, gin.H{
"results": response,
"total": total,
})
} }
func (s *SearchService) GetErrorList(c *gin.Context) { func (s *SearchService) GetErrorList(c *gin.Context) {
//todo pagination and aggregation of errors //todo pagination and aggregation of errors
projectId := c.Query("project_id") projectId := c.Query("project_id")
println(projectId) size := c.DefaultQuery("size", "10")
query := &types.Query{ afterKey := c.DefaultQuery("after_key", "")
Term: map[string]types.TermQuery{
"project_id": { sizeInNumber, sizeParseError := strconv.Atoi(size)
Value: projectId,
}, if sizeParseError != nil {
}, c.JSON(http.StatusBadRequest, "size should be a number")
return
} }
term_query := utils.CreateTermSubQuery("project_id", projectId)
search_query := utils.CreateSearchQuery(term_query)
size_query := utils.CreateSizeQuery(0)
top_hits_aggs_name_query := utils.BuildAggregationQuery("unique_errors", utils.CreateTopHitsAggsQuery(1, []string{"error", "significant_stack", "created_at", "error_hash"}))
top_hits_aggs_query := utils.CreateAggregationQuery(top_hits_aggs_name_query)
//building composite aggregation
composite_term_query := utils.CreateTermsAggregationQueryWithoutSize("error_hash")
composite_aggregation_query_without_name := utils.CreateCompositeAggsQuery(sizeInNumber, "error_hash", "error_hash", afterKey, composite_term_query)
composite_aggs_query := utils.BuildAggregationQuery("errors_by_hash", composite_aggregation_query_without_name, top_hits_aggs_query)
compositeAggsQuery := utils.CreateAggregationQuery(composite_aggs_query)
//final_aggs_query := utils.CreateAggregationQuery(top_hits_value_count, compositeAggsQuery)
final_query := utils.CreateEsQuery(search_query, size_query, compositeAggsQuery)
println("%s", final_query)
// searchRequest := `
//{
// "size": 0,
// "query": {
// "term": {
// "project_id": {
// "value": "%s"
// }
// }
// },
// "aggs": {
// "errors_by_hash": {
// "composite": {
// "size": 3,
// "sources": [
// {
// "error_hash": {
// "terms": {
// "field": "error_hash.keyword"
// }
// }
// }
// ]
// },
// "aggs": {
// "unique_errors": {
// "top_hits": {
// "_source": {
// "includes": ["error", "error_hash"]
// }
// }
// },
// "error_count": {
// "value_count": {
// "field": "error.keyword"
// }
// }
// }
// }
//
// }
//}
//
// `
fields := []string{"error", "significant_stack", "title"} fields := []string{"error", "significant_stack", "title"}
var response, _ = s.elasticSearchClient.SearchDocuments(query, fields) var _, aggs, total, err = s.elasticSearchClient.SearchDocuments(final_query, fields)
c.JSON(http.StatusOK, response) if err != nil {
utils.ErrorResponse(c, "search failed please try again")
return
}
c.JSON(http.StatusOK, gin.H{
"results": aggs,
"total": total,
})
} }