Skip to content

Commit

Permalink
fix: spawn goroutine to update oplog with progress during backup/restore
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Apr 8, 2024
1 parent ffad2b0 commit eab1c1b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
16 changes: 11 additions & 5 deletions internal/orchestrator/taskbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"sync"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
Expand Down Expand Up @@ -113,10 +114,11 @@ func backupHelper(ctx context.Context, t Task, orchestrator *Orchestrator, plan
Task: t.Name(),
})

var sendWg sync.WaitGroup
lastSent := time.Now() // debounce progress updates, these can endup being very frequent.
var lastFiles []string
summary, err := repo.Backup(ctx, plan, func(entry *restic.BackupProgressEntry) {

sendWg.Wait()
if entry.MessageType == "status" {
// prevents flickering output when a status entry omits the CurrentFiles property. Largely cosmetic.
if len(entry.CurrentFiles) == 0 {
Expand Down Expand Up @@ -144,14 +146,18 @@ func backupHelper(ctx context.Context, t Task, orchestrator *Orchestrator, plan
zap.S().Warnf("unexpected message type %q in backup progress entry", entry.MessageType)
}

if time.Since(lastSent) < 500*time.Millisecond {
if time.Since(lastSent) < 1*time.Second {
return
}
lastSent = time.Now()

if err := orchestrator.OpLog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
}
sendWg.Add(1)
go func() {
if err := orchestrator.OpLog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
}
sendWg.Done()
}()
})

vars := hook.HookVars{
Expand Down
17 changes: 13 additions & 4 deletions internal/orchestrator/taskrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
Expand Down Expand Up @@ -81,18 +82,26 @@ func (t *RestoreTask) Run(ctx context.Context) error {
return fmt.Errorf("couldn't get repo %q: %w", t.restoreOpts.RepoId, err)
}

var sendWg sync.WaitGroup
lastSent := time.Now() // debounce progress updates, these can endup being very frequent.
summary, err := repo.Restore(ctx, t.restoreOpts.SnapshotId, t.restoreOpts.Path, t.restoreOpts.Target, func(entry *v1.RestoreProgressEntry) {
if time.Since(lastSent) < 250*time.Millisecond {
sendWg.Wait()
if time.Since(lastSent) < 1*time.Second {
return
}
lastSent = time.Now()

zap.S().Infof("restore progress: %v", entry)

forgetOp.OperationRestore.Status = entry
if err := t.orch.OpLog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for restore: %v", err)
}

sendWg.Add(1)
go func() {
if err := t.orch.OpLog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for restore: %v", err)
}
sendWg.Done()
}()
})
if err != nil {
return fmt.Errorf("restore failed: %w", err)
Expand Down

0 comments on commit eab1c1b

Please sign in to comment.