POC longpolling

This commit is contained in:
Cristian Nitescu 2022-07-28 16:56:01 +02:00
parent 865ff5c88d
commit e755d738b7
5 changed files with 148 additions and 20 deletions

2
go.mod
View file

@ -103,6 +103,7 @@ require (
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect
@ -121,6 +122,7 @@ require (
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.9.1 // indirect
github.com/jcuga/golongpoll v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect

2
go.sum
View file

@ -480,6 +480,8 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv
github.com/jackc/puddle v1.2.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jarcoal/httpmock v1.1.0 h1:F47ChZj1Y2zFsCXxNkBPwNNKnAyOATcdQibk0qEdVCE=
github.com/jarcoal/httpmock v1.1.0/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jcuga/golongpoll v1.3.0 h1:00lQC7C1a/4YcGnWcdWi1YzJYfat1Hal2+Cnlvgyado=
github.com/jcuga/golongpoll v1.3.0/go.mod h1:1ijFh83w68ylU44F+xSEyrXChP/7NnoAvgCVHWMggWA=
github.com/jhump/protoreflect v1.6.0 h1:h5jfMVslIg6l29nsMs0D8Wj17RDVdNYti0vDN/PZZoE=
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=

View file

@ -12,6 +12,8 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/models"
"github.com/pkg/errors"
longpollClient "github.com/jcuga/golongpoll/client"
)
var (
@ -70,6 +72,37 @@ func NewClient(config *Config) (*ApiClient, error) {
return c, nil
}
func NewLongpollClient(config *Config) (*longpollClient.Client, error) {
t := &JWTTransport{
MachineID: &config.MachineID,
Password: &config.Password,
Scenarios: config.Scenarios,
URL: config.URL,
UserAgent: config.UserAgent,
VersionPrefix: config.VersionPrefix,
UpdateScenario: config.UpdateScenario,
}
tlsconfig := tls.Config{InsecureSkipVerify: InsecureSkipVerify}
if Cert != nil {
tlsconfig.RootCAs = CaCertPool
tlsconfig.Certificates = []tls.Certificate{*Cert}
}
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tlsconfig
// TODO manage OnFailure
// subUrl, _ := url.Parse(config.URL.String() + "decisions/stream/poll")
// hardcode server as long as we're in test mode
subUrl, _ := url.Parse("http://127.0.0.1:8101/" + "decisions/stream/poll")
c, err := longpollClient.NewClient(longpollClient.ClientOptions{
SubscribeUrl: *subUrl,
Category: config.MachineID,
HttpClient: t.Client(),
PollTimeoutSeconds: 110,
})
return c, err
}
func NewDefaultClient(URL *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) {
if client == nil {
client = &http.Client{}

View file

@ -2,6 +2,7 @@ package apiserver
import (
"context"
"encoding/json"
"fmt"
"net/url"
"strconv"
@ -21,6 +22,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
longpollClient "github.com/jcuga/golongpoll/client"
"gopkg.in/tomb.v2"
)
@ -28,6 +30,7 @@ var (
PullInterval = time.Hour * 2
PushInterval = time.Second * 30
MetricsInterval = time.Minute * 30
PollTimeout = time.Second * 110
)
var SCOPE_CAPI string = "CAPI"
@ -40,10 +43,12 @@ type apic struct {
metricsInterval time.Duration
dbClient *database.Client
apiClient *apiclient.ApiClient
longpollClient *longpollClient.Client
alertToPush chan []*models.Alert
mu sync.Mutex
pushTomb tomb.Tomb
pullTomb tomb.Tomb
longpollTomb tomb.Tomb
metricsTomb tomb.Tomb
startup bool
credentials *csconfig.ApiCredentialsCfg
@ -95,6 +100,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
startup: true,
credentials: config.Credentials,
pullTomb: tomb.Tomb{},
longpollTomb: tomb.Tomb{},
pushTomb: tomb.Tomb{},
metricsTomb: tomb.Tomb{},
scenarioList: make([]string, 0),
@ -122,7 +128,23 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
Scenarios: ret.scenarioList,
UpdateScenario: ret.FetchScenariosListFromDB,
})
return ret, err
if err != nil {
return nil, errors.Wrap(err, "creating api client")
}
// TODO configure polling url
ret.longpollClient, err = apiclient.NewLongpollClient(&apiclient.Config{
MachineID: config.Credentials.Login,
Password: password,
UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
URL: apiURL,
VersionPrefix: "v2",
Scenarios: ret.scenarioList,
UpdateScenario: ret.FetchScenariosListFromDB,
})
if err != nil {
return nil, errors.Wrap(err, "creating polling client")
}
return ret, nil
}
// keep track of all alerts in cache and push it to CAPI every PushInterval.
@ -137,6 +159,7 @@ func (a *apic) Push() error {
select {
case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil)
a.longpollTomb.Kill(nil)
a.metricsTomb.Kill(nil)
log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
if len(cache) == 0 {
@ -400,25 +423,7 @@ func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decisio
return alerts
}
//we receive only one list of decisions, that we need to break-up :
// one alert for "community blocklist"
// one alert per list we're subscribed to
func (a *apic) PullTop() error {
var err error
if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
return err
} else if !lastPullIsOld {
return nil
}
data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
if err != nil {
return errors.Wrap(err, "get stream")
}
a.startup = false
/*to count additions/deletions across lists*/
func (a *apic) HandleDecisionsStream(data *models.DecisionsStreamResponse) error {
add_counters, delete_counters := makeAddAndDeleteCounters()
// process deleted decisions
if nbDeleted, err := a.HandleDeletedDecisions(data.Deleted, delete_counters); err != nil {
@ -450,6 +455,28 @@ func (a *apic) PullTop() error {
return nil
}
//we receive only one list of decisions, that we need to break-up :
// one alert for "community blocklist"
// one alert per list we're subscribed to
func (a *apic) PullTop() error {
var err error
if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
return err
} else if !lastPullIsOld {
return nil
}
data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
if err != nil {
return errors.Wrap(err, "get stream")
}
a.startup = false
/*to count additions/deletions across lists*/
return a.HandleDecisionsStream(data)
}
func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
if *alert.Source.Scope == SCOPE_CAPI {
*alert.Source.Scope = SCOPE_CAPI_ALIAS
@ -494,6 +521,61 @@ func (a *apic) Pull() error {
case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.metricsTomb.Kill(nil)
a.pushTomb.Kill(nil)
a.longpollTomb.Kill(nil)
return nil
}
}
}
func (a *apic) LongPoll() error {
defer types.CatchPanic("lapi/longpollFromAPIC")
log.Infof("Start longpoll from CrowdSec Central API (timeout: %s)", PollTimeout)
toldOnce := false
for {
scenario, err := a.FetchScenariosListFromDB()
if err != nil {
log.Errorf("unable to fetch scenarios from db: %s", err)
}
if len(scenario) > 0 {
break
}
if !toldOnce {
log.Warning("scenario list is empty, will not pull yet")
toldOnce = true
}
time.Sleep(1 * time.Second)
}
// TODO start since the last event timestamp
pollChannel := a.longpollClient.Start(time.Now())
for {
select {
case event := <-pollChannel:
// get the decision
decision := models.Decision{}
stringData, err := json.Marshal(event.Data)
if err != nil {
log.Errorf("marshal error: %s", err)
continue
}
json.Unmarshal(stringData, &decision)
// do something with the decision
// TODO decision should have a type new/deleted
data := models.DecisionsStreamResponse{Deleted: make([]*models.Decision, 0, 1), New: make([]*models.Decision, 0, 1)}
data.New = append(data.New, &decision)
err = a.HandleDecisionsStream(&data)
if err != nil {
log.Errorf("handle stream error: %s", err)
continue
}
// TODO store last event timestamp
case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.metricsTomb.Kill(nil)
a.pushTomb.Kill(nil)
a.pullTomb.Kill(nil)
return nil
}
}
@ -570,6 +652,7 @@ func (a *apic) SendMetrics() error {
}
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil)
a.longpollTomb.Kill(nil)
a.pushTomb.Kill(nil)
return nil
}
@ -579,6 +662,7 @@ func (a *apic) SendMetrics() error {
func (a *apic) Shutdown() {
a.pushTomb.Kill(nil)
a.pullTomb.Kill(nil)
a.longpollTomb.Kill(nil)
a.metricsTomb.Kill(nil)
}

View file

@ -311,6 +311,13 @@ func (s *APIServer) Run(apiReady chan bool) error {
}
return nil
})
s.apic.longpollTomb.Go(func() error {
if err := s.apic.LongPoll(); err != nil {
log.Errorf("capi longpoll: %s", err)
return err
}
return nil
})
s.apic.metricsTomb.Go(func() error {
if err := s.apic.SendMetrics(); err != nil {
log.Errorf("capi metrics: %s", err)