Skip to content

Commit

Permalink
fix: revert orchestrator changes
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Apr 12, 2024
1 parent ff2bfb9 commit 07cffcb
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 25 deletions.
59 changes: 34 additions & 25 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/queue"
"github.com/garethgeorge/backrest/internal/rotatinglog"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -41,10 +40,14 @@ type Orchestrator struct {
config *v1.Config
OpLog *oplog.OpLog
repoPool *resticRepoPool
taskQueue *queue.TimePriorityQueue[scheduledTask]
taskQueue taskQueue
hookExecutor *hook.HookExecutor
logStore *rotatinglog.RotatingLog
runningTask atomic.Pointer[taskExecutionInfo]

// now for the purpose of testing; used by Run() to get the current time.
now func() time.Time

runningTask atomic.Pointer[taskExecutionInfo]
}

func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) {
Expand All @@ -56,8 +59,10 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
OpLog: oplog,
config: cfg,
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: queue.NewTimePriorityQueue[scheduledTask](),
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: newTaskQueue(func() time.Time {
return o.curTime()
}),
hookExecutor: hook.NewHookExecutor(oplog, logStore),
logStore: logStore,
}
Expand Down Expand Up @@ -99,6 +104,13 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
return o, nil
}

func (o *Orchestrator) curTime() time.Time {
if o.now != nil {
return o.now()
}
return time.Now()
}

func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
o.mu.Lock()
defer o.mu.Unlock()
Expand All @@ -109,13 +121,12 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
return fmt.Errorf("failed to update repo pool config: %w", err)
}

return o.scheduleDefaultTasks(cfg)
return o.ScheduleDefaultTasks(cfg)
}

