Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multihost synchronization #562

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d417df1
initial multihost prototyping
garethgeorge Nov 16, 2024
8a24c40
basic connectivity established
garethgeorge Nov 29, 2024
1f38c2c
handshake successful
garethgeorge Dec 6, 2024
a6685da
more proto changes and starting to introduce 'sync manager' as a coor…
garethgeorge Dec 7, 2024
3fff743
progress managing remote config sync and syncmanager impl
garethgeorge Dec 8, 2024
9a4cfde
implement client portion of operation diff protocol
garethgeorge Dec 8, 2024
d7c9cf0
fix build errors
garethgeorge Dec 8, 2024
01ccafe
fix crash on startup
garethgeorge Dec 8, 2024
1941b85
move proto to new file
garethgeorge Dec 9, 2024
e6a905c
config sync works & operation sync partially functional
garethgeorge Dec 10, 2024
78e94d1
fix some sync engine bugs
garethgeorge Dec 10, 2024
0c6dc16
test coverage
garethgeorge Dec 11, 2024
63091e5
expand test coverage to cover operation sync
garethgeorge Dec 11, 2024
247f4f5
more progress on sync tests
garethgeorge Dec 11, 2024
f071c64
normalize sqlite schema
garethgeorge Dec 13, 2024
3902723
more sqlite schema changes
garethgeorge Dec 14, 2024
87cf299
implement backup utility
garethgeorge Dec 14, 2024
bc76d60
sync tests passing now with flows properly propagated
garethgeorge Dec 14, 2024
ae8b5a8
add repo provider param
garethgeorge Dec 14, 2024
0a68168
support querying repo for its config
garethgeorge Dec 15, 2024
4d89fc1
add repo guid property
garethgeorge Dec 15, 2024
f392a73
implement migration + proper guid handling
garethgeorge Dec 15, 2024
59d8d78
more progress adding repo guid field
garethgeorge Dec 17, 2024
462cddb
backrest builds, tests not passing
garethgeorge Dec 17, 2024
6c08963
more progress on GUID plumbing
garethgeorge Dec 17, 2024
d5829a8
start working on test fixes
garethgeorge Dec 17, 2024
3924482
cont'd test fixes
garethgeorge Dec 18, 2024
e0176b1
use repo guid in sync protocol
garethgeorge Dec 23, 2024
66ab9ea
more bug fixes and update UI to use GUIDs
garethgeorge Dec 26, 2024
668786c
fix build errors in test, more fixes needed
garethgeorge Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial multihost prototyping
  • Loading branch information
garethgeorge committed Nov 28, 2024
commit d417df18728d4f0ce6980f25002e7471dda5e9cd
71 changes: 69 additions & 2 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/api"
syncengine "github.com/garethgeorge/backrest/internal/api/syncapi"
"github.com/garethgeorge/backrest/internal/auth"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/env"
Expand Down Expand Up @@ -124,18 +125,65 @@ func main() {
}()

// Create and serve the HTTP gateway
syncHandler := syncengine.NewBackrestSyncHandler(configStore, log)

var syncWg sync.WaitGroup
var syncCtxCancel context.CancelFunc

syncConfigHook := &configHookForSyncEngine{
store: configStore,
onChange: func(cfg *v1.Config) {
if syncCtxCancel != nil {
zap.L().Info("cancelling existing sync context due to config change, preparing to reinitialize sync engine")
syncCtxCancel()
}
syncWg.Wait()

// Start running a sync loop with this context for each distinct URI
reposByURI := make(map[string][]*v1.Repo)

for _, repo := range cfg.Repos {
if !syncengine.IsBackrestRemoteRepoURI(repo.Uri) {
continue
}
zap.L()
reposByURI[repo.Uri] = append(reposByURI[repo.Uri], repo)
}

if len(reposByURI) == 0 {
return
}

ctx, cancel := context.WithCancel(context.Background())
syncCtxCancel = cancel

for uri, repos := range reposByURI {
syncWg.Add(1)

zap.S().Infof("starting sync engine for URI %q serving %d repos", uri, len(repos))

go func(uri string, repos []*v1.Repo) {
defer syncWg.Done()
// Create a new sync engine for this URI
syncClient := syncengine.NewSyncClient(cfg.Instance, uri, log)
syncClient.RunSyncForRepos(ctx, repos)
}(uri, repos)
}
},
}

apiBackrestHandler := api.NewBackrestHandler(
configStore,
syncConfigHook,
orchestrator,
log,
logStore,
)

authenticator := auth.NewAuthenticator(getSecret(), configStore)
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)

mux := http.NewServeMux()
mux.Handle(v1connect.NewAuthenticationHandler(apiAuthenticationHandler))
mux.Handle(v1connect.NewBackrestSyncServiceHandler(syncHandler))
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
mux.Handle("/", webui.Handler())
Expand Down Expand Up @@ -294,3 +342,22 @@ func migrateBboltOplog(logstore oplog.OpStore) {
}
zap.S().Infof("migrated %d operations from old bbolt oplog to sqlite", count)
}

type configHookForSyncEngine struct {
store config.ConfigStore
onChange func(*v1.Config)
}

var _ config.ConfigStore = &configHookForSyncEngine{}

func (c *configHookForSyncEngine) Get() (*v1.Config, error) {
return c.store.Get()
}

func (c *configHookForSyncEngine) Update(cfg *v1.Config) error {
err := c.store.Update(cfg)
if err == nil {
c.onChange(cfg)
}
return err
}
Loading
Loading