// randGenerator is the package's private pseudo-random number generator,
// seeded once from the wall clock when the package is initialized.
//
// NOTE(review): a *rand.Rand created with rand.New is documented as not safe
// for concurrent use, and startProbesWithJitter calls randomDuration from one
// goroutine per interval bucket. Confirm whether access must be serialized.
var randGenerator = rand.New(rand.NewSource(time.Now().UnixNano()))

// randomDuration returns a pseudo-random duration in [0, limit), where limit
// is the smaller of duration and ceiling. The result is always a whole number
// of milliseconds.
//
// It returns 0 (no jitter) when limit is shorter than one millisecond, which
// includes zero and negative values of either argument.
func randomDuration(duration, ceiling time.Duration) time.Duration {
	if duration > ceiling {
		duration = ceiling
	}

	// Int63n panics when its argument is <= 0. Milliseconds() truncates toward
	// zero, so any limit below 1ms (or a non-positive one) must be handled
	// here rather than passed through.
	limitMsec := duration.Milliseconds()
	if limitMsec <= 0 {
		return 0
	}

	return time.Duration(randGenerator.Int63n(limitMsec)) * time.Millisecond
}
// interProbeWait returns how long to wait between starting consecutive probes
// that share the same interval: interval divided evenly among numProbes,
// capped at 2 seconds because a larger gap is not beneficial.
//
// It returns 0 when numProbes is not positive, since there is nothing to
// space out and the division would otherwise panic.
func interProbeWait(interval time.Duration, numProbes int) time.Duration {
	const maxWait = 2 * time.Second

	if numProbes <= 0 {
		return 0
	}

	if wait := interval / time.Duration(numProbes); wait < maxWait {
		return wait
	}
	return maxWait
}
for _, p := range probeInfos { pr.l.Info("Starting probe: ", p.Name) go pr.startProbe(ctx, p.Name) - time.Sleep(interProbeDelay) + time.Sleep(interProbeWait(interval, len(probeInfos))) } - }(interval, probeInfos) + }(interval, probeInfos, iter) + iter++ } } diff --git a/prober/prober_test.go b/prober/prober_test.go new file mode 100644 index 00000000000..f5921852f3b --- /dev/null +++ b/prober/prober_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Cloudprober Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package prober + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRandomDuration(t *testing.T) { + tests := []struct { + duration time.Duration + ceiling time.Duration + }{ + { + duration: 0, + ceiling: 10 * time.Second, + }, + { + duration: 5 * time.Second, + ceiling: 10 * time.Second, + }, + { + duration: 30 * time.Second, + ceiling: 10 * time.Second, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%v", tt), func(t *testing.T) { + got := randomDuration(tt.duration, tt.ceiling) + assert.LessOrEqual(t, got, tt.duration) + assert.LessOrEqual(t, got, tt.ceiling) + }) + } +} + +func TestInterProbeWait(t *testing.T) { + tests := []struct { + interval time.Duration + numProbes int + want time.Duration + }{ + { + interval: 2 * time.Second, + numProbes: 16, + want: 125 * time.Millisecond, + }, + { + interval: 30 * time.Second, + numProbes: 12, + want: 2 * time.Second, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%s:%d", tt.interval, tt.numProbes), func(t *testing.T) { + assert.Equal(t, tt.want, interProbeWait(tt.interval, tt.numProbes)) + }) + } +} diff --git a/probes/http/http.go b/probes/http/http.go index cd732878bc6..4af49cd4575 100644 --- a/probes/http/http.go +++ b/probes/http/http.go @@ -551,6 +551,7 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan * gapBetweenTargets := p.gapBetweenTargets() var startWaitTime time.Duration + iter := 0 // Start probe loop for new targets. for key, target := range activeTargets { // This target is already initialized. @@ -562,12 +563,20 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan * probeCtx, cancelF := context.WithCancel(ctx) p.waitGroup.Add(1) - go func(target endpoint.Endpoint, waitTime time.Duration) { + go func(target endpoint.Endpoint, waitTime time.Duration, iter int) { defer p.waitGroup.Done() - // Wait for wait time + some jitter before starting this probe loop. 
- time.Sleep(waitTime + time.Duration(rand.Int63n(gapBetweenTargets.Microseconds()/10))*time.Microsecond) + + // To evenly spread out target probes, wait for a random duration + // before starting the target go-routine. + if iter > 0 { + // For random padding using 1/10th of the gap. + jitterUsec := rand.Int63n(gapBetweenTargets.Microseconds() / 10) + time.Sleep(waitTime + time.Duration(jitterUsec)*time.Microsecond) + } + p.startForTarget(probeCtx, target, dataChan) - }(target, startWaitTime) + }(target, startWaitTime, iter) + iter++ startWaitTime += gapBetweenTargets