TP-5555 | cache stack trace

This commit is contained in:
varnit goyal
2024-12-20 15:39:07 +05:30
parent 88405e6a86
commit 5dccc8fa6f
5 changed files with 374 additions and 24 deletions

View File

@@ -12,14 +12,14 @@
<env name="ELASTIC_ADDRESSES" value="https://140.238.250.204:9200" />
<env name="ELASTIC_API_KEY" value="" />
<env name="ELASTIC_INDEX" value="cybertron" />
<env name="ELASTIC_PASSWORD" value="" />
<env name="ELASTIC_PASSWORD" value="9457611266" /> <!-- NOTE(review): plaintext credential committed to version control; move to a secret store and rotate this password -->
<env name="ELASTIC_USERNAME" value="elastic" />
<env name="ENV" value="local" />
<env name="HOUSTON_REALM_ID" value="ZicSxsvBwE" />
<env name="HOUSTON_SERVICE_URL" value="https://qa-houston.np.navi-sa.in" />
<env name="KAFKA_BROKERS" value="cell-1.streaming.ap-mumbai-1.oci.oraclecloud.com:9092" />
<env name="KAFKA_GROUP" value="kafka-stream" />
<env name="KAFKA_PASSWORD" value="" />
<env name="KAFKA_PASSWORD" value="kDia1uC.GI;)Al5eQ)+Q" /> <!-- NOTE(review): plaintext credential committed to version control; move to a secret store and rotate this password -->
<env name="KAFKA_SASL_MECHANISM" value="PLAIN" />
<env name="KAFKA_TOPIC" value="kafka-stream" />
<env name="KAFKA_USERNAME" value="varnitgoyal/varnitgoyal95@gmail.com/ocid1.streampool.oc1.ap-mumbai-1.amaaaaaaotdslraanepwp54txqqxkmg4l6dghrhufiezqkx2lqhndgxoq7pa" />

View File

@@ -3,10 +3,13 @@ package db
import (
"context"
"crypto/tls"
"encoding/json"
elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
"log"
"log-enricher/configs"
"log-enricher/pkg/utils"
"net/http"
"strings"
)
type ElasticSearchClient struct {
@@ -45,3 +48,62 @@ func (el *ElasticSearchClient) IndexDocument(document interface{}) error {
}
return nil
}
// SearchDocuments executes the raw JSON searchRequest against the
// "cybertron" index and returns the matching documents, any aggregations,
// and the total hit count.
//
// Each returned document is the hit's _source with the hit id added under
// the "id" key; documents that fail to unmarshal are logged and skipped.
// NOTE(review): the fields parameter is currently unused — confirm whether
// _source filtering was intended here.
func (el *ElasticSearchClient) SearchDocuments(searchRequest string, fields []string) ([]map[string]interface{}, map[string]interface{}, int64, error) {
	res, err := el.client.Search().
		Index("cybertron").Raw(strings.NewReader(searchRequest)).
		Do(context.TODO())
	if err != nil {
		// was log.Println with a %s verb, which printed the verb literally
		log.Printf("Error getting response: %s", err)
		return nil, nil, 0, err
	}
	var results []map[string]interface{}
	for _, hit := range res.Hits.Hits {
		var doc map[string]interface{}
		// Check the unmarshal error BEFORE touching doc: on failure doc is
		// a nil map and the previous doc["id"] = ... assignment panicked.
		if err := json.Unmarshal(hit.Source_, &doc); err != nil {
			log.Printf("Error unmarshalling document: %s", err)
			continue
		}
		doc["id"] = *hit.Id_
		results = append(results, doc)
	}
	aggregations := make(map[string]interface{})
	if res.Aggregations != nil {
		for aggName, agg := range res.Aggregations {
			aggregations[aggName] = agg
		}
	}
	// %v, not %s: aggregations is a map, not a string
	log.Printf("%v", aggregations)
	return results, aggregations, res.Hits.Total.Value, nil
}
// GetStackTraceByErrorHash looks up the most recent document matching both
// errorHash and projectId and returns a map holding its "significant_stack"
// and "stack_trace" fields. It returns nil on search failure, when no
// document matches, or when neither field is present, so callers can treat
// nil as a cache miss (the previous version panicked on response[0] for an
// empty result and always returned a non-nil map, defeating nil checks).
func (el *ElasticSearchClient) GetStackTraceByErrorHash(errorHash string, projectId string) map[string]interface{} {
	hashTerm := utils.CreateTermSubQuery("error_hash", errorHash)
	projectTerm := utils.CreateTermSubQuery("project_id", projectId)
	boolQuery := utils.CreateBoolQuery(utils.CreateMustQuery(hashTerm, projectTerm))
	esQuery := utils.CreateEsQuery(
		utils.CreateSearchQuery(boolQuery),
		utils.CreateSizeQuery(1), // only the latest cached entry is needed
		utils.CreateSortQuery("created_at", "desc", ""),
		utils.CreateFromQuery(0),
	)
	fields := []string{"error", "significant_stack", "title"}
	response, _, _, err := el.SearchDocuments(esQuery, fields)
	if err != nil || len(response) == 0 {
		return nil
	}
	filteredDoc := map[string]interface{}{}
	if significantStack, ok := response[0]["significant_stack"]; ok {
		filteredDoc["significant_stack"] = significantStack
	}
	if stackFrame, ok := response[0]["stack_trace"]; ok {
		filteredDoc["stack_trace"] = stackFrame
	}
	if len(filteredDoc) == 0 {
		return nil
	}
	return filteredDoc
}

