Skip to content

Commit

Permalink
feat: implement backup scheduling in orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Nov 15, 2023
1 parent dd9142c commit eadb1a8
Show file tree
Hide file tree
Showing 19 changed files with 433 additions and 104 deletions.
21 changes: 18 additions & 3 deletions cmd/resticui/resticui.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,22 @@ func main() {
}
defer oplog.Close()

orchestrator, err := orchestrator.NewOrchestrator(config.Default, oplog)
if err != nil {
zap.S().Fatalf("Error creating orchestrator: %v", err)
}

// Start orchestration loop.
go func() {
err := orchestrator.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
zap.S().Fatal("Orchestrator loop exited with error: ", zap.Error(err))
cancel() // cancel the context when the orchestrator exits (e.g. on fatal error)
}
}()

apiServer := api.NewServer(
orchestrator.NewOrchestrator(config.Default), // TODO: eliminate default config
orchestrator, // TODO: eliminate default config
oplog,
)

Expand Down Expand Up @@ -78,9 +92,9 @@ func main() {
server.Shutdown(context.Background())
}()
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
zap.S().Error("Error starting server", zap.Error(err))
zap.L().Error("Error starting server", zap.Error(err))
}
zap.S().Info("HTTP gateway shutdown")
zap.L().Info("HTTP gateway shutdown")
cancel() // cancel the context when the HTTP server exits (e.g. on fatal error)
}()

Expand All @@ -92,6 +106,7 @@ func init() {
if os.Getenv("DEBUG") != "" {
c := zap.NewDevelopmentEncoderConfig()
c.EncodeLevel = zapcore.CapitalColorLevelEncoder
c.EncodeTime = zapcore.ISO8601TimeEncoder
l := zap.New(zapcore.NewCore(
zapcore.NewConsoleEncoder(c),
zapcore.AddSync(colorable.NewColorableStdout()),
Expand Down
85 changes: 43 additions & 42 deletions gen/go/v1/operations.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 15 additions & 15 deletions gen/go/v1/restic.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/google/orderedcode v0.0.1/go.mod h1:iVyU4/qPKHY5h/wSd6rZZCDcLJNxiWO6d
github.com/google/renameio v1.0.1 h1:Lh/jXZmvZxb0BBeSY5VKEfidcbcbenKjZFzM/q0fSeU=
github.com/google/renameio v1.0.1/go.mod h1:t/HQoYBZSsWSNK35C6CO/TpPLDVWvxOHboWUAweKUpk=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
Expand Down
23 changes: 4 additions & 19 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"

"github.com/garethgeorge/resticui/gen/go/types"
v1 "github.com/garethgeorge/resticui/gen/go/v1"
Expand All @@ -23,17 +21,12 @@ type Server struct {
*v1.UnimplementedResticUIServer
orchestrator *orchestrator.Orchestrator
oplog *oplog.OpLog

reqId atomic.Uint64
eventChannelsMu sync.Mutex
eventChannels map[uint64]chan *v1.Event
}

var _ v1.ResticUIServer = &Server{}

func NewServer(orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog) *Server {
s := &Server{
eventChannels: make(map[uint64]chan *v1.Event),
orchestrator: orchestrator,
}

Expand Down Expand Up @@ -84,11 +77,13 @@ func (s *Server) AddRepo(ctx context.Context, repo *v1.Repo) (*v1.Config, error)
return nil, fmt.Errorf("failed to init repo: %w", err)
}

zap.S().Debug("Updating config")
zap.L().Debug("Updating config")
if err := config.Default.Update(c); err != nil {
return nil, fmt.Errorf("failed to update config: %w", err)
}

s.orchestrator.ApplyConfig(c)

return c, nil
}

Expand Down Expand Up @@ -139,7 +134,7 @@ func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOper
case oplog.EventTypeOpUpdated:
eventTypeMapped = v1.OperationEventType_EVENT_UPDATED
default:
zap.S().Error("Unknown event type", zap.Int("eventType", int(eventType)))
zap.L().Error("Unknown event type", zap.Int("eventType", int(eventType)))
eventTypeMapped = v1.OperationEventType_EVENT_UNKNOWN
}

Expand Down Expand Up @@ -179,13 +174,3 @@ func (s *Server) PathAutocomplete(ctx context.Context, path *types.StringValue)
return &types.StringList{Values: paths}, nil
}


// PublishEvent publishes an event to all GetEvents streams. It is effectively a multicast.
func (s *Server) PublishEvent(event *v1.Event) {
zap.S().Debug("Publishing event", zap.Any("event", event))
s.eventChannelsMu.Lock()
defer s.eventChannelsMu.Unlock()
for _, ch := range s.eventChannels {
ch <- event
}
}
27 changes: 27 additions & 0 deletions internal/config/memstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package config

import (
"sync"

v1 "github.com/garethgeorge/resticui/gen/go/v1"
)

type MemoryStore struct {
mu sync.Mutex
Config *v1.Config
}

var _ ConfigStore = &MemoryStore{}

func (c *MemoryStore) Get() (*v1.Config, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.Config, nil
}

func (c *MemoryStore) Update(config *v1.Config) error {
c.mu.Lock()
defer c.mu.Unlock()
c.Config = config
return nil
}
Loading

0 comments on commit eadb1a8

Please sign in to comment.