Skip to content

Commit

Permalink
feat: present list of operations on plan view
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Nov 16, 2023
1 parent 6a46116 commit 6491dbe
Show file tree
Hide file tree
Showing 14 changed files with 474 additions and 121 deletions.
10 changes: 6 additions & 4 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOper
errorChan := make(chan error)
defer close(errorChan)
callback := func(eventType oplog.EventType, op *v1.Operation) {
zap.S().Debug("Sending an event")
var eventTypeMapped v1.OperationEventType
switch eventType {
case oplog.EventTypeOpCreated:
Expand All @@ -151,13 +152,14 @@ func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOper
Operation: op,
}

if err := stream.Send(event); err != nil {
errorChan <- fmt.Errorf("failed to send event: %w", err)
}
go func() {
if err := stream.Send(event); err != nil {
errorChan <- fmt.Errorf("failed to send event: %w", err)
}
}()
}
s.oplog.Subscribe(&callback)
defer s.oplog.Unsubscribe(&callback)

select {
case <-stream.Context().Done():
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ var ErrConfigNotFound = fmt.Errorf("config not found")
var configDirFlag = flag.String("config_dir", "", "The directory to store the config file")

var Default ConfigStore = &CachingValidatingStore{
ConfigStore: &YamlFileStore{
Path: path.Join(configDir(*configDirFlag), "config.yaml"),
ConfigStore: &JsonFileStore{
Path: path.Join(configDir(*configDirFlag), "config.json"),
},
}

Expand Down
41 changes: 4 additions & 37 deletions internal/config/yamlstore.go → internal/config/jsonstore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -11,17 +10,16 @@ import (
v1 "github.com/garethgeorge/resticui/gen/go/v1"
"github.com/google/renameio"
"google.golang.org/protobuf/encoding/protojson"
yaml "gopkg.in/yaml.v3"
)

type YamlFileStore struct {
type JsonFileStore struct {
Path string
mu sync.Mutex
}

var _ ConfigStore = &YamlFileStore{}
var _ ConfigStore = &JsonFileStore{}

func (f *YamlFileStore) Get() (*v1.Config, error) {
func (f *JsonFileStore) Get() (*v1.Config, error) {
f.mu.Lock()
defer f.mu.Unlock()

Expand All @@ -33,12 +31,6 @@ func (f *YamlFileStore) Get() (*v1.Config, error) {
return nil, fmt.Errorf("failed to read config file: %w", err)
}


data, err = yamlToJson(data)
if err != nil {
return nil, fmt.Errorf("failed to parse YAML config: %w", err)
}

var config v1.Config

if err = protojson.Unmarshal(data, &config); err != nil {
Expand All @@ -52,7 +44,7 @@ func (f *YamlFileStore) Get() (*v1.Config, error) {
return &config, nil
}

func (f *YamlFileStore) Update(config *v1.Config) error {
func (f *JsonFileStore) Update(config *v1.Config) error {
f.mu.Lock()
defer f.mu.Unlock()

Expand All @@ -65,11 +57,6 @@ func (f *YamlFileStore) Update(config *v1.Config) error {
return fmt.Errorf("failed to marshal config: %w", err)
}

data, err = jsonToYaml(data)
if err != nil {
return fmt.Errorf("failed to convert config to yaml: %w", err)
}

err = os.MkdirAll(filepath.Dir(f.Path), 0755)
if err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
Expand All @@ -82,23 +69,3 @@ func (f *YamlFileStore) Update(config *v1.Config) error {

return nil
}

func jsonToYaml(data []byte) ([]byte, error) {
var config interface{}
err := json.Unmarshal(data, &config)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal config: %w", err)
}

return yaml.Marshal(config)
}

func yamlToJson(data []byte) ([]byte, error) {
var config interface{}
err := yaml.Unmarshal(data, &config)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal config: %w", err)
}

return json.Marshal(config)
}
6 changes: 6 additions & 0 deletions internal/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {
err = multierror.Append(err, fmt.Errorf("path is required"))
}

for idx, p := range plan.Paths {
if p == "" {
err = multierror.Append(err, fmt.Errorf("path[%d] cannot be empty", idx))
}
}

if plan.Repo == "" {
err = multierror.Append(err,fmt.Errorf("repo is required"))
}
Expand Down
48 changes: 44 additions & 4 deletions internal/database/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/garethgeorge/resticui/internal/database/indexutil"
"github.com/garethgeorge/resticui/internal/database/serializationutil"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -57,8 +58,40 @@ func NewOpLog(databasePath string) (*OpLog, error) {
SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, IndexedSnapshotsSetBucket,
} {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return fmt.Errorf("error creating bucket %s: %s", string(bucket), err)
return fmt.Errorf("creating bucket %s: %s", string(bucket), err)
}

// Validate the operation log on startup.
sysBucket := tx.Bucket(SystemBucket)
opLogBucket := tx.Bucket(OpLogBucket)
c := opLogBucket.Cursor()
if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil {
c.Seek(lastValidated)
}
for k, v := c.First(); k != nil; k, v = c.Next() {
op := &v1.Operation{}
if err := proto.Unmarshal(v, op); err != nil {
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
continue
}
if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = "Operation timeout."
bytes, err := proto.Marshal(op)
if err != nil {
return fmt.Errorf("marshalling operation: %w", err)
}
if err := opLogBucket.Put(k, bytes); err != nil {
return fmt.Errorf("putting operation into bucket: %w", err)
}
}
}
if lastValidated, _ := c.Last(); lastValidated != nil {
if err := sysBucket.Put([]byte("last_validated"), lastValidated); err != nil {
return fmt.Errorf("checkpointing last_validated key: %w", err)
}
}

}
return nil
}); err != nil {
Expand Down Expand Up @@ -88,19 +121,24 @@ func (o *OpLog) Add(op *v1.Operation) error {
if err == nil {
o.notifyHelper(EventTypeOpCreated, op)
}

return err
}

func (o *OpLog) BulkAdd(ops []*v1.Operation) {
o.db.Update(func(tx *bolt.Tx) error {
func (o *OpLog) BulkAdd(ops []*v1.Operation) error {
err := o.db.Update(func(tx *bolt.Tx) error {
for _, op := range ops {
if err := o.addOperationHelper(tx, op); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, op := range ops {
o.notifyHelper(EventTypeOpCreated, op)
}
}
return err
}

func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
Expand Down Expand Up @@ -239,6 +277,7 @@ func (o *OpLog) GetByRepo(repoId string, filter Filter) ([]*v1.Operation, error)
var ops []*v1.Operation
if err := o.db.View(func(tx *bolt.Tx) error {
ids := indexutil.IndexSearchByteValue(tx.Bucket(RepoIndexBucket), []byte(repoId)).ToSlice()
ids = filter(ids)

b := tx.Bucket(OpLogBucket)
for _, id := range ids {
Expand All @@ -260,6 +299,7 @@ func (o *OpLog) GetByPlan(planId string, filter Filter) ([]*v1.Operation, error)
var ops []*v1.Operation
if err := o.db.View(func(tx *bolt.Tx) error {
ids := indexutil.IndexSearchByteValue(tx.Bucket(PlanIndexBucket), []byte(planId)).ToSlice()
ids = filter(ids)

b := tx.Bucket(OpLogBucket)
for _, id := range ids {
Expand Down
25 changes: 21 additions & 4 deletions internal/orchestrator/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
op := &v1.Operation{
PlanId: plan.Id,
RepoId: plan.Repo,
UnixTimeStartMs: time.Now().Unix(),
UnixTimeStartMs: curTimeMillis(),
Status: v1.OperationStatus_STATUS_INPROGRESS,
Op: backupOp,
}
Expand All @@ -105,16 +105,28 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
return fmt.Errorf("failed to get repo %q: %w", plan.Repo, err)
}

if _, err := repo.Backup(ctx, plan, func(entry *restic.BackupProgressEntry) {
lastSent := time.Now() // debounce progress updates, these can endup being very frequent.
summary, err := repo.Backup(ctx, plan, func(entry *restic.BackupProgressEntry) {
if time.Since(lastSent) < 200 * time.Millisecond {
return
}
lastSent = time.Now()

backupOp.OperationBackup.LastStatus = entry.ToProto()
if err := orchestrator.oplog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
}
zap.L().Debug("Backup progress", zap.Float64("progress", entry.PercentDone))
}); err != nil {
})
if err != nil {
return fmt.Errorf("failed to backup repo %q: %w", plan.Repo, err)
}

backupOp.OperationBackup.LastStatus = summary.ToProto()
if err := orchestrator.oplog.Update(op); err != nil {
return fmt.Errorf("update oplog with summary for backup: %v", err)
}

zap.L().Info("Backup complete", zap.String("plan", plan.Id))
return nil
})
Expand All @@ -134,12 +146,17 @@ func WithOperation(oplog *oplog.OpLog, op *v1.Operation, do func() error) error
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = err.Error()
}
op.UnixTimeEndMs = time.Now().Unix()
op.UnixTimeEndMs = curTimeMillis()
if op.Status == v1.OperationStatus_STATUS_INPROGRESS {
op.Status = v1.OperationStatus_STATUS_SUCCESS
}
if e := oplog.Update(op); err != nil {
return multierror.Append(err, fmt.Errorf("failed to update operation in oplog: %w", e))
}
return err
}

func curTimeMillis() int64 {
t := time.Now()
return t.Unix() * 1000 + int64(t.Nanosecond() / 1000000)
}
Loading

0 comments on commit 6491dbe

Please sign in to comment.