View File

@@ -17,6 +17,7 @@ type Logger struct {
var Log *Logger
func initiateLogger() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig = ecszap.ECSCompatibleEncoderConfig(config.EncoderConfig)
log, err := config.Build(ecszap.WrapCoreOption(), zap.AddCallerSkip(1))

View File

@@ -0,0 +1,280 @@
package utils
import (
"fmt"
"strings"
)
const (
TermQuery = `{ "term": { "%s": "%s" } }`
TermQueryForInt = `{ "term": { "%s": %d } }`
TermQueryForBool = `{ "term": { "%s": %t } }`
RangeQuery = `{ "range": { "%s": { "gte": %d, "lte": %d } } }`
RangeQueryGteString = `{ "range": { "%s": { "gte": "%s" } } }`
RangeQueryLteString = `{ "range": { "%s": { "lte": "%d" } } }`
MustQuery = `"must": [ %s ] `
MustNotQuery = `"must_not": [ %s ]`
ShouldQuery = `"should": [ %s ] `
BoolQuery = `{ "bool":{ %s } }`
SortQuery = `"sort": [ { "%s": { "order": "%s" } } ]`
CollapseQuery = `"collapse": { "field": "%s" }`
MatchAllQuery = `{ "match_all": {} }`
MultiMatchQuery = `{"multi_match": { "query": "%s", "fields": [%s] }}`
FromQuery = `"from": %d`
SizeQuery = `"size": %d`
SearchQuery = `"query": %s`
FieldsQuery = `"fields": [ "%s" ]`
EsQuery = "{ %s }"
sourceQuery = `"_source": [%s]`
AggregationQuery = `"aggs": { %s }`
AggregationQueryFormat = `"%s": { %s }` // aggregation name, aggregation query
TermsAggregationQuery = `"terms": { "field": "%s", "size": %d, %s}`
MinAggregationQuery = `"min": { "field": "%s" }`
MaxAggregationQuery = `"max": { "field": "%s" }`
CardinalityAggregationQuery = `"cardinality": { "field": "%s" }`
FilterAggregationQuery = `"filter": %s`
TrackTotalHitsQuery = `"track_total_hits": %t`
ScriptQuery = `"script": { "source": "%s" , "lang": "painless" }`
TermsAggregationQueryWithOrder = `"terms": { "field": "%s", "size": %d, "order" : { "%s" : "%s" } }`
TermsAggregationQueryWithoutSize = `"terms": { "field": "%s" }`
CustomQuery = `"%s": %s`
CustomQueryWithBraces = `"%s": {%s}`
BulkUpdateQuery = `{ "update": {"_index": "%s", "_id": "%s" } }`
TermsSubQuery = `{ "terms": { "%s": [ %s ] } }`
MultiSortQuery = `"sort":[ %s ]`
SortQueryFields = `{ "%s": { "order": "%s" } }`
MatchQuery = `{ "match": { "%s": "%s" } }`
SearchAfter = `"search_after": [%s]`
CompositeAggsQuery = `"composite": {"size" : %d, "sources" : [{"%s": {%s}}] }`
CompositeAggsQueryWithAfterKey = `"composite": {"size" : %d, "after": {"%s": "%s"}, "sources" : [{"%s": {%s}}] }`
OrderQuery = `"order": {"%s": "%s"}`
TopHitsAggsQuery = `"top_hits":{"size" : %d, "_source": {"includes" : [ %s ]}}`
ValueCountAggsQuery = `"value_count" : {"field": "%s"}`
)
const (
COLON = ":"
HYPHEN = "-"
EMPTY = ""
UNDERSCORE = "_"
NEWLINE = "\n"
FORWARD_SLASH = "/"
ASTERISK = "*"
DOT = "."
COMMA = ","
SEMICOLON = ";"
AMPERSAND = "&"
)
// CreateCustomQuery renders a raw `"key": query` JSON fragment.
func CreateCustomQuery(key string, query string) string {
	fragment := fmt.Sprintf(CustomQuery, key, query)
	return fragment
}
// CreateCustomQueryWithBraces renders a raw `"key": {query}` JSON fragment.
func CreateCustomQueryWithBraces(key string, query string) string {
	fragment := fmt.Sprintf(CustomQueryWithBraces, key, query)
	return fragment
}
// CreateTermSubQuery renders one term clause per id and joins them with
// commas. The redundant per-iteration copy (idCopy) is gone — the loop
// variable is only read, never captured — and the slice is pre-sized.
func CreateTermSubQuery(key string, ids ...string) string {
	termQueries := make([]string, 0, len(ids))
	for _, id := range ids {
		termQueries = append(termQueries, fmt.Sprintf(TermQuery, key, id))
	}
	return strings.Join(termQueries, COMMA)
}
// CreateTermSubQueryForInt renders one integer term clause per id and joins
// them with commas. Redundant idCopy removed; slice pre-sized.
func CreateTermSubQueryForInt(key string, ids ...int64) string {
	termQueries := make([]string, 0, len(ids))
	for _, id := range ids {
		termQueries = append(termQueries, fmt.Sprintf(TermQueryForInt, key, id))
	}
	return strings.Join(termQueries, COMMA)
}
// CreateTermSubQueryBool renders one boolean term clause per value and joins
// them with commas. Redundant idCopy removed; slice pre-sized.
func CreateTermSubQueryBool(key string, ids ...bool) string {
	termQueries := make([]string, 0, len(ids))
	for _, id := range ids {
		termQueries = append(termQueries, fmt.Sprintf(TermQueryForBool, key, id))
	}
	return strings.Join(termQueries, COMMA)
}
// CreateRangeQuery renders a numeric range clause with both gte and lte bounds.
func CreateRangeQuery(key string, greaterThan int64, lessThan int64) string {
	clause := fmt.Sprintf(RangeQuery, key, greaterThan, lessThan)
	return clause
}
// CreateRangeQueryForGteString renders a range clause with only a string
// lower bound (gte).
func CreateRangeQueryForGteString(key string, greaterThan string) string {
	clause := fmt.Sprintf(RangeQueryGteString, key, greaterThan)
	return clause
}
// CreateRangeQueryForLteString renders a range clause with only an upper
// bound (lte).
// NOTE(review): the template quotes the %d value (`"lte": "%d"`), emitting a
// JSON string rather than a number — unlike RangeQuery. Elasticsearch
// usually coerces it, but confirm the quoting is intentional.
func CreateRangeQueryForLteString(key string, lessThan int64) string {
	return fmt.Sprintf(RangeQueryLteString, key, lessThan)
}
func CreateMustQuery(filters ...string) string {
return fmt.Sprintf(MustQuery, strings.Join(filters, ","))
}
// CreateMatchAllQuery returns the constant match_all clause.
func CreateMatchAllQuery() string {
	return MatchAllQuery
}
// CreateMatchQuery renders a match clause for key against value.
func CreateMatchQuery(key string, value string) string {
	clause := fmt.Sprintf(MatchQuery, key, value)
	return clause
}
// CreateMultiMatchQuery renders a multi_match clause searching query across
// the given fields, which are serialized as a JSON string array.
func CreateMultiMatchQuery(query string, fields ...string) string {
	quoted := `"` + strings.Join(fields, `", "`) + `"`
	return fmt.Sprintf(MultiMatchQuery, query, quoted)
}
func CreateShouldQuery(filters ...string) string {
return fmt.Sprintf(ShouldQuery, strings.Join(filters, ","))
}
func CreateBoolQuery(filters ...string) string {
return fmt.Sprintf(BoolQuery, strings.Join(filters, ","))
}
func CreateSearchQuery(filters ...string) string {
return fmt.Sprintf(SearchQuery, strings.Join(filters, ","))
}
func CreateFieldsQuery(fields ...string) string {
return fmt.Sprintf(FieldsQuery, strings.Join(fields, ","))
}
// CreateSortQuery renders a "sort" clause for key with the given order.
// When format is non-empty it is spliced into the order string so the
// template also emits a "format" attribute (string-surgery on the template).
func CreateSortQuery(key string, order string, format string) string {
	sortOrder := order
	if format != EMPTY {
		sortOrder = sortOrder + fmt.Sprintf(`", "format": "%s`, format)
	}
	return fmt.Sprintf(SortQuery, key, sortOrder)
}
// CreateCollapseQuery renders a "collapse" clause on the given field.
func CreateCollapseQuery(key string) string {
	clause := fmt.Sprintf(CollapseQuery, key)
	return clause
}
// CreateCardinalityAggregationQuery renders a cardinality aggregation on field.
func CreateCardinalityAggregationQuery(field string) string {
	clause := fmt.Sprintf(CardinalityAggregationQuery, field)
	return clause
}
// CreateFromQuery renders the "from" pagination offset clause.
func CreateFromQuery(from int64) string {
	clause := fmt.Sprintf(FromQuery, from)
	return clause
}
// CreateSizeQuery renders the "size" clause limiting the result count.
func CreateSizeQuery(size int64) string {
	clause := fmt.Sprintf(SizeQuery, size)
	return clause
}
func CreateEsQuery(query ...string) string {
return fmt.Sprintf(EsQuery, strings.Join(query, ","))
}
// CreateSourceQuery renders a "_source" clause restricting returned fields.
func CreateSourceQuery(fields ...string) string {
	quoted := `"` + strings.Join(fields, `", "`) + `"`
	return fmt.Sprintf(sourceQuery, quoted)
}
func CreateFilterAggregationQuery(value ...string) string {
return fmt.Sprintf(FilterAggregationQuery, strings.Join(value, ","))
}
func CreateAggregationQuery(aggregations ...string) string {
return fmt.Sprintf(AggregationQuery, strings.Join(aggregations, ","))
}
// CreateMinAggregationQuery renders a min aggregation on field.
func CreateMinAggregationQuery(field string) string {
	clause := fmt.Sprintf(MinAggregationQuery, field)
	return clause
}
// CreateMaxAggregationQuery renders a max aggregation on field.
func CreateMaxAggregationQuery(field string) string {
	clause := fmt.Sprintf(MaxAggregationQuery, field)
	return clause
}
// CreateTermsAggregationQuery renders a terms aggregation with an explicit
// size and an embedded order/sub-clause fragment.
func CreateTermsAggregationQuery(field string, size int, orderQuery string) string {
	clause := fmt.Sprintf(TermsAggregationQuery, field, size, orderQuery)
	return clause
}
// CreateTermsAggregationQueryWithoutSize renders a terms aggregation using
// the default bucket size.
func CreateTermsAggregationQueryWithoutSize(field string) string {
	clause := fmt.Sprintf(TermsAggregationQueryWithoutSize, field)
	return clause
}
func BuildAggregationQuery(aggregationName string, aggregationQueries ...string) string {
return fmt.Sprintf(AggregationQueryFormat, aggregationName, strings.Join(aggregationQueries, ","))
}
// CreateTrackTotalHitsQuery renders the "track_total_hits" toggle clause.
func CreateTrackTotalHitsQuery(trackTotalHits bool) string {
	clause := fmt.Sprintf(TrackTotalHitsQuery, trackTotalHits)
	return clause
}
func CreateBoolShouldQuery(queries ...string) string {
return fmt.Sprintf(CreateBoolQuery(CreateShouldQuery(strings.Join(queries, ","))))
}
// CreateScriptQuery renders a painless "script" clause with the given source.
func CreateScriptQuery(script string) string {
	clause := fmt.Sprintf(ScriptQuery, script)
	return clause
}
func CreateMustNotQuery(filters ...string) string {
return fmt.Sprintf(MustNotQuery, strings.Join(filters, ","))
}
// CreateTermsAggregationQueryWithOrder renders a terms aggregation ordered by
// the given sub-aggregation (filter) in the given direction.
func CreateTermsAggregationQueryWithOrder(field string, size int, filter, order string) string {
	clause := fmt.Sprintf(TermsAggregationQueryWithOrder, field, size, filter, order)
	return clause
}
// CreateBulkUpdateQuery renders one bulk-API update action header line.
func CreateBulkUpdateQuery(index, docId string) string {
	action := fmt.Sprintf(BulkUpdateQuery, index, docId)
	return action
}
// CreateTermsSubQuery renders a terms clause matching key against the given
// id list, quoting each id for the JSON array.
func CreateTermsSubQuery(key string, ids []string) string {
	quoted := make([]string, len(ids))
	for i, id := range ids {
		quoted[i] = `"` + id + `"`
	}
	return fmt.Sprintf(TermsSubQuery, key, strings.Join(quoted, COMMA))
}
// CreateSortQueryFields renders a single sort entry (for use inside a
// multi-sort array). A non-empty format is spliced into the order string so
// the template also emits a "format" attribute.
func CreateSortQueryFields(key string, order string, format string) string {
	sortOrder := order
	if format != EMPTY {
		sortOrder = sortOrder + fmt.Sprintf(`", "format": "%s`, format)
	}
	return fmt.Sprintf(SortQueryFields, key, sortOrder)
}
func CreateMultiSortQuery(sortQueries ...string) string {
return fmt.Sprintf(MultiSortQuery, strings.Join(sortQueries, ","))
}
func CreateSearchAfterQuery(searchQuery ...string) string {
return fmt.Sprintf(SearchAfter, strings.Join(searchQuery, ","))
}
// CreateCompositeAggsQuery renders a composite aggregation over the given
// sources; when afterKeyValue is non-empty an "after" pagination key is
// included. The leftover debug println has been removed — it wrote
// unstructured output to stderr on every paginated call.
func CreateCompositeAggsQuery(size int, key string, afterKey string, afterKeyValue string, sources ...string) string {
	if afterKeyValue != EMPTY {
		return fmt.Sprintf(CompositeAggsQueryWithAfterKey, size, afterKey, afterKeyValue, key, strings.Join(sources, COMMA))
	}
	return fmt.Sprintf(CompositeAggsQuery, size, key, strings.Join(sources, COMMA))
}
// CreateTopHitsAggsQuery renders a top_hits aggregation returning only the
// listed _source fields (quoted and joined with ", ").
func CreateTopHitsAggsQuery(size int, fields []string) string {
	quoted := make([]string, len(fields))
	for i, field := range fields {
		quoted[i] = `"` + field + `"`
	}
	return fmt.Sprintf(TopHitsAggsQuery, size, strings.Join(quoted, ", "))
}
// CreateValueCountAggsQuery renders a value_count aggregation on field.
func CreateValueCountAggsQuery(field string) string {
	clause := fmt.Sprintf(ValueCountAggsQuery, field)
	return clause
}
// CreateOrderQuery renders an "order" clause sorting by field in the given
// direction.
func CreateOrderQuery(field string, order string) string {
	clause := fmt.Sprintf(OrderQuery, field, order)
	return clause
}

View File

@@ -168,30 +168,37 @@ func (ep *ErrorProcessor) ProcessError(error []byte) {
args = []string{string(frames)}
}
command := &symbolicator.Command{
Cmd: symbolicatorCommand,
Args: args,
}
var output symbolicator.SymbolicatedStackTrace
if len(frames) != 0 {
output, err = symbolicator.SymbolicatorClient(command)
}
for _, v := range invalidFrames {
output.Frames = append(output.Frames, symbolicator.SymbolicatedFrame{
Token: v.Filename,
OriginalLine: v.Lineno,
Lines: []string{"non symbolic code"},
Start: 0,
End: 0,
})
}
if err != nil {
ep.logger.Error("error occured in symobilicator client", zap.Error(err))
}
//make md5 hash of error
hash := encoder.Md5Encode(string(frames) + payload.Value)
cachedStack := ep.elasticSearchClient.GetStackTraceByErrorHash(hash, payload.ProjectId)
var output symbolicator.SymbolicatedStackTrace
if cachedStack != nil {
output = cachedStack
} else {
command := &symbolicator.Command{
Cmd: symbolicatorCommand,
Args: args,
}
if len(frames) != 0 {
output, err = symbolicator.SymbolicatorClient(command)
}
for _, v := range invalidFrames {
output.Frames = append(output.Frames, symbolicator.SymbolicatedFrame{
Token: v.Filename,
OriginalLine: v.Lineno,
Lines: []string{"non symbolic code"},
Start: 0,
End: 0,
})
}
if err != nil {
ep.logger.Error("error occured in symobilicator client", zap.Error(err))
}
}
//creating es document
ep.logger.Info("processed document successfully saving it to elasticsearch", zap.String("hash", hash))
errorDocument := &es.ErrorDocument{