Skip to content

Commit

Permalink
Add SleepWithJitter to reduce contention for resources in loops
Browse files Browse the repository at this point in the history
  • Loading branch information
j0holo committed Apr 13, 2023
1 parent 669baef commit c8a27b6
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 8 deletions.
13 changes: 10 additions & 3 deletions cmd/envtool/envtool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er
addr := fmt.Sprintf("127.0.0.1:%d", port)
logger.Infof("Waiting for %s to be up...", addr)

var attempts = int64(1)
for ctx.Err() == nil {
conn, err := net.Dial("tcp", addr)
if err == nil {
Expand All @@ -67,7 +68,9 @@ func waitForPort(ctx context.Context, logger *zap.SugaredLogger, port uint16) er
}

logger.Infof("%s: %s", addr, err)
ctxutil.Sleep(ctx, time.Second)

ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

return fmt.Errorf("failed to connect to %s", addr)
Expand Down Expand Up @@ -96,13 +99,15 @@ func setupAnyPostgres(ctx context.Context, logger *zap.SugaredLogger, uri string

var pgPool *pgdb.Pool

var attempts = int64(1)
for ctx.Err() == nil {
if pgPool, err = pgdb.NewPool(ctx, uri, logger.Desugar(), p); err == nil {
break
}

logger.Infof("%s: %s", uri, err)
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

defer pgPool.Close()
Expand Down Expand Up @@ -162,13 +167,15 @@ func setupAnyTigris(ctx context.Context, logger *zap.SugaredLogger, port uint16)

var db *tigrisdb.TigrisDB

var attempts = int64(1)
for ctx.Err() == nil {
if db, err = tigrisdb.New(ctx, cfg, logger.Desugar()); err == nil {
break
}

logger.Infof("%s: %s", cfg.URL, err)
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}

defer db.Driver.Close()
Expand Down
4 changes: 3 additions & 1 deletion integration/commands_administration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {

// MongoDB might be slow to update the status
var status any
var attempts int64 = 1
for i := 0; i < 3; i++ {
var actual bson.D
err := s.Collection.Database().RunCommand(s.Ctx, bson.D{{"serverStatus", 1}}).Decode(&actual)
Expand All @@ -923,7 +924,8 @@ func TestCommandsAdministrationServerStatusFreeMonitoring(t *testing.T) {
if status == tc.expectedStatus {
break
}
ctxutil.Sleep(s.Ctx, time.Second)
ctxutil.SleepWithJitter(s.Ctx, time.Second, attempts)
attempts++
}

assert.Equal(t, tc.expectedStatus, status)
Expand Down
4 changes: 3 additions & 1 deletion internal/clientconn/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func setupTLSListener(opts *setupTLSListenerOpts) (net.Listener, error) {

// acceptLoop runs listener's connection accepting loop.
func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup, l *Listener, logger *zap.Logger) {
var attempts = int64(1)
for {
netConn, err := listener.Accept()
if err != nil {
Expand All @@ -255,7 +256,8 @@ func acceptLoop(ctx context.Context, listener net.Listener, wg *sync.WaitGroup,

logger.Warn("Failed to accept connection", zap.Error(err))
if !errors.Is(err, net.ErrClosed) {
ctxutil.Sleep(ctx, time.Second)
ctxutil.SleepWithJitter(ctx, time.Second, attempts)
attempts++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/pg/pgdb/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (pgPool *Pool) InTransactionRetry(ctx context.Context, f func(pgx.Tx) error
map[string]any{"err": err, "attempt": attempts, "delay": delay},
)

ctxutil.Sleep(ctx, delay)
ctxutil.SleepWithJitter(ctx, retryDelayMax, int64(attempts))

default:
return lazyerrors.Errorf("non-retriable error: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/tigris/tigrisdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (tdb *TigrisDB) createDatabaseIfNotExists(ctx context.Context, db string) (
zap.String("db", db), zap.Error(err), zap.Duration("delay", delay),
)

ctxutil.Sleep(ctx, delay)
ctxutil.SleepWithJitter(ctx, retryDelayMax, int64(i+1))
continue
}

Expand Down
31 changes: 31 additions & 0 deletions internal/util/ctxutil/ctxutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package ctxutil

import (
"context"
"math"
"math/rand"
"time"
)

Expand Down Expand Up @@ -51,3 +53,32 @@ func Sleep(ctx context.Context, d time.Duration) {
defer cancel()
<-sleepCtx.Done()
}

// SleepWithJitter pauses the current goroutine until d + jitter has passed or ctx is canceled.
func SleepWithJitter(ctx context.Context, d time.Duration, attempts int64) {
sleepCtx, cancel := context.WithTimeout(ctx, DurationWithJitter(d, attempts))
defer cancel()
<-sleepCtx.Done()
}

// DurationWithJitter returns a duration
// based including some random jitter.
// The maximum sleep is the input duration.
//
// The minimum duration is at least 100 milliseconds.
//
// Math/rand is good enough because we don't need
// the randomness to be cryptographically secure.
func DurationWithJitter(cap time.Duration, attempts int64) time.Duration {
const base = time.Millisecond * 100

if attempts < 1 {
attempts = 1
}

maxMilliseconds := float64(base.Milliseconds()) * math.Pow(2, float64(attempts))
capMilliseconds := float64(cap.Milliseconds())
lowestValue := int64(math.Min(capMilliseconds, maxMilliseconds))

return time.Duration(rand.Int63n(lowestValue)) * time.Millisecond
}
26 changes: 25 additions & 1 deletion internal/util/ctxutil/ctxutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,32 @@

package ctxutil

import "testing"
import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestDummy(t *testing.T) {
// we need at least one test per package to correctly calculate coverage
}

func TestDurationWithJitter(t *testing.T) {
t.Parallel()

t.Run("larger or equal then 1ms", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 1)
assert.GreaterOrEqual(t, sleep, time.Millisecond)
})

t.Run("less or equal then duration input", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 100000)
assert.LessOrEqual(t, sleep, time.Second)
})

t.Run("attempt cannot be less then 1", func(t *testing.T) {
sleep := DurationWithJitter(time.Second, 0)
assert.LessOrEqual(t, sleep, time.Second)
})
}

0 comments on commit c8a27b6

Please sign in to comment.