GetExprEnv usage optimization (#1515)
* avoid multiples calls to GetExprEnv * cache ExprEnv in node process * use global expression env * remove block profile rate
This commit is contained in:
parent
f1dbe8c9dd
commit
a49b023a28
|
@ -296,13 +296,15 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
||||||
BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
|
BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cachedExprEnv := exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})
|
||||||
|
|
||||||
//find the relevant holders (scenarios)
|
//find the relevant holders (scenarios)
|
||||||
for idx, holder := range holders {
|
for idx, holder := range holders {
|
||||||
|
|
||||||
//evaluate bucket's condition
|
//evaluate bucket's condition
|
||||||
if holder.RunTimeFilter != nil {
|
if holder.RunTimeFilter != nil {
|
||||||
holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
|
holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
|
||||||
output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
|
output, err := expr.Run(holder.RunTimeFilter, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
holder.logger.Errorf("failed parsing : %v", err)
|
holder.logger.Errorf("failed parsing : %v", err)
|
||||||
return false, fmt.Errorf("leaky failed : %s", err)
|
return false, fmt.Errorf("leaky failed : %s", err)
|
||||||
|
@ -314,7 +316,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
||||||
}
|
}
|
||||||
|
|
||||||
if holder.Debug {
|
if holder.Debug {
|
||||||
holder.ExprDebugger.Run(holder.logger, condition, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
|
holder.ExprDebugger.Run(holder.logger, condition, cachedExprEnv)
|
||||||
}
|
}
|
||||||
if !condition {
|
if !condition {
|
||||||
holder.logger.Debugf("Event leaving node : ko (filter mismatch)")
|
holder.logger.Debugf("Event leaving node : ko (filter mismatch)")
|
||||||
|
@ -325,7 +327,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
||||||
//groupby determines the partition key for the specific bucket
|
//groupby determines the partition key for the specific bucket
|
||||||
var groupby string
|
var groupby string
|
||||||
if holder.RunTimeGroupBy != nil {
|
if holder.RunTimeGroupBy != nil {
|
||||||
tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
|
tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
holder.logger.Errorf("failed groupby : %v", err)
|
holder.logger.Errorf("failed groupby : %v", err)
|
||||||
return false, errors.New("leaky failed :/")
|
return false, errors.New("leaky failed :/")
|
||||||
|
|
|
@ -106,15 +106,17 @@ func (n *Node) validate(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
func (n *Node) process(p *types.Event, ctx UnixParserCtx, expressionEnv map[string]interface{}) (bool, error) {
|
||||||
var NodeState bool
|
var NodeState bool
|
||||||
var NodeHasOKGrok bool
|
var NodeHasOKGrok bool
|
||||||
clog := n.Logger
|
clog := n.Logger
|
||||||
|
|
||||||
|
cachedExprEnv := expressionEnv
|
||||||
|
|
||||||
clog.Tracef("Event entering node")
|
clog.Tracef("Event entering node")
|
||||||
if n.RunTimeFilter != nil {
|
if n.RunTimeFilter != nil {
|
||||||
//Evaluate node's filter
|
//Evaluate node's filter
|
||||||
output, err := expr.Run(n.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": p}))
|
output, err := expr.Run(n.RunTimeFilter, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Warningf("failed to run filter : %v", err)
|
clog.Warningf("failed to run filter : %v", err)
|
||||||
clog.Debugf("Event leaving node : ko")
|
clog.Debugf("Event leaving node : ko")
|
||||||
|
@ -124,7 +126,7 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
||||||
switch out := output.(type) {
|
switch out := output.(type) {
|
||||||
case bool:
|
case bool:
|
||||||
if n.Debug {
|
if n.Debug {
|
||||||
n.ExprDebugger.Run(clog, out, exprhelpers.GetExprEnv(map[string]interface{}{"evt": p}))
|
n.ExprDebugger.Run(clog, out, cachedExprEnv)
|
||||||
}
|
}
|
||||||
if !out {
|
if !out {
|
||||||
clog.Debugf("Event leaving node : ko (failed filter)")
|
clog.Debugf("Event leaving node : ko (failed filter)")
|
||||||
|
@ -188,7 +190,7 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
||||||
}
|
}
|
||||||
/* run whitelist expression tests anyway */
|
/* run whitelist expression tests anyway */
|
||||||
for eidx, e := range n.Whitelist.B_Exprs {
|
for eidx, e := range n.Whitelist.B_Exprs {
|
||||||
output, err := expr.Run(e.Filter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": p}))
|
output, err := expr.Run(e.Filter, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Warningf("failed to run whitelist expr : %v", err)
|
clog.Warningf("failed to run whitelist expr : %v", err)
|
||||||
clog.Debugf("Event leaving node : ko")
|
clog.Debugf("Event leaving node : ko")
|
||||||
|
@ -197,7 +199,7 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
||||||
switch out := output.(type) {
|
switch out := output.(type) {
|
||||||
case bool:
|
case bool:
|
||||||
if n.Debug {
|
if n.Debug {
|
||||||
e.ExprDebugger.Run(clog, out, exprhelpers.GetExprEnv(map[string]interface{}{"evt": p}))
|
e.ExprDebugger.Run(clog, out, cachedExprEnv)
|
||||||
}
|
}
|
||||||
if out {
|
if out {
|
||||||
clog.Debugf("Event is whitelisted by expr, reason [%s]", n.Whitelist.Reason)
|
clog.Debugf("Event is whitelisted by expr, reason [%s]", n.Whitelist.Reason)
|
||||||
|
@ -238,7 +240,7 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
||||||
NodeState = false
|
NodeState = false
|
||||||
}
|
}
|
||||||
} else if n.Grok.RunTimeValue != nil {
|
} else if n.Grok.RunTimeValue != nil {
|
||||||
output, err := expr.Run(n.Grok.RunTimeValue, exprhelpers.GetExprEnv(map[string]interface{}{"evt": p}))
|
output, err := expr.Run(n.Grok.RunTimeValue, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Warningf("failed to run RunTimeValue : %v", err)
|
clog.Warningf("failed to run RunTimeValue : %v", err)
|
||||||
NodeState = false
|
NodeState = false
|
||||||
|
@ -285,7 +287,7 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx) (bool, error) {
|
||||||
//Iterate on leafs
|
//Iterate on leafs
|
||||||
if len(n.LeavesNodes) > 0 {
|
if len(n.LeavesNodes) > 0 {
|
||||||
for _, leaf := range n.LeavesNodes {
|
for _, leaf := range n.LeavesNodes {
|
||||||
ret, err := leaf.process(p, ctx)
|
ret, err := leaf.process(p, ctx, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Tracef("\tNode (%s) failed : %v", leaf.rn, err)
|
clog.Tracef("\tNode (%s) failed : %v", leaf.rn, err)
|
||||||
clog.Debugf("Event leaving node : ko")
|
clog.Debugf("Event leaving node : ko")
|
||||||
|
|
|
@ -111,12 +111,14 @@ func (n *Node) ProcessStatics(statics []types.ExtraField, event *types.Event) er
|
||||||
var value string
|
var value string
|
||||||
clog := n.Logger
|
clog := n.Logger
|
||||||
|
|
||||||
|
cachedExprEnv := exprhelpers.GetExprEnv(map[string]interface{}{"evt": event})
|
||||||
|
|
||||||
for _, static := range statics {
|
for _, static := range statics {
|
||||||
value = ""
|
value = ""
|
||||||
if static.Value != "" {
|
if static.Value != "" {
|
||||||
value = static.Value
|
value = static.Value
|
||||||
} else if static.RunTimeValue != nil {
|
} else if static.RunTimeValue != nil {
|
||||||
output, err := expr.Run(static.RunTimeValue, exprhelpers.GetExprEnv(map[string]interface{}{"evt": event}))
|
output, err := expr.Run(static.RunTimeValue, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Warningf("failed to run RunTimeValue : %v", err)
|
clog.Warningf("failed to run RunTimeValue : %v", err)
|
||||||
continue
|
continue
|
||||||
|
@ -255,6 +257,8 @@ func Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error)
|
||||||
log.Tracef("INPUT '%s'", event.Line.Raw)
|
log.Tracef("INPUT '%s'", event.Line.Raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cachedExprEnv := exprhelpers.GetExprEnv(map[string]interface{}{"evt": &event})
|
||||||
|
|
||||||
if ParseDump {
|
if ParseDump {
|
||||||
if StageParseCache == nil {
|
if StageParseCache == nil {
|
||||||
StageParseCache = make(map[string]map[string][]ParserResult)
|
StageParseCache = make(map[string]map[string][]ParserResult)
|
||||||
|
@ -298,7 +302,7 @@ func Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error)
|
||||||
if ctx.Profiling {
|
if ctx.Profiling {
|
||||||
node.Profiling = true
|
node.Profiling = true
|
||||||
}
|
}
|
||||||
ret, err := node.process(&event, ctx)
|
ret, err := node.process(&event, ctx, cachedExprEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Fatalf("Error while processing node : %v", err)
|
clog.Fatalf("Error while processing node : %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue