Skip to content

Commit

Permalink
[prober] Start probing more quickly (cloudprober#382)
Browse files Browse the repository at this point in the history
This will make sure probes start as quickly as possible.
  • Loading branch information
manugarg authored May 12, 2023
1 parent 17be2c4 commit af99d11
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 23 deletions.
62 changes: 43 additions & 19 deletions prober/prober.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2019 The Cloudprober Authors.
// Copyright 2017-2023 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,8 @@ import (
"google.golang.org/grpc/status"
)

var randGenerator = rand.New(rand.NewSource(time.Now().UnixNano()))

// Prober represents a collection of probes where each probe implements the Probe interface.
type Prober struct {
Probes map[string]*probes.ProbeInfo
Expand Down Expand Up @@ -266,36 +268,58 @@ func (pr *Prober) startProbe(ctx context.Context, name string) {
go pr.Probes[name].Start(probeCtx, pr.dataChan)
}

// startProbesWithJitter try to space out probes over time, as much as possible,
// without making it too complicated. We arrange probes into interval buckets -
// all probes with the same interval will be part of the same bucket, and we
// then spread out probes within that interval by introducing a delay of
// interval / len(probes) between probes. We also introduce a random jitter
// between different interval buckets.
func (pr *Prober) startProbesWithJitter(ctx context.Context) {
// Seed random number generator.
rand.Seed(time.Now().UnixNano())
func randomDuration(duration, ceiling time.Duration) time.Duration {
if duration == 0 {
return 0
}
if duration > ceiling {
duration = ceiling
}
return time.Duration(randGenerator.Int63n(duration.Milliseconds())) * time.Millisecond
}

// interProbeWait returns the wait time between probes. It's not beneficial for
// this interval to be too large, so we cap it at 2 seconds.
func interProbeWait(interval time.Duration, numProbes int) time.Duration {
d := interval / time.Duration(numProbes)
if d > 2*time.Second {
return 2 * time.Second
}
return d
}

// startProbesWithJitter try to space out probes over time, as much as
// possible, without making it too complicated.
//
// We arrange probes into interval buckets - all probes with the same interval
// will be part of the same bucket. We spread out probes within an interval,
// and also the overall interval-buckets themselves.
//
// [probe1 <gap> probe2 <gap> probe3 <gap> ...] interval1 (30s)
// <interval-bucket-gap> [probe4 <gap> probe5 ...] interval2 (10s)
// <interval-bucket-gap> [probe6 <gap> probe7 ...] interval3 (1m)
func (pr *Prober) startProbesWithJitter(ctx context.Context) {
// Make interval -> [probe1, probe2, probe3..] map
intervalBuckets := make(map[time.Duration][]*probes.ProbeInfo)
for _, p := range pr.Probes {
intervalBuckets[p.Options.Interval] = append(intervalBuckets[p.Options.Interval], p)
}

iter := 0
for interval, probeInfos := range intervalBuckets {
go func(interval time.Duration, probeInfos []*probes.ProbeInfo) {
// Introduce a random jitter between interval buckets.
randomDelayMsec := rand.Int63n(int64(interval.Seconds() * 1000))
time.Sleep(time.Duration(randomDelayMsec) * time.Millisecond)

interProbeDelay := interval / time.Duration(len(probeInfos))
// Note that we introduce jitter within the goroutine instead of in the
// loop here. This is to make sure this function returns quickly.
go func(interval time.Duration, probeInfos []*probes.ProbeInfo, iter int) {
if iter > 0 {
time.Sleep(randomDuration(interval, 10*time.Second))
}

// Spread out probes evenly with an interval bucket.
for _, p := range probeInfos {
pr.l.Info("Starting probe: ", p.Name)
go pr.startProbe(ctx, p.Name)
time.Sleep(interProbeDelay)
time.Sleep(interProbeWait(interval, len(probeInfos)))
}
}(interval, probeInfos)
}(interval, probeInfos, iter)
iter++
}
}
74 changes: 74 additions & 0 deletions prober/prober_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prober

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestRandomDuration(t *testing.T) {
tests := []struct {
duration time.Duration
ceiling time.Duration
}{
{
duration: 0,
ceiling: 10 * time.Second,
},
{
duration: 5 * time.Second,
ceiling: 10 * time.Second,
},
{
duration: 30 * time.Second,
ceiling: 10 * time.Second,
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%v", tt), func(t *testing.T) {
got := randomDuration(tt.duration, tt.ceiling)
assert.LessOrEqual(t, got, tt.duration)
assert.LessOrEqual(t, got, tt.ceiling)
})
}
}

func TestInterProbeWait(t *testing.T) {
tests := []struct {
interval time.Duration
numProbes int
want time.Duration
}{
{
interval: 2 * time.Second,
numProbes: 16,
want: 125 * time.Millisecond,
},
{
interval: 30 * time.Second,
numProbes: 12,
want: 2 * time.Second,
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s:%d", tt.interval, tt.numProbes), func(t *testing.T) {
assert.Equal(t, tt.want, interProbeWait(tt.interval, tt.numProbes))
})
}
}
17 changes: 13 additions & 4 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan *
gapBetweenTargets := p.gapBetweenTargets()
var startWaitTime time.Duration

iter := 0
// Start probe loop for new targets.
for key, target := range activeTargets {
// This target is already initialized.
Expand All @@ -562,12 +563,20 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan *
probeCtx, cancelF := context.WithCancel(ctx)
p.waitGroup.Add(1)

go func(target endpoint.Endpoint, waitTime time.Duration) {
go func(target endpoint.Endpoint, waitTime time.Duration, iter int) {
defer p.waitGroup.Done()
// Wait for wait time + some jitter before starting this probe loop.
time.Sleep(waitTime + time.Duration(rand.Int63n(gapBetweenTargets.Microseconds()/10))*time.Microsecond)

// To evenly spread out target probes, wait for a random duration
// before starting the target go-routine.
if iter > 0 {
// For random padding using 1/10th of the gap.
jitterUsec := rand.Int63n(gapBetweenTargets.Microseconds() / 10)
time.Sleep(waitTime + time.Duration(jitterUsec)*time.Microsecond)
}

p.startForTarget(probeCtx, target, dataChan)
}(target, startWaitTime)
}(target, startWaitTime, iter)
iter++

startWaitTime += gapBetweenTargets

Expand Down

0 comments on commit af99d11

Please sign in to comment.