Skip to content

Commit

Permalink
fix: tasks run late when laptops resume from sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Dec 28, 2023
1 parent 3f3252d commit cb78298
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
8 changes: 3 additions & 5 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc
config: cfg,
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: taskQueue{
Now: func() time.Time {
return o.curTime()
},
},
taskQueue: newTaskQueue(func() time.Time {
return o.curTime()
}),
}

// verify the operation log and mark any incomplete operations as failed.
Expand Down
30 changes: 24 additions & 6 deletions internal/orchestrator/scheduledtaskheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,28 @@ import (
"time"
)

var taskQueueDefaultPollInterval = 15 * time.Minute

type taskQueue struct {
dequeueMu sync.Mutex
mu sync.Mutex
heap scheduledTaskHeapByTime
notify chan struct{}
ready scheduledTaskHeapByPriorityThenTime
dequeueMu sync.Mutex
mu sync.Mutex
heap scheduledTaskHeapByTime
notify chan struct{}
ready scheduledTaskHeapByPriorityThenTime
pollInterval time.Duration

Now func() time.Time
}

func newTaskQueue(now func() time.Time) taskQueue {
return taskQueue{
heap: scheduledTaskHeapByTime{},
ready: scheduledTaskHeapByPriorityThenTime{},
pollInterval: taskQueueDefaultPollInterval,
Now: now,
}
}

func (t *taskQueue) curTime() time.Time {
if t.Now != nil {
return t.Now()
Expand Down Expand Up @@ -94,7 +106,13 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
}

t.mu.Unlock()
timer := time.NewTimer(first.runAt.Sub(now))
d := first.runAt.Sub(now)
if t.pollInterval > 0 && d > t.pollInterval {
// A poll interval may be set to work around clock changes
// e.g. when a laptop wakes from sleep or the system clock is adjusted.
d = t.pollInterval
}
timer := time.NewTimer(d)

select {
case <-timer.C:
Expand Down

0 comments on commit cb78298

Please sign in to comment.