Skip to content

Commit

Permalink
fix: make instance ID required field
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Jul 2, 2024
1 parent 90e0656 commit 7c8ded2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 59 deletions.
1 change: 1 addition & 0 deletions internal/oplog/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var migrations = []func(*OpLog, *bbolt.Tx) error{
migration001FlowID,
migration002InstanceID,
migration003ResetLastValidated,
migration002InstanceID, // re-run migration002InstanceID to fix improperly set instance IDs
}

var CurrentVersion = int64(len(migrations))
Expand Down
86 changes: 32 additions & 54 deletions internal/oplog/oplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestAddOperation(t *testing.T) {
UnixTimeStartMs: 1234,
RepoId: "testrepo",
PlanId: "testplan",
InstanceId: "testinstance",
Op: &v1.Operation_OperationBackup{},
},
wantErr: false,
Expand All @@ -60,6 +61,7 @@ func TestAddOperation(t *testing.T) {
UnixTimeStartMs: 1234,
RepoId: "testrepo",
PlanId: "testplan",
InstanceId: "testinstance",
Op: &v1.Operation_OperationIndexSnapshot{
OperationIndexSnapshot: &v1.OperationIndexSnapshot{
Snapshot: &v1.ResticSnapshot{
Expand All @@ -76,6 +78,7 @@ func TestAddOperation(t *testing.T) {
Id: 1,
RepoId: "testrepo",
PlanId: "testplan",
InstanceId: "testinstance",
UnixTimeStartMs: 1234,
Op: &v1.Operation_OperationBackup{},
},
Expand All @@ -99,6 +102,15 @@ func TestAddOperation(t *testing.T) {
},
wantErr: true,
},
{
name: "operation with instance only",
op: &v1.Operation{
UnixTimeStartMs: 1234,
InstanceId: "testinstance",
Op: &v1.Operation_OperationBackup{},
},
wantErr: true,
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -129,21 +141,25 @@ func TestListOperation(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
InstanceId: "instance1",
DisplayMessage: "op1",
Op: &v1.Operation_OperationBackup{},
},
{
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo2",
InstanceId: "instance2",
DisplayMessage: "op2",
Op: &v1.Operation_OperationBackup{},
},
{
UnixTimeStartMs: 1234,
PlanId: "plan2",
RepoId: "repo2",
InstanceId: "instance3",
DisplayMessage: "op3",
FlowId: 943,
Op: &v1.Operation_OperationBackup{},
},
}
Expand All @@ -156,35 +172,36 @@ func TestListOperation(t *testing.T) {

tests := []struct {
name string
byPlan bool
byRepo bool
id string
query Query
expected []string
}{
{
name: "list plan1",
byPlan: true,
id: "plan1",
query: Query{PlanId: "plan1"},
expected: []string{"op1", "op2"},
},
{
name: "list plan2",
byPlan: true,
id: "plan2",
query: Query{PlanId: "plan2"},
expected: []string{"op3"},
},
{
name: "list repo1",
byRepo: true,
id: "repo1",
query: Query{RepoId: "repo1"},
expected: []string{"op1"},
},
{
name: "list repo2",
byRepo: true,
id: "repo2",
query: Query{RepoId: "repo2"},
expected: []string{"op2", "op3"},
},
{
name: "list flow 943",
query: Query{FlowId: 943},
expected: []string{
"op3",
},
},
}

for _, tc := range tests {
Expand All @@ -197,13 +214,7 @@ func TestListOperation(t *testing.T) {
ops = append(ops, op)
return nil
}
if tc.byPlan {
err = log.ForEach(Query{PlanId: tc.id}, indexutil.CollectAll(), collect)
} else if tc.byRepo {
err = log.ForEach(Query{RepoId: tc.id}, indexutil.CollectAll(), collect)
} else {
t.Fatalf("must specify byPlan or byRepo")
}
err = log.ForEach(tc.query, indexutil.CollectAll(), collect)
if err != nil {
t.Fatalf("error listing operations: %s", err)
}
Expand All @@ -215,42 +226,6 @@ func TestListOperation(t *testing.T) {
}
}

func TestListByFlowId(t *testing.T) {
t.Parallel()

log, err := NewOpLog(t.TempDir() + "/test.boltdb")
if err != nil {
t.Fatalf("error creating oplog: %s", err)
}
t.Cleanup(func() { log.Close() })

op := &v1.Operation{
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
FlowId: 1,
Op: &v1.Operation_OperationBackup{},
}

if err := log.Add(op); err != nil {
t.Fatalf("error adding operation: %s", err)
}

var ops []*v1.Operation
if err := log.ForEach(Query{FlowId: 1}, indexutil.CollectAll(), func(op *v1.Operation) error {
ops = append(ops, op)
return nil
}); err != nil {
t.Fatalf("error listing operations: %s", err)
}
if len(ops) != 1 {
t.Fatalf("want 1 operation, got %d", len(ops))
}
if ops[0].Id != op.Id {
t.Errorf("want operation ID %d, got %d", op.Id, ops[0].Id)
}
}

func TestBigIO(t *testing.T) {
t.Parallel()

Expand All @@ -267,6 +242,7 @@ func TestBigIO(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
InstanceId: "instance1",
Op: &v1.Operation_OperationBackup{},
}); err != nil {
t.Fatalf("error adding operation: %s", err)
Expand All @@ -289,6 +265,7 @@ func TestIndexSnapshot(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
InstanceId: "instance1",
SnapshotId: snapshotId,
Op: &v1.Operation_OperationIndexSnapshot{},
}
Expand Down Expand Up @@ -324,6 +301,7 @@ func TestUpdateOperation(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "oldplan",
RepoId: "oldrepo",
InstanceId: "instance1",
SnapshotId: snapshotId,
}
if err := log.Add(op); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/orchestrator/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
var NeverScheduledTask = ScheduledTask{}

const (
PlanForUnassociatedOperations = "_unassociated_"
PlanForSystemTasks = "_system_" // plan for system tasks e.g. garbage collection, prune, stats, etc.
PlanForUnassociatedOperations = "_unassociated_"
InstanceIDForUnassociatedOperations = "_unassociated_"
PlanForSystemTasks = "_system_" // plan for system tasks e.g. garbage collection, prune, stats, etc.

TaskPriorityStats = 0
TaskPriorityDefault = 1 << 1 // default priority
Expand Down
6 changes: 5 additions & 1 deletion internal/orchestrator/tasks/taskindexsnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func planForSnapshot(snapshot *v1.ResticSnapshot) string {
}

func instanceIDForSnapshot(snapshot *v1.ResticSnapshot) string {
return repo.InstanceIDFromTags(snapshot.Tags)
id := repo.InstanceIDFromTags(snapshot.Tags)
if id != "" {
return id
}
return InstanceIDForUnassociatedOperations
}

// tryMigrate checks if the snapshots use the latest backrest tag set and migrates them if necessary.
Expand Down
3 changes: 1 addition & 2 deletions internal/protoutil/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/pkg/restic"
"go.uber.org/zap"
)

// ValidateOperation verifies critical properties of the operation proto.
Expand All @@ -24,7 +23,7 @@ func ValidateOperation(op *v1.Operation) error {
return errors.New("operation.plan_id is required")
}
if op.InstanceId == "" {
zap.L().Warn("operation.instance_id should typically be set")
return errors.New("operation.instance_id is required")
}
if op.SnapshotId != "" {
if err := restic.ValidateSnapshotId(op.SnapshotId); err != nil {
Expand Down

0 comments on commit 7c8ded2

Please sign in to comment.