Skip to content

Commit

Permalink
feat: operations IDs are ordered by operation timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Nov 23, 2023
1 parent 338b6f2 commit a1ed6f9
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 29 deletions.
19 changes: 11 additions & 8 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func NewOpLog(databasePath string) (*OpLog, error) {
return nil, fmt.Errorf("error opening database: %s", err)
}

o := &OpLog{db: db}
o := &OpLog{
db: db,
}
o.nextId.Store(1)

if err := db.Update(func(tx *bolt.Tx) error {
// Create the buckets if they don't exist
Expand Down Expand Up @@ -194,15 +197,15 @@ func (o *OpLog) getOperationHelper(b *bolt.Bucket, id int64) (*v1.Operation, err

func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
b := tx.Bucket(OpLogBucket)

if op.Id == 0 {
// Create a unique ID sorted based on the start time in milliseconds and
// a counter to ensure uniqueness in the case of multiple operations
// starting at the same time.
op.Id = op.UnixTimeStartMs<<20 | (o.nextId.Add(1) & (1<<20 - 1))
if op.Id < 0 {
return fmt.Errorf("overflow in operation ID generation")
seq, err := b.NextSequence()
if err != nil {
return fmt.Errorf("error getting next sequence: %w", err)
}
if op.UnixTimeStartMs == 0 {
return fmt.Errorf("operation must have a start time")
}
op.Id = op.UnixTimeStartMs<<20 | int64(seq&(1<<20-1))
}

op.SnapshotId = NormalizeSnapshotId(op.SnapshotId)
Expand Down
34 changes: 25 additions & 9 deletions internal/orchestrator/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ func (t *ScheduledBackupTask) Name() string {
}

func (t *ScheduledBackupTask) Next(now time.Time) *time.Time {
if ops, err := t.orchestrator.OpLog.GetByPlan(t.plan.Id, indexutil.CollectLastN(10)); err == nil {
var lastBackupOp *v1.Operation
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationBackup); ok {
lastBackupOp = op
}
}

if lastBackupOp != nil {
now = time.Unix(0, lastBackupOp.UnixTimeEndMs*int64(time.Millisecond))
}
} else {
zap.S().Errorf("error getting last operation for plan %q when computing backup schedule: %v", t.plan.Id, err)
}

next := t.schedule.Next(now)
return &next
}
Expand Down Expand Up @@ -174,6 +189,7 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
UnixTimeStartMs: snapshotProto.UnixTimeMs,
UnixTimeEndMs: snapshotProto.UnixTimeMs,
Status: v1.OperationStatus_STATUS_SUCCESS,
SnapshotId: snapshotProto.Id,
Op: &v1.Operation_OperationIndexSnapshot{
OperationIndexSnapshot: &v1.OperationIndexSnapshot{
Snapshot: snapshotProto,
Expand All @@ -196,15 +212,6 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
return err
}

func containsSnapshotOperation(ops []*v1.Operation) bool {
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
return true
}
}
return false
}

// WithOperation is a utility that creates an operation to track the function's execution.
// timestamps are automatically added and the status is automatically updated if an error occurs.
func WithOperation(oplog *oplog.OpLog, op *v1.Operation, do func() error) error {
Expand Down Expand Up @@ -233,3 +240,12 @@ func curTimeMillis() int64 {
t := time.Now()
return t.Unix()*1000 + int64(t.Nanosecond()/1000000)
}

func containsSnapshotOperation(ops []*v1.Operation) bool {
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
return true
}
}
return false
}
15 changes: 10 additions & 5 deletions proto/v1/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ message OperationList {

message Operation {
int64 id = 1;
string repo_id = 2; // repo id if associated with a repo (always true)
string plan_id = 3; // plan id if associated with a plan (always true)
string snapshot_id = 8; // snapshot id if associated with a snapshot.
// repo id if associated with a repo (always true)
string repo_id = 2;
// plan id if associated with a plan (always true)
string plan_id = 3;
// snapshot id if associated with a snapshot.
string snapshot_id = 8;
OperationStatus status = 4;
int64 unix_time_start_ms = 5;
// unix time in milliseconds of the operation's creation (ID is derived from this)
int64 unix_time_start_ms = 5;
int64 unix_time_end_ms = 6;
string display_message = 7; // human readable context message (if any)
// human readable context message, typically an error message.
string display_message = 7;

oneof op {
OperationBackup operation_backup = 100;
Expand Down
14 changes: 7 additions & 7 deletions webui/src/components/OperationList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ export const OperationRow = ({
color = "blue";
}

let opType = "Message";
if (operation.operationBackup) {
opType = "Backup";
} else if (operation.operationIndexSnapshot) {
opType = "Snapshot";
}

if (
operation.displayMessage &&
operation.status === OperationStatus.STATUS_ERROR
) {
let opType = "Message";
if (operation.operationBackup) {
opType = "Backup";
} else if (operation.operationIndexSnapshot) {
opType = "Snapshot";
}

return (
<List.Item>
<List.Item.Meta
Expand Down

0 comments on commit a1ed6f9

Please sign in to comment.