Skip to content

Commit

Permalink
chore: backend support for notifying operation status changes (gareth…
Browse files Browse the repository at this point in the history
…george#29)

Co-authored-by: Gareth <garethgeorge97@gmail.com>
  • Loading branch information
aschoettler and garethgeorge authored Dec 8, 2023
1 parent 8d40576 commit 019a0c0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
14 changes: 7 additions & 7 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,21 @@ func (s *Server) ListSnapshotFiles(ctx context.Context, query *v1.ListSnapshotFi
func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOperationEventsServer) error {
errorChan := make(chan error)
defer close(errorChan)
callback := func(eventType oplog.EventType, op *v1.Operation) {
callback := func(oldOp *v1.Operation, newOp *v1.Operation) {
var eventTypeMapped v1.OperationEventType
switch eventType {
case oplog.EventTypeOpCreated:
eventType := oplog.EventTypeUnknown
if oldOp == nil && newOp != nil {
eventTypeMapped = v1.OperationEventType_EVENT_CREATED
case oplog.EventTypeOpUpdated:
} else if oldOp != nil && newOp != nil {
eventTypeMapped = v1.OperationEventType_EVENT_UPDATED
default:
} else {
zap.L().Error("Unknown event type", zap.Int("eventType", int(eventType)))
eventTypeMapped = v1.OperationEventType_EVENT_UNKNOWN
return
}

event := &v1.OperationEvent{
Type: eventTypeMapped,
Operation: op,
Operation: newOp,
}

go func() {
Expand Down
38 changes: 20 additions & 18 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type OpLog struct {
db *bolt.DB

subscribersMu sync.RWMutex
subscribers []*func(EventType, *v1.Operation)
subscribers []*func(*v1.Operation, *v1.Operation)
nextId atomic.Int64
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func (o *OpLog) Add(op *v1.Operation) error {
return nil
})
if err == nil {
o.notifyHelper(EventTypeOpCreated, op)
o.notifyHelper(nil, op)
}
return err
}
Expand All @@ -157,7 +157,7 @@ func (o *OpLog) BulkAdd(ops []*v1.Operation) error {
})
if err == nil {
for _, op := range ops {
o.notifyHelper(EventTypeOpCreated, op)
o.notifyHelper(nil, op)
}
}
return err
Expand All @@ -167,9 +167,11 @@ func (o *OpLog) Update(op *v1.Operation) error {
if op.Id == 0 {
return errors.New("operation does not have an ID, OpLog.Update expects operation with an ID")
}

var oldOp *v1.Operation
err := o.db.Update(func(tx *bolt.Tx) error {
if err := o.deleteOperationHelper(tx, op.Id); err != nil {
var err error
oldOp, err = o.deleteOperationHelper(tx, op.Id)
if err != nil {
return fmt.Errorf("deleting existing value prior to update: %w", err)
}
if err := o.addOperationHelper(tx, op); err != nil {
Expand All @@ -178,7 +180,7 @@ func (o *OpLog) Update(op *v1.Operation) error {
return nil
})
if err == nil {
o.notifyHelper(EventTypeOpUpdated, op)
o.notifyHelper(oldOp, op)
}
return err
}
Expand All @@ -189,7 +191,7 @@ func (o *OpLog) Delete(id int64) error {
if val == nil {
return ErrNotExist
}
if err := o.deleteOperationHelper(tx, id); err != nil {
if _, err := o.deleteOperationHelper(tx, id); err != nil {
return fmt.Errorf("deleting operation %v: %w", id, err)
}

Expand All @@ -204,11 +206,11 @@ func (o *OpLog) Delete(id int64) error {
return err
}

func (o *OpLog) notifyHelper(eventType EventType, op *v1.Operation) {
func (o *OpLog) notifyHelper(old *v1.Operation, new *v1.Operation) {
o.subscribersMu.RLock()
defer o.subscribersMu.RUnlock()
for _, sub := range o.subscribers {
(*sub)(eventType, op)
(*sub)(old, new)
}
}

Expand Down Expand Up @@ -280,37 +282,37 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
return nil
}

func (o *OpLog) deleteOperationHelper(tx *bolt.Tx, id int64) error {
func (o *OpLog) deleteOperationHelper(tx *bolt.Tx, id int64) (*v1.Operation, error) {
b := tx.Bucket(OpLogBucket)

prevValue, err := o.getOperationHelper(b, id)
if err != nil {
return fmt.Errorf("getting operation %v: %w", id, err)
return nil, fmt.Errorf("getting operation %v: %w", id, err)
}

if prevValue.PlanId != "" {
if err := indexutil.IndexRemoveByteValue(tx.Bucket(PlanIndexBucket), []byte(prevValue.PlanId), id); err != nil {
return fmt.Errorf("removing operation %v from plan index: %w", id, err)
return nil, fmt.Errorf("removing operation %v from plan index: %w", id, err)
}
}

if prevValue.RepoId != "" {
if err := indexutil.IndexRemoveByteValue(tx.Bucket(RepoIndexBucket), []byte(prevValue.RepoId), id); err != nil {
return fmt.Errorf("removing operation %v from repo index: %w", id, err)
return nil, fmt.Errorf("removing operation %v from repo index: %w", id, err)
}
}

if prevValue.SnapshotId != "" {
if err := indexutil.IndexRemoveByteValue(tx.Bucket(SnapshotIndexBucket), []byte(prevValue.SnapshotId), id); err != nil {
return fmt.Errorf("removing operation %v from snapshot index: %w", id, err)
return nil, fmt.Errorf("removing operation %v from snapshot index: %w", id, err)
}
}

if err := b.Delete(serializationutil.Itob(id)); err != nil {
return fmt.Errorf("deleting operation %v from bucket: %w", id, err)
return nil, fmt.Errorf("deleting operation %v from bucket: %w", id, err)
}

return nil
return prevValue, nil
}

func (o *OpLog) Get(id int64) (*v1.Operation, error) {
Expand Down Expand Up @@ -382,13 +384,13 @@ func (o *OpLog) ForAll(do func(op *v1.Operation) error) error {
return nil
}

func (o *OpLog) Subscribe(callback *func(EventType, *v1.Operation)) {
func (o *OpLog) Subscribe(callback *func(*v1.Operation, *v1.Operation)) {
o.subscribersMu.Lock()
defer o.subscribersMu.Unlock()
o.subscribers = append(o.subscribers, callback)
}

func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) {
func (o *OpLog) Unsubscribe(callback *func(*v1.Operation, *v1.Operation)) {
o.subscribersMu.Lock()
defer o.subscribersMu.Unlock()
subs := o.subscribers
Expand Down
1 change: 0 additions & 1 deletion internal/oplog/oplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestAddOperation(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if err := log.Add(tc.op); (err != nil) != tc.wantErr {
t.Errorf("Add() error = %v, wantErr %v", err, tc.wantErr)
}
Expand Down

0 comments on commit 019a0c0

Please sign in to comment.