TP-55555 | change elastic query to builder queries

This commit is contained in:
varnit-goyal_navi
2024-09-09 07:54:53 +05:30
parent 549a0f4af9
commit 38b4cc9374
5 changed files with 377 additions and 31 deletions

View File

@@ -42,7 +42,7 @@ http:
# Kafka config
kafka:
password: kDia1uC.GI;)Al5eQ)+Q
password: xxx
username: varnitgoyal/varnitgoyal95@gmail.com/ocid1.streampool.oc1.ap-mumbai-1.amaaaaaaotdslraanepwp54txqqxkmg4l6dghrhufiezqkx2lqhndgxoq7pa
brokers: cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092
group:
@@ -67,11 +67,11 @@ DocumentService:
generate_token: DOCUMENT_SERVICE_MOCK_GENERATE_TOKEN
elastic:
addresses: https://localhost:9200
addresses: https://office-server.tail3fba9.ts.net:9200
username: elastic
password: 9457611267
index: cybertron
api_key: U3NUSmFKRUJYaHF5bTJkOUozUU06Z1ZDTE9hZm1RUnlXeHRNY21yeGxfQQ==
api_key:

View File

@@ -6,10 +6,9 @@ import (
"cybertron/configs"
"encoding/json"
elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"log"
"net/http"
"strings"
)
type ElasticSearchClient struct {
@@ -45,12 +44,9 @@ func (el *ElasticSearchClient) IndexDocument(document interface{}) {
}
}
func (el *ElasticSearchClient) SearchDocuments(query *types.Query, fields []string) ([]map[string]interface{}, error) {
func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, error) {
res, err := el.client.Search().
Index(el.Config.Index).
Request(&search.Request{
Query: query,
}).Source_(fields).
Index(el.Config.Index).Raw(strings.NewReader(searchRequest)).
Do(context.TODO())
if err != nil {
log.Println("Error getting response: %s", err)
@@ -67,8 +63,16 @@ func (el *ElasticSearchClient) SearchDocuments(query *types.Query, fields []stri
}
results = append(results, doc)
}
aggregations := make(map[string]interface{})
if res.Aggregations != nil {
for aggName, agg := range res.Aggregations {
aggregations[aggName] = agg
}
}
return results, nil
log.Printf("%s", aggregations)
return results, aggregations, nil
}
func (el *ElasticSearchClient) GetDocument(documentID string) (interface{}, error) {

View File

@@ -0,0 +1,247 @@
package utils
import (
"fmt"
"strings"
)
const (
TermQuery = `{ "term": { "%s": "%s" } }`
TermQueryForInt = `{ "term": { "%s": %d } }`
TermQueryForBool = `{ "term": { "%s": %t } }`
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
MustQuery = `"must": [ %s ] `
MustNotQuery = `"must_not": [ %s ]`
ShouldQuery = `"should": [ %s ] `
BoolQuery = `{ "bool":{ %s } }`
SortQuery = `"sort": [ { "%s": { "order": "%s" } } ]`
CollapseQuery = `"collapse": { "field": "%s" }`
MatchAllQuery = `{ "match_all": {} }`
FromQuery = `"from": %d`
SizeQuery = `"size": %d`
SearchQuery = `"query": %s`
FieldsQuery = `"fields": [ "%s" ]`
EsQuery = "{ %s }"
sourceQuery = `"_source": %t`
AggregationQuery = `"aggs": { %s }`
AggregationQueryFormat = `"%s": { %s }` // aggregation name, aggregation query
TermsAggregationQuery = `"terms": { "field": "%s", "size": %d }`
MinAggregationQuery = `"min": { "field": "%s" }`
MaxAggregationQuery = `"max": { "field": "%s" }`
CardinalityAggregationQuery = `"cardinality": { "field": "%s" }`
FilterAggregationQuery = `"filter": %s`
TrackTotalHitsQuery = `"track_total_hits": %t`
ScriptQuery = `"script": { "source": "%s" , "lang": "painless" }`
TermsAggregationQueryWithOrder = `"terms": { "field": "%s", "size": %d, "order" : { "%s" : "%s" } }`
TermsAggregationQueryWithoutSize = `"terms": { "field": "%s" }`
CustomQuery = `"%s": %s`
BulkUpdateQuery = `{ "update": {"_index": "%s", "_id": "%s" } }`
TermsSubQuery = `{ "terms": { "%s": [ %s ] } }`
MultiSortQuery = `"sort":[ %s ]`
SortQueryFields = `{ "%s": { "order": "%s" } }`
MatchQuery = `{ "match": { "%s": "%s" } }`
SearchAfter = `"search_after": [%s]`
CompositeAggsQuery = `"composite": {"size" : %d, "sources" : [{"%s": {%s}}] }`
TopHitsAggsQuery = `"top_hits":{"_source": {"includes" : ["%s"]}}`
ValueCountAggsQuery = `"value_count" : {"field": "%s"}`
)
const (
COLON = ":"
HYPHEN = "-"
EMPTY = ""
UNDERSCORE = "_"
NEWLINE = "\n"
FORWARD_SLASH = "/"
ASTERISK = "*"
DOT = "."
COMMA = ","
SEMICOLON = ";"
AMPERSAND = "&"
)
func CreateCustomQuery(key string, query string) string {
return fmt.Sprintf(CustomQuery, key, query)
}
func CreateTermSubQuery(key string, ids ...string) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQuery, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateTermSubQueryForInt(key string, ids ...int64) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQueryForInt, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateTermSubQueryBool(key string, ids ...bool) string {
var termQueries []string
for _, id := range ids {
idCopy := id
termQueries = append(termQueries, fmt.Sprintf(TermQueryForBool, key, idCopy))
}
return strings.Join(termQueries, ",")
}
func CreateRangeQuery(key string, greaterThan int64, lessThan int64) string {
return fmt.Sprintf(RangeQuery, key, greaterThan, lessThan)
}
func CreateRangeQueryForGteString(key string, greaterThan string) string {
return fmt.Sprintf(RangeQueryGteString, key, greaterThan)
}
func CreateMustQuery(filters ...string) string {
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
}
func CreateMatchAllQuery() string {
return MatchAllQuery
}
func CreateMatchQuery(key string, value string) string {
return fmt.Sprintf(MatchQuery, key, value)
}
func CreateShouldQuery(filters ...string) string {
return fmt.Sprintf(ShouldQuery, strings.Join(filters, ","))
}
func CreateBoolQuery(filters ...string) string {
return fmt.Sprintf(BoolQuery, strings.Join(filters, ","))
}
func CreateSearchQuery(filters ...string) string {
return fmt.Sprintf(SearchQuery, strings.Join(filters, ","))
}
func CreateFieldsQuery(fields ...string) string {
return fmt.Sprintf(FieldsQuery, strings.Join(fields, ","))
}
func CreateSortQuery(key string, order string, format string) string {
if format != EMPTY {
order += fmt.Sprintf(`", "format": "%s`, format)
}
return fmt.Sprintf(SortQuery, key, order)
}
func CreateCollapseQuery(key string) string {
return fmt.Sprintf(CollapseQuery, key)
}
func CreateCardinalityAggregationQuery(field string) string {
return fmt.Sprintf(CardinalityAggregationQuery, field)
}
func CreateFromQuery(from int64) string {
return fmt.Sprintf(FromQuery, from)
}
func CreateSizeQuery(size int64) string {
return fmt.Sprintf(SizeQuery, size)
}
func CreateEsQuery(query ...string) string {
return fmt.Sprintf(EsQuery, strings.Join(query, ","))
}
func CreateSourceQuery(source bool) string {
return fmt.Sprintf(sourceQuery, source)
}
func CreateFilterAggregationQuery(value ...string) string {
return fmt.Sprintf(FilterAggregationQuery, strings.Join(value, ","))
}
func CreateAggregationQuery(aggregations ...string) string {
return fmt.Sprintf(AggregationQuery, strings.Join(aggregations, ","))
}
func CreateMinAggregationQuery(field string) string {
return fmt.Sprintf(MinAggregationQuery, field)
}
func CreateMaxAggregationQuery(field string) string {
return fmt.Sprintf(MaxAggregationQuery, field)
}
func CreateTermsAggregationQuery(field string, size int) string {
return fmt.Sprintf(TermsAggregationQuery, field, size)
}
func CreateTermsAggregationQueryWithoutSize(field string) string {
return fmt.Sprintf(TermsAggregationQueryWithoutSize, field)
}
func BuildAggregationQuery(aggregationName string, aggregationQueries ...string) string {
return fmt.Sprintf(AggregationQueryFormat, aggregationName, strings.Join(aggregationQueries, ","))
}
func CreateTrackTotalHitsQuery(trackTotalHits bool) string {
return fmt.Sprintf(TrackTotalHitsQuery, trackTotalHits)
}
func CreateBoolShouldQuery(queries ...string) string {
return fmt.Sprintf(CreateBoolQuery(CreateShouldQuery(strings.Join(queries, ","))))
}
func CreateScriptQuery(script string) string {
return fmt.Sprintf(ScriptQuery, script)
}
func CreateMustNotQuery(filters ...string) string {
return fmt.Sprintf(MustNotQuery, strings.Join(filters, ","))
}
func CreateTermsAggregationQueryWithOrder(field string, size int, filter, order string) string {
return fmt.Sprintf(TermsAggregationQueryWithOrder, field, size, filter, order)
}
func CreateBulkUpdateQuery(index, docId string) string {
return fmt.Sprintf(BulkUpdateQuery, index, docId)
}
func CreateTermsSubQuery(key string, ids []string) string {
var idsList []string
for _, id := range ids {
idsList = append(idsList, fmt.Sprintf(`"%s"`, id))
}
return fmt.Sprintf(TermsSubQuery, key, strings.Join(idsList, ","))
}
func CreateSortQueryFields(key string, order string, format string) string {
if format != EMPTY {
order += fmt.Sprintf(`", "format": "%s`, format)
}
return fmt.Sprintf(SortQueryFields, key, order)
}
func CreateMultiSortQuery(sortQueries ...string) string {
return fmt.Sprintf(MultiSortQuery, strings.Join(sortQueries, ","))
}
func CreateSearchAfterQuery(searchQuery ...string) string {
return fmt.Sprintf(SearchAfter, strings.Join(searchQuery, ","))
}
func CreateCompositeAggsQuery(size int, key string, sources ...string) string {
return fmt.Sprintf(CompositeAggsQuery, size, key, strings.Join(sources, ","))
}
func CreateTopHitsAggsQuery(fields ...string) string {
return fmt.Sprintf(TopHitsAggsQuery, strings.Join(fields, ","))
}
func CreateValueCountAggsQuery(field string) string {
return fmt.Sprintf(ValueCountAggsQuery, field)
}

View File

@@ -31,18 +31,24 @@ type Stacktrace struct {
}
type ExceptionValue struct {
Type string `json:"type"`
Value string `json:"value"`
Stacktrace Stacktrace `json:"stacktrace"`
ProjectId string `json:"project_id,omitempty"`
ReleaseId string `json:"release_id,omitempty"`
Type string `json:"type"`
Value string `json:"value"`
Stacktrace Stacktrace `json:"stacktrace"`
ProjectId string `json:"project_id,omitempty"`
ReleaseId string `json:"release_id,omitempty"`
Breadcrumbs interface{} `json:"breadcrumbs,omitempty"`
Extras interface{} `json:"extras,omitempty"`
Request interface{} `json:"request,omitempty"`
}
type Exception struct {
Values []ExceptionValue `json:"values"`
}
type Payload struct {
Exception Exception `json:"exception"`
Exception Exception `json:"exception"`
Breadcrumbs interface{} `json:"breadcrumbs"`
Request interface{} `json:"request"`
Extra interface{} `json:"extra"`
}
func NewExceptionService(logger *log.Logger, dbClient *gorm.DB, kafkaProducer producer.KProducer) *ExceptionService {
@@ -73,6 +79,9 @@ func (exceptionService *ExceptionService) CatchErrors(c *gin.Context) {
for _, errorItem := range jsonData.Exception.Values {
errorItem.ProjectId = projectID
errorItem.ReleaseId = "release-1"
errorItem.Breadcrumbs = jsonData.Breadcrumbs
errorItem.Extras = jsonData.Extra
errorItem.Request = jsonData.Request
err := exceptionService.kafkaProducer.PublishEvent(errorItem, "kafka-stream", "", nil, encoder.JsonEncoderInstance)
if err != nil {

View File

@@ -3,7 +3,7 @@ package service
import (
"cybertron/internal/client/elastic"
"cybertron/pkg/log"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"cybertron/pkg/utils"
"github.com/gin-gonic/gin"
"net/http"
)
@@ -25,25 +25,111 @@ func (s *SearchService) Search(c *gin.Context) {
}
func (s *SearchService) GetErrorDetails(c *gin.Context) {
documentId := c.Query("document_id")
response, _ := s.elasticSearchClient.GetDocument(documentId)
error_hash := c.Query("error_hash")
term_query := utils.CreateTermSubQuery("error_hash", error_hash)
search_query := utils.CreateSearchQuery(term_query)
size_query := utils.CreateSizeQuery(1)
sort_query := utils.CreateSortQuery("created_at", "asc", "")
after_query := utils.CreateSearchAfterQuery("1724732743")
es_query := utils.CreateEsQuery(search_query, size_query, sort_query, after_query)
println(es_query)
// searchRequest := `
//{
// "size": 1,
// "query": {
// "term": {
// "error_hash": {
// "value": "%s"
// }
// }
// },
// "sort": [
// { "created_at": { "order": "asc" } }
// ],
// "search_after": ["1724732743"]
//}
//
// `
searchRequestformatted := es_query
fields := []string{"error", "significant_stack", "title"}
var response, _, _ = s.elasticSearchClient.SearchDocuments(searchRequestformatted, fields)
c.JSON(http.StatusOK, response)
}
func (s *SearchService) GetErrorList(c *gin.Context) {
//todo pagination and aggregation of errors
projectId := c.Query("project_id")
println(projectId)
query := &types.Query{
Term: map[string]types.TermQuery{
"project_id": {
Value: projectId,
},
},
}
fields := []string{"error", "significant_stack", "title"}
var response, _ = s.elasticSearchClient.SearchDocuments(query, fields)
c.JSON(http.StatusOK, response)
term_query := utils.CreateTermSubQuery("project_id", projectId)
search_query := utils.CreateSearchQuery(term_query)
size_query := utils.CreateSizeQuery(0)
top_hits_aggs_name_query := utils.BuildAggregationQuery("unique_errors", utils.CreateTopHitsAggsQuery("error"))
top_hits_aggs_query := utils.CreateAggregationQuery(top_hits_aggs_name_query)
//building composite aggregation
composite_term_query := utils.CreateTermsAggregationQueryWithoutSize("error_hash.keyword")
composite_aggregation_query_without_name := utils.CreateCompositeAggsQuery(3, "error_hash", composite_term_query)
composite_aggs_query := utils.BuildAggregationQuery("errors_by_hash", composite_aggregation_query_without_name, top_hits_aggs_query)
compositeAggsQuery := utils.CreateAggregationQuery(composite_aggs_query)
//final_aggs_query := utils.CreateAggregationQuery(top_hits_value_count, compositeAggsQuery)
final_query := utils.CreateEsQuery(search_query, size_query, compositeAggsQuery)
println("%s", final_query)
// searchRequest := `
//{
// "size": 0,
// "query": {
// "term": {
// "project_id": {
// "value": "%s"
// }
// }
// },
// "aggs": {
// "errors_by_hash": {
// "composite": {
// "size": 3,
// "sources": [
// {
// "error_hash": {
// "terms": {
// "field": "error_hash.keyword"
// }
// }
// }
// ]
// },
// "aggs": {
// "unique_errors": {
// "top_hits": {
// "_source": {
// "includes": ["error", "error_hash"]
// }
// }
// },
// "error_count": {
// "value_count": {
// "field": "error.keyword"
// }
// }
// }
// }
//
// }
//}
//
// `
fields := []string{"error", "significant_stack", "title"}
var _, aggs, _ = s.elasticSearchClient.SearchDocuments(final_query, fields)
c.JSON(http.StatusOK, aggs)
}