Skip to content

Commit

Permalink
feat: display queued operations
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Nov 29, 2023
1 parent 1b67e2b commit 0c818bb
Show file tree
Hide file tree
Showing 15 changed files with 2,888 additions and 862 deletions.
5 changes: 4 additions & 1 deletion cmd/resticui/resticui.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func main() {
zap.S().Fatalf("Error finding or installing restic: %v", err)
}

orchestrator := orchestrator.NewOrchestrator(resticPath, cfg, oplog)
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, oplog)
if err != nil {
zap.S().Fatalf("Error creating orchestrator: %v", err)
}

// Start orchestration loop. Only exits when ctx is cancelled.
go orchestrator.Run(ctx)
Expand Down
45 changes: 25 additions & 20 deletions gen/go/v1/operations.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ func NewOpLog(databasePath string) (*OpLog, error) {
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
continue
}
if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = "Operation timeout."

if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_SYSTEM_CANCELLED {
// remove pending operations.
o.deleteOperationHelper(tx, op.Id)
continue
} else if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
o.deleteOperationHelper(tx, op.Id)
}

if err := o.addOperationHelper(tx, op); err != nil {
Expand Down
22 changes: 16 additions & 6 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ type Orchestrator struct {
now func() time.Time
}

func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) *Orchestrator {
func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orchestrator, error) {
var o *Orchestrator
o = &Orchestrator{
config: cfg,
OpLog: oplog,
OpLog: oplog,
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: taskQueue{
Expand All @@ -47,22 +46,33 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) *Orch
},
},
}
return o
if err := o.ApplyConfig(cfg); err != nil {
return nil, fmt.Errorf("apply initial config: %w", err)
}
return o, nil
}

func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
o.mu.Lock()
defer o.mu.Unlock()
o.config = cfg

zap.L().Debug("Applying config to orchestrator", zap.Any("config", cfg))
zap.L().Info("Applying config to orchestrator", zap.Any("config", cfg))

// Update the config provided to the repo pool.
if err := o.repoPool.configProvider.Update(cfg); err != nil {
return fmt.Errorf("failed to update repo pool config: %w", err)
}

o.taskQueue.Reset() // reset queued tasks, this may loose any ephemeral operations scheduled by RPC. Tasks in progress are not cancelled.
// reset queued tasks, this may loose any ephemeral operations scheduled by RPC. Tasks in progress are not cancelled.
removedTasks := o.taskQueue.Reset()
for _, t := range removedTasks {
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
zap.L().Error("failed to cancel queued task", zap.String("task", t.task.Name()), zap.Error(err))
} else {
zap.L().Debug("queued task cancelled due to config change", zap.String("task", t.task.Name()))
}
}

// Requeue tasks that are affected by the config change.
for _, plan := range cfg.Plans {
Expand Down
10 changes: 5 additions & 5 deletions internal/orchestrator/scheduledtaskheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ func (t *taskQueue) Push(task scheduledTask) {
}
}

func (t *taskQueue) Reset() {
func (t *taskQueue) Reset() []*scheduledTask {
t.mu.Lock()
defer t.mu.Unlock()

oldTasks := t.heap.tasks
t.heap.tasks = nil
if t.notify != nil {
t.notify <- struct{}{}
}
return oldTasks
}

func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
Expand All @@ -71,10 +73,9 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
}
t.mu.Unlock()
timer := time.NewTimer(first.runAt.Sub(t.curTime()))

t.mu.Lock()
select {
case <-timer.C:
t.mu.Lock()
if t.heap.Len() == 0 {
break
}
Expand All @@ -83,19 +84,18 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
// task is not yet ready to run
break
}

heap.Pop(&t.heap) // remove the task from the heap
t.mu.Unlock()
return first
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
t.mu.Lock()
if !timer.Stop() {
<-timer.C
}
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
t.mu.Unlock()
return nil
}
}
Expand Down
Loading

0 comments on commit 0c818bb

Please sign in to comment.