revert decision dedup behavior to 1.3.4 (#1675)

* revert decision dedup behavior to 1.3.4
This commit is contained in:
Thibault "bui" Koechlin 2022-07-22 11:20:10 +02:00 committed by GitHub
parent 3adb90e7b7
commit 0eea20fa7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 1183 deletions

File diff suppressed because it is too large Load diff

View file

@ -22,19 +22,16 @@ type DecisionsByScenario struct {
Type string
}
func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string][]string) (*ent.DecisionQuery, []*sql.Predicate, error) {
func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string][]string) (*ent.DecisionQuery, error) {
var err error
var start_ip, start_sfx, end_ip, end_sfx int64
var ip_sz int
var contains bool = true
/*if contains is true, return bans that *contains* the given value (value is the inner)
else, return bans that are *contained* by the given value (value is the outer)*/
// contains == true -> return bans that *contain* the given value (value is the inner)
// contains == false or missing -> return bans *contained* in the given value (value is the outer)
// simulated == true -> include simulated rows
// simulated == false or missing -> exclude simulated rows
/*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
if v, ok := filter["simulated"]; ok {
if v[0] == "false" {
query = query.Where(decision.SimulatedEQ(false))
@ -43,14 +40,13 @@ func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string]
} else {
query = query.Where(decision.SimulatedEQ(false))
}
t := sql.Table(decision.Table)
joinPredicate := make([]*sql.Predicate, 0)
for param, value := range filter {
switch param {
case "contains":
contains, err = strconv.ParseBool(value[0])
if err != nil {
return nil, nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
}
case "scopes":
scopes := strings.Split(value[0], ",")
@ -75,24 +71,9 @@ func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string]
query = query.Where(
decision.OriginIn(strings.Split(value[0], ",")...),
)
origins := strings.Split(value[0], ",")
originsContainsPredicate := make([]*sql.Predicate, 0)
for _, origin := range origins {
pred := sql.EqualFold(t.C(decision.FieldOrigin), origin)
originsContainsPredicate = append(originsContainsPredicate, pred)
}
joinPredicate = append(joinPredicate, sql.Or(originsContainsPredicate...))
case "scenarios_containing":
predicates := decisionPredicatesFromStr(value[0], decision.ScenarioContainsFold)
query = query.Where(decision.Or(predicates...))
scenarios := strings.Split(value[0], ",")
scenariosContainsPredicate := make([]*sql.Predicate, 0)
for _, scenario := range scenarios {
pred := sql.ContainsFold(t.C(decision.FieldScenario), scenario)
scenariosContainsPredicate = append(scenariosContainsPredicate, pred)
}
joinPredicate = append(joinPredicate, sql.Or(scenariosContainsPredicate...))
case "scenarios_not_containing":
predicates := decisionPredicatesFromStr(value[0], decision.ScenarioContainsFold)
query = query.Where(decision.Not(
@ -100,26 +81,79 @@ func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string]
predicates...,
),
))
scenarios := strings.Split(value[0], ",")
scenariosContainsPredicate := make([]*sql.Predicate, 0)
for _, scenario := range scenarios {
pred := sql.ContainsFold(t.C(decision.FieldScenario), scenario)
scenariosContainsPredicate = append(scenariosContainsPredicate, sql.Not(pred))
}
joinPredicate = append(joinPredicate, sql.Or(scenariosContainsPredicate...))
case "ip", "range":
ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
if err != nil {
return nil, nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
}
}
}
query, err = applyStartIpEndIpFilter(query, contains, ip_sz, start_ip, start_sfx, end_ip, end_sfx)
if err != nil {
return nil, nil, errors.Wrapf(err, "fail to apply StartIpEndIpFilter")
return nil, errors.Wrapf(err, "fail to apply StartIpEndIpFilter")
}
return query, joinPredicate, nil
return query, nil
}
func (c *Client) QueryAllDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilGT(time.Now().UTC()),
longestDecisionForScopeTypeValue,
)
query, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryAllDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get all decisions with filters")
}
data, err := query.All(c.CTX)
if err != nil {
c.Log.Warningf("QueryAllDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get all decisions with filters")
}
return data, nil
}
func (c *Client) QueryExpiredDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilLT(time.Now().UTC()),
longestDecisionForScopeTypeValue,
)
query, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get expired decisions with filters")
}
data, err := query.All(c.CTX)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions")
}
return data, nil
}
func (c *Client) QueryDecisionCountByScenario(filters map[string][]string) ([]*DecisionsByScenario, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilGT(time.Now().UTC()),
)
query, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
return nil, errors.Wrap(QueryFail, "count all decisions with filters")
}
var r []*DecisionsByScenario
err = query.GroupBy(decision.FieldScenario, decision.FieldOrigin, decision.FieldType).Aggregate(ent.Count()).Scan(c.CTX, &r)
if err != nil {
c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
return nil, errors.Wrap(QueryFail, "count all decisions with filters")
}
return r, nil
}
func (c *Client) QueryDecisionWithFilter(filter map[string][]string) ([]*ent.Decision, error) {
@ -129,7 +163,7 @@ func (c *Client) QueryDecisionWithFilter(filter map[string][]string) ([]*ent.Dec
decisions := c.Ent.Decision.Query().
Where(decision.UntilGTE(time.Now().UTC()))
decisions, _, err = BuildDecisionRequestWithFilter(decisions, filter)
decisions, err = BuildDecisionRequestWithFilter(decisions, filter)
if err != nil {
return []*ent.Decision{}, err
}
@ -153,141 +187,67 @@ func (c *Client) QueryDecisionWithFilter(filter map[string][]string) ([]*ent.Dec
return data, nil
}
func (c *Client) QueryAllDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilGT(time.Now().UTC()),
// ent translation of https://stackoverflow.com/a/28090544
func longestDecisionForScopeTypeValue(s *sql.Selector) {
t := sql.Table(decision.Table)
s.LeftJoin(t).OnP(sql.And(
sql.ColumnsEQ(
t.C(decision.FieldValue),
s.C(decision.FieldValue),
),
sql.ColumnsEQ(
t.C(decision.FieldType),
s.C(decision.FieldType),
),
sql.ColumnsEQ(
t.C(decision.FieldScope),
s.C(decision.FieldScope),
),
sql.ColumnsGT(
t.C(decision.FieldUntil),
s.C(decision.FieldUntil),
),
))
s.Where(
sql.IsNull(
t.C(decision.FieldUntil),
),
)
query, _, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryAllDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get all decisions with filters")
}
//Order is *very* important, the dedup assumes that decisions are sorted per IP and per time left
data, err := query.Order(ent.Asc(decision.FieldValue), ent.Desc(decision.FieldUntil)).All(c.CTX)
if err != nil {
c.Log.Warningf("QueryAllDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get all decisions with filters")
}
return data, nil
}
func (c *Client) QueryDecisionCountByScenario(filters map[string][]string) ([]*DecisionsByScenario, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilGT(time.Now().UTC()),
)
query, _, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
return nil, errors.Wrap(QueryFail, "count all decisions with filters")
}
var r []*DecisionsByScenario
err = query.GroupBy(decision.FieldScenario, decision.FieldOrigin, decision.FieldType).Aggregate(ent.Count()).Scan(c.CTX, &r)
if err != nil {
c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
return nil, errors.Wrap(QueryFail, "count all decisions with filters")
}
return r, nil
}
func (c *Client) QueryExpiredDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
now := time.Now().UTC()
func (c *Client) QueryExpiredDecisionsSinceWithFilters(since time.Time, filters map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilLT(time.Now().UTC()),
decision.UntilGT(since),
longestDecisionForScopeTypeValue,
)
query, predicates, err := BuildDecisionRequestWithFilter(query, filters)
query, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "get expired decisions with filters")
c.Log.Warningf("QueryExpiredDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters")
}
query = query.Where(func(s *sql.Selector) {
t := sql.Table(decision.Table).As("t1")
subQuery := sql.Select(t.C(decision.FieldValue)).From(t).Where(sql.GT(t.C(decision.FieldUntil), now))
for _, predicate := range predicates {
subQuery.Where(predicate)
}
subQuery.Where(sql.And(
sql.ColumnsEQ(t.C(decision.FieldType), s.C(decision.FieldType)),
sql.ColumnsEQ(t.C(decision.FieldScope), s.C(decision.FieldScope)),
))
s.Where(
sql.NotIn(
s.C(decision.FieldValue),
subQuery,
),
)
})
data, err := query.Order(ent.Asc(decision.FieldValue), ent.Desc(decision.FieldUntil)).All(c.CTX)
data, err := query.All(c.CTX)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions")
c.Log.Warningf("QueryExpiredDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters")
}
return data, nil
}
//The "dedup" is not performed in SQL here because we suck at it, we do it in Go:
// - Get all decisions (expired or not) with an end time after the last pull from the bouncer
// - Sort them by increasing expiration date
// - Iterate over them, keeping only decisions that have expired but only if we don't have an active decision with the same scope/value/type
func (c *Client) QueryExpiredDecisionsSinceWithFilters(since time.Time, filters map[string][]string) ([]*ent.Decision, error) {
now := time.Now().UTC()
query := c.Ent.Decision.Query().Where(
decision.UntilGT(since),
)
query, _, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters")
}
data, err := query.Order(ent.Asc(decision.FieldValue), ent.Asc(decision.FieldUntil)).All(c.CTX)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters")
}
ret := make([]*ent.Decision, 0)
deletedDecisions := make(map[string]*ent.Decision)
for _, d := range data {
key := fmt.Sprintf("%s:%s:%s", d.Scope, d.Type, d.Value)
if d.Until.Before(now) {
deletedDecisions[key] = d
}
if d.Until.After(now) {
delete(deletedDecisions, key)
}
}
for _, d := range deletedDecisions {
ret = append(ret, d)
}
return ret, nil
}
func (c *Client) QueryNewDecisionsSinceWithFilters(since time.Time, filters map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.CreatedAtGT(since),
decision.UntilGT(time.Now().UTC()),
longestDecisionForScopeTypeValue,
)
query, _, err := BuildDecisionRequestWithFilter(query, filters)
query, err := BuildDecisionRequestWithFilter(query, filters)
if err != nil {
c.Log.Warningf("BuildDecisionRequestWithFilter : %s", err)
return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters")
c.Log.Warningf("QueryNewDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrapf(QueryFail, "new decisions since '%s'", since.String())
}
//Order is *very* important, the dedup assumes that decisions are sorted per IP and per time left
data, err := query.Order(ent.Asc(decision.FieldValue), ent.Desc(decision.FieldUntil)).All(c.CTX)
data, err := query.All(c.CTX)
if err != nil {
c.Log.Warningf("QueryNewDecisionsSinceWithFilters : %s", err)
return []*ent.Decision{}, errors.Wrapf(QueryFail, "new decisions since '%s'", since.String())
@ -335,6 +295,7 @@ func (c *Client) DeleteDecisionsWithFilter(filter map[string][]string) (string,
return "0", errors.Wrap(InvalidFilter, fmt.Sprintf("'%s' doesn't exist", param))
}
}
if ip_sz == 4 {
if contains { /*decision contains {start_ip,end_ip}*/
decisions = decisions.Where(decision.And(