Skip to content

Commit

Permalink
fix: tasks duplicated when config is updated during a running operation
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Mar 30, 2024
1 parent 8cf43f2 commit 035684c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
22 changes: 16 additions & 6 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,17 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
defer o.mu.Unlock()
o.config = cfg

// Update the config provided to the repo pool.
// Update the config provided to the repo pool which is cached and diffed separately.
if err := o.repoPool.configProvider.Update(cfg); err != nil {
return fmt.Errorf("failed to update repo pool config: %w", err)
}

// reset queued tasks, this may loose any ephemeral operations scheduled by RPC. Tasks in progress aren't returned by Reset() so they will not be cancelled.
zap.L().Info("Applying config to orchestrator, waiting for task queue reset.")
return o.ScheduleDefaultTasks(cfg)
}

// rescheduleTasksIfNeeded checks if any tasks need to be rescheduled based on config changes.
func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error {
zap.L().Info("scheduling default tasks, waiting for task queue reset.")
removedTasks := o.taskQueue.Reset()
for _, t := range removedTasks {
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
Expand All @@ -131,13 +135,15 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
zap.L().Debug("queued task cancelled due to config change", zap.String("task", t.task.Name()))
}
}
zap.L().Info("Applied config to orchestrator, task queue reset. Rescheduling planned tasks now.")

zap.L().Info("reset task queue, scheduling new task set.")

// Requeue tasks that are affected by the config change.
o.ScheduleTask(&CollectGarbageTask{
orchestrator: o,
}, TaskPriorityDefault)
for _, plan := range cfg.Plans {

for _, plan := range config.Plans {
if plan.Disabled {
continue
}
Expand Down Expand Up @@ -253,7 +259,10 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
cb(err)
}

o.ScheduleTask(t.task, t.priority)
if t.config == o.config {
// Only reschedule tasks if the config hasn't changed since the task was scheduled.
o.ScheduleTask(t.task, t.priority)
}
}
}

Expand All @@ -268,6 +277,7 @@ func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(erro
runAt: *nextRun,
priority: priority,
callbacks: callbacks,
config: o.config,
})
}

Expand Down
3 changes: 3 additions & 0 deletions internal/orchestrator/scheduledtaskheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"sync"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
)

var taskQueueDefaultPollInterval = 3 * time.Minute
Expand Down Expand Up @@ -159,6 +161,7 @@ type scheduledTask struct {
runAt time.Time
priority int
callbacks []func(error)
config *v1.Config
}

type scheduledTaskHeap struct {
Expand Down

0 comments on commit 035684c

Please sign in to comment.