diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index c56df04c0..28a1a3d5f 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -151,8 +151,8 @@ func (a *apic) Push() error { if len(cache) == 0 { return nil } - err := a.Send(&cache) - return err + go a.Send(&cache) + return nil case <-ticker.C: if len(cache) > 0 { a.mu.Lock() @@ -160,15 +160,7 @@ func (a *apic) Push() error { cache = make(models.AddSignalsRequest, 0) a.mu.Unlock() log.Infof("Signal push: %d signals to push", len(cacheCopy)) - err := a.Send(&cacheCopy) - if err != nil { - log.Errorf("while sending signal to Central API : %s", err) - log.Debugf("dump: %+v", cacheCopy) - /* - even in case of error, we don't want to return here, or we need to kill everything. - this go-routine is in charge of pushing the signals to LAPI and is emptying the CAPIChan - */ - } + go a.Send(&cacheCopy) } case alerts := <-a.alertToPush: var signals []*models.AddSignalsRequestItem @@ -182,7 +174,7 @@ func (a *apic) Push() error { } } -func (a *apic) Send(cache *models.AddSignalsRequest) error { +func (a *apic) Send(cacheOrig *models.AddSignalsRequest) { /*we do have a problem with this : The apic.Push background routine reads from alertToPush chan. This chan is filled by Controller.CreateAlert @@ -194,10 +186,37 @@ func (a *apic) Send(cache *models.AddSignalsRequest) error { I don't know enough about gin to tell how much of an issue it can be. */ - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, _, err := a.apiClient.Signal.Add(ctx, cache) - return err + var cache []*models.AddSignalsRequestItem = *cacheOrig + var send models.AddSignalsRequest + + bulkSize := 50 + pageStart := 0 + pageEnd := bulkSize + + for { + + if pageEnd >= len(cache) { + send = cache[pageStart:] + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err := a.apiClient.Signal.Add(ctx, &send) + if err != nil { + log.Errorf("Error while sending final chunk to central API : %s", err) + return + } + break + } + send = cache[pageStart:pageEnd] + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err := a.apiClient.Signal.Add(ctx, &send) + if err != nil { + //we log it here as well, because the return value of func might be discarded + log.Errorf("Error while sending chunk to central API : %s", err) + } + pageStart += bulkSize + pageEnd += bulkSize + } } func (a *apic) PullTop() error {