// rescheduleTasksIfNeeded checks if any tasks need to be rescheduled based on config changes.
func (o *Orchestrator) scheduleDefaultTasks(config *v1.Config) error {
func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error {
zap.L().Info("scheduling default tasks, waiting for task queue reset.")

removedTasks := o.taskQueue.Reset()
for _, t := range removedTasks {
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
Expand Down Expand Up @@ -184,22 +195,28 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta
}

tasks := o.taskQueue.Reset()
remaining := make([]scheduledTask, 0, len(tasks))

for _, t := range tasks {
if t.task.OperationId() == operationId {
if err := t.task.Cancel(status); err != nil {
return fmt.Errorf("cancel task %q: %w", t.task.Name(), err)
}

nextTime := t.task.Next(t.runAt)
if nextTime == nil {
continue
// check if the task has a next after it's current 'runAt' time, if it does then we will schedule the next run.
if nextTime := t.task.Next(t.runAt); nextTime != nil {
remaining = append(remaining, scheduledTask{
task: t.task,
runAt: *nextTime,
})
}

t.runAt = *nextTime
} else {
remaining = append(remaining, *t)
}
o.taskQueue.Enqueue(t.runAt, t.priority, t) // requeue the task.
}

o.taskQueue.Push(remaining...)

return nil
}

Expand All @@ -214,7 +231,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}

t := o.taskQueue.Dequeue(mainCtx)
if t.task == nil {
if t == nil {
continue
}

Expand Down Expand Up @@ -250,12 +267,12 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}

func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) {
nextRun := t.Next(time.Now())
nextRun := t.Next(o.curTime())
if nextRun == nil {
return
}
zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339)))
o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{
o.taskQueue.Push(scheduledTask{
task: t,
runAt: *nextRun,
priority: priority,
Expand Down Expand Up @@ -324,11 +341,3 @@ type taskExecutionInfo struct {
operationId int64
cancel func()
}

type scheduledTask struct {
task Task
runAt time.Time
priority int
callbacks []func(error)
config *v1.Config
}
220 changes: 220 additions & 0 deletions internal/orchestrator/scheduledtaskheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package orchestrator

import (
"container/heap"
"context"
"sync"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
)

var taskQueueDefaultPollInterval = 3 * time.Minute

type taskQueue struct {
dequeueMu sync.Mutex
mu sync.Mutex
heap scheduledTaskHeapByTime
notify chan struct{}
ready scheduledTaskHeapByPriorityThenTime
pollInterval time.Duration

Now func() time.Time
}

func newTaskQueue(now func() time.Time) taskQueue {
return taskQueue{
heap: scheduledTaskHeapByTime{},
ready: scheduledTaskHeapByPriorityThenTime{},
pollInterval: taskQueueDefaultPollInterval,
Now: now,
}
}

func (t *taskQueue) curTime() time.Time {
if t.Now != nil {
return t.Now()
}
return time.Now()
}

func (t *taskQueue) Push(tasks ...scheduledTask) {
t.mu.Lock()
defer t.mu.Unlock()

for _, task := range tasks {
task := task
if task.task == nil {
panic("task cannot be nil")
}
heap.Push(&t.heap, &task)
}

if t.notify != nil {
t.notify <- struct{}{}
}
}

func (t *taskQueue) Reset() []*scheduledTask {
t.mu.Lock()
defer t.mu.Unlock()

oldTasks := t.heap.tasks
oldTasks = append(oldTasks, t.ready.tasks...)
t.heap.tasks = nil
t.ready.tasks = nil

if t.notify != nil {
t.notify <- struct{}{}
}
return oldTasks
}

func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
t.dequeueMu.Lock()
defer t.dequeueMu.Unlock()

t.mu.Lock()
defer t.mu.Unlock()
t.notify = make(chan struct{}, 10)
defer func() {
close(t.notify)
t.notify = nil
}()

for {
first, ok := t.heap.Peek().(*scheduledTask)
if !ok { // no tasks in heap.
if t.ready.Len() > 0 {
return heap.Pop(&t.ready).(*scheduledTask)
}
t.mu.Unlock()
select {
case <-ctx.Done():
t.mu.Lock()
return nil
case <-t.notify:
}
t.mu.Lock()
continue
}

now := t.curTime()

// if there's a task in the ready queue AND the first task isn't ready yet then immediately return the ready task.
ready, ok := t.ready.Peek().(*scheduledTask)
if ok && now.Before(first.runAt) {
heap.Pop(&t.ready)
return ready
}

t.mu.Unlock()
d := first.runAt.Sub(now)
if t.pollInterval > 0 && d > t.pollInterval {
// A poll interval may be set to work around clock changes
// e.g. when a laptop wakes from sleep or the system clock is adjusted.
d = t.pollInterval
}
timer := time.NewTimer(d)

select {
case <-timer.C:
t.mu.Lock()
if t.heap.Len() == 0 {
break
}

for {
first, ok := t.heap.Peek().(*scheduledTask)
if !ok {
break
}
if first.runAt.After(t.curTime()) {
// task is not yet ready to run
break
}
heap.Pop(&t.heap) // remove the task from the heap
heap.Push(&t.ready, first)
}

if t.ready.Len() == 0 {
break
}
return heap.Pop(&t.ready).(*scheduledTask)
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
t.mu.Lock()
if !timer.Stop() {
<-timer.C
}
case <-ctx.Done():
t.mu.Lock()
if !timer.Stop() {
<-timer.C
}
return nil
}
}
}

type scheduledTask struct {
task Task
runAt time.Time
priority int
callbacks []func(error)
config *v1.Config
}

type scheduledTaskHeap struct {
tasks []*scheduledTask
comparator func(i, j *scheduledTask) bool
}

func (h *scheduledTaskHeap) Len() int {
return len(h.tasks)
}

func (h *scheduledTaskHeap) Swap(i, j int) {
h.tasks[i], h.tasks[j] = h.tasks[j], h.tasks[i]
}

func (h *scheduledTaskHeap) Push(x interface{}) {
h.tasks = append(h.tasks, x.(*scheduledTask))
}

func (h *scheduledTaskHeap) Pop() interface{} {
old := h.tasks
n := len(old)
x := old[n-1]
h.tasks = old[0 : n-1]
return x
}

func (h *scheduledTaskHeap) Peek() interface{} {
if len(h.tasks) == 0 {
return nil
}
return h.tasks[0]
}

type scheduledTaskHeapByTime struct {
scheduledTaskHeap
}

var _ heap.Interface = &scheduledTaskHeapByTime{}

func (h *scheduledTaskHeapByTime) Less(i, j int) bool {
return h.tasks[i].runAt.Before(h.tasks[j].runAt)
}

type scheduledTaskHeapByPriorityThenTime struct {
scheduledTaskHeap
}

var _ heap.Interface = &scheduledTaskHeapByPriorityThenTime{}

func (h *scheduledTaskHeapByPriorityThenTime) Less(i, j int) bool {
if h.tasks[i].priority != h.tasks[j].priority {
return h.tasks[i].priority > h.tasks[j].priority
}
return h.tasks[i].runAt.Before(h.tasks[j].runAt)
}

0 comments on commit 07cffcb

Please sign in to comment.