Skip to content

Commit

Permalink
Replace ctxutil.Sleep with ctxutil.SleepWithJitter in loops
Browse files Browse the repository at this point in the history
time.Sleep is not used in the code base. context.WithTimeout
is used to stop actions, not to retry them, so adding jitter
to things that do not retry getting resources does not make sense.
  • Loading branch information
ruben authored and j0holo committed Apr 12, 2023
1 parent 9822cf6 commit 5653b38
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 17 deletions.
12 changes: 9 additions & 3 deletions cmd/envtool/envtool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er
addr := fmt.Sprintf("127.0.0.1:%d", port)
logger.Infof("Waiting for %s to be up...", addr)

var attempts int64 = 1
for ctx.Err() == nil {
conn, err := net.Dial("tcp", addr)
if err == nil {
Expand All @@ -67,7 +68,9 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er
}

logger.Infof("%s: %s", addr, err)
ctxutil.Sleep(ctx, time.Second)

ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

return fmt.Errorf("failed to connect to %s", addr)
Expand Down Expand Up @@ -96,13 +99,14 @@ func setupAnyPostgres(ctx context.Context, logger *zap.SugaredLogger, uri string

var pgPool *pgdb.Pool

var attempt int64 = 1
for ctx.Err() == nil {
if pgPool, err = pgdb.NewPool(ctx, uri, logger.Desugar(), p); err == nil {
break
}

logger.Infof("%s: %s", uri, err)
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempt)
}

defer pgPool.Close()
Expand Down Expand Up @@ -162,13 +166,15 @@ func setupAnyTigris(ctx context.Context, logger *zap.SugaredLogger, port uint16)

var db *tigrisdb.TigrisDB

var attempts int64 = 1
for ctx.Err() == nil {
if db, err = tigrisdb.New(ctx, cfg, logger.Desugar()); err == nil {
break
}

logger.Infof("%s: %s", cfg.URL, err)
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

defer db.Driver.Close()
Expand Down
4 changes: 3 additions & 1 deletion integration/commands_administration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {

// MongoDB might be slow to update the status
var status any
var attempts int64 = 1
for i := 0; i < 3; i++ {
var actual bson.D
err := s.Collection.Database().RunCommand(s.Ctx, bson.D{{"serverStatus", 1}}).Decode(&actual)
Expand All @@ -923,7 +924,8 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {
if status == tc.expectedStatus {
break
}
ctxutil.Sleep(s.Ctx, time.Second)
ctxutil.SleepWithJitter(s.Ctx, time.Second, attempts)
attempts++
}

assert.Equal(t, tc.expectedStatus, status)
Expand Down
4 changes: 3 additions & 1 deletion internal/clientconn/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func setupTLSListener(opts *setupTLSListenerOpts) (net.Listener, error) {

// acceptLoop runs listener's connection accepting loop.
func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup, l *Listener, logger *zap.Logger) {
var attempts int64 = 1
for {
netConn, err := listener.Accept()
if err != nil {
Expand All @@ -255,7 +256,8 @@ func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup,

logger.Warn("Failed to accept connection", zap.Error(err))
if !errors.Is(err, net.ErrClosed) {
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/pgdb/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (pgPool *Pool) InTransactionRetry(ctx context.Context, f func(pgx.Tx) error
map[string]any{"err": err, "attempt": attempts, "delay": delay},
)

ctxutil.Sleep(ctx, delay)
ctxutil.SleepWithJitter(ctx, delay, int64(attempts))

default:
return lazyerrors.Errorf("non-retriable error: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/tigris/tigrisdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (tdb *TigrisDB) createDatabaseIfNotExists(ctx context.Context, db string) (
zap.String("db", db), zap.Error(err), zap.Duration("delay", delay),
)

ctxutil.Sleep(ctx, delay)
ctxutil.SleepWithJitter(ctx, delay, int64(i+1))
continue
}

Expand Down
14 changes: 7 additions & 7 deletions internal/util/ctxutil/ctxutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,28 @@ func Sleep(ctx context.Context, d time.Duration) {
}

// SleepWithJitter pauses the current goroutine for a randomized (jittered) duration of at most d, or until ctx is canceled.
func SleepWithJitter(ctx context.Context, d time.Duration, attempt int64) {
sleepCtx, cancel := context.WithTimeout(ctx, durationWithJitter(d, attempt))
func SleepWithJitter(ctx context.Context, d time.Duration, attempts int64) {
sleepCtx, cancel := context.WithTimeout(ctx, DurationWithJitter(d, attempts))
defer cancel()
<-sleepCtx.Done()
}

// durationWithJitter returns a duration
// DurationWithJitter returns a duration
// that includes some random jitter.
// The maximum sleep is the input duration.
//
// The upper bound grows from a 100 millisecond base, doubling with each attempt;
// the returned duration itself is random and may be as small as zero.
//
// math/rand is good enough because we don't need
// the randomness to be cryptographically secure.
func durationWithJitter(cap time.Duration, attempt int64) time.Duration {
func DurationWithJitter(cap time.Duration, attempts int64) time.Duration {
const base = time.Millisecond * 100

if attempt < 1 {
attempt = 1
if attempts < 1 {
attempts = 1
}

maxMilliseconds := float64(base.Milliseconds()) * math.Pow(2, float64(attempt))
maxMilliseconds := float64(base.Milliseconds()) * math.Pow(2, float64(attempts))
capMilliseconds := float64(cap.Milliseconds())
lowestValue := int64(math.Min(capMilliseconds, maxMilliseconds))
return time.Duration(rand.Int63n(lowestValue)) * time.Millisecond
Expand Down
6 changes: 3 additions & 3 deletions internal/util/ctxutil/ctxutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ func TestDurationWithJitter(t *testing.T) {
t.Parallel()

t.Run("larger or equal then 1ms", func(t *testing.T) {
sleep := durationWithJitter(time.Second, 1)
sleep := DurationWithJitter(time.Second, 1)
assert.GreaterOrEqual(t, sleep, time.Millisecond)
})

t.Run("less or equal then duration input", func(t *testing.T) {
sleep := durationWithJitter(time.Second, 100000)
sleep := DurationWithJitter(time.Second, 100000)
assert.LessOrEqual(t, sleep, time.Second)
})

t.Run("attempt cannot be less then 1", func(t *testing.T) {
sleep := durationWithJitter(time.Second, 0)
sleep := DurationWithJitter(time.Second, 0)
assert.LessOrEqual(t, sleep, time.Second)
})
}

0 comments on commit 5653b38

Please sign in to comment.