Skip to content

Commit

Permalink
Use exponential backoff with jitter (FerretDB#2419)
Browse files Browse the repository at this point in the history
  • Loading branch information
j0holo authored and fenogentov committed Apr 20, 2023
1 parent 34d1b99 commit 235b9ea
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 27 deletions.
20 changes: 9 additions & 11 deletions cmd/envtool/envtool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er
addr := fmt.Sprintf("127.0.0.1:%d", port)
logger.Infof("Waiting for %s to be up...", addr)

var retry int64
attempts := int64(1)
for ctx.Err() == nil {
conn, err := net.Dial("tcp", addr)
if err == nil {
Expand All @@ -69,8 +69,8 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er

logger.Infof("%s: %s", addr, err)

retry++
ctxutil.SleepWithJitter(ctx, time.Second, retry)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

return fmt.Errorf("failed to connect to %s", addr)
Expand Down Expand Up @@ -99,16 +99,15 @@ func setupAnyPostgres(ctx context.Context, logger *zap.SugaredLogger, uri string

var pgPool *pgdb.Pool

var retry int64
attempts := int64(1)
for ctx.Err() == nil {
if pgPool, err = pgdb.NewPool(ctx, uri, logger.Desugar(), p); err == nil {
break
}

logger.Infof("%s: %s", uri, err)

retry++
ctxutil.SleepWithJitter(ctx, time.Second, retry)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

defer pgPool.Close()
Expand Down Expand Up @@ -168,16 +167,15 @@ func setupAnyTigris(ctx context.Context, logger *zap.SugaredLogger, port uint16)

var db *tigrisdb.TigrisDB

var retry int64
attempts := int64(1)
for ctx.Err() == nil {
if db, err = tigrisdb.New(ctx, cfg, logger.Desugar()); err == nil {
break
}

logger.Infof("%s: %s", cfg.URL, err)

retry++
ctxutil.SleepWithJitter(ctx, time.Second, retry)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

defer db.Driver.Close()
Expand Down
4 changes: 3 additions & 1 deletion integration/commands_administration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {

// MongoDB might be slow to update the status
var status any
var attempts int64 = 1
for i := 0; i < 3; i++ {
var actual bson.D
err := s.Collection.Database().RunCommand(s.Ctx, bson.D{{"serverStatus", 1}}).Decode(&actual)
Expand All @@ -931,7 +932,8 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {
if status == tc.expectedStatus {
break
}
ctxutil.Sleep(s.Ctx, time.Second)
ctxutil.SleepWithJitter(s.Ctx, time.Second, attempts)
attempts++
}

assert.Equal(t, tc.expectedStatus, status)
Expand Down
6 changes: 3 additions & 3 deletions internal/clientconn/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func setupTLSListener(opts *setupTLSListenerOpts) (net.Listener, error) {

// acceptLoop runs listener's connection accepting loop.
func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup, l *Listener, logger *zap.Logger) {
var retry int64
attempts := int64(1)
for {
netConn, err := listener.Accept()
if err != nil {
Expand All @@ -256,8 +256,8 @@ func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup,

logger.Warn("Failed to accept connection", zap.Error(err))
if !errors.Is(err, net.ErrClosed) {
retry++
ctxutil.SleepWithJitter(ctx, time.Second, retry)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/pgdb/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (pgPool *Pool) InTransactionRetry(ctx context.Context, f func(pgx.Tx) error
map[string]any{"err": err, "retry": retry},
)

ctxutil.SleepWithJitter(ctx, retryDelayMax, retry)
ctxutil.SleepWithJitter(ctx, retryDelayMax, int64(attempts))

default:
return lazyerrors.Errorf("non-retriable error: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/tigris/tigrisdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (tdb *TigrisDB) createDatabaseIfNotExists(ctx context.Context, db string) (
zap.String("db", db), zap.Int("retry", retry), zap.Error(err),
)

ctxutil.SleepWithJitter(ctx, retryDelayMax, int64(retry))
ctxutil.SleepWithJitter(ctx, retryDelayMax, int64(i+1))
continue
}

Expand Down
18 changes: 11 additions & 7 deletions internal/util/ctxutil/ctxutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,22 @@ func SleepWithJitter(ctx context.Context, d time.Duration, attempts int64) {
<-sleepCtx.Done()
}

// DurationWithJitter returns an exponential backoff duration based on retry with random jitter.
// The maximum sleep is the cap. The minimum duration is at least 100 milliseconds.
// DurationWithJitter returns an exponential backoff duration
// based on the number of attempts, including some random jitter.
// The maximum sleep is the cap duration.
//
// Math/rand is good enough because we don't need the randomness to be cryptographically secure.
func DurationWithJitter(cap time.Duration, retry int64) time.Duration {
// The minimum duration is at least 100 milliseconds.
//
// Math/rand is good enough because we don't need
// the randomness to be cryptographically secure.
func DurationWithJitter(cap time.Duration, attempts int64) time.Duration {
const base = time.Millisecond * 100

if retry < 1 {
panic("retry must be nonzero positive number")
if attempts < 1 {
attempts = 1
}

maxMilliseconds := float64(base.Milliseconds()) * math.Pow(2, float64(retry))
maxMilliseconds := float64(base.Milliseconds()) * math.Pow(2, float64(attempts))
capMilliseconds := float64(cap.Milliseconds())
lowestValue := int64(math.Min(capMilliseconds, maxMilliseconds))

Expand Down
25 changes: 22 additions & 3 deletions internal/util/ctxutil/ctxutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
package ctxutil

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -72,3 +72,22 @@ func TestDurationWithJitter(t *testing.T) {
}
})
}

// TestDurationWithJitter checks the lower and upper bounds of the duration
// returned by DurationWithJitter for a one-second cap.
func TestDurationWithJitter(t *testing.T) {
t.Parallel()

// The first attempt is asserted to yield at least one millisecond.
t.Run("larger or equal then 1ms", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 1)
assert.GreaterOrEqual(t, sleep, time.Millisecond)
})

// A very large attempt count is asserted to stay within the cap
// passed as the first argument.
t.Run("less or equal then duration input", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 100000)
assert.LessOrEqual(t, sleep, time.Second)
})

// An attempt count below 1 is expected to still return a duration
// no longer than the cap.
t.Run("attempt cannot be less then 1", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 0)
assert.LessOrEqual(t, sleep, time.Second)
})
}

0 comments on commit 235b9ea

Please sign in to comment.