Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multihost synchronization #562

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d417df1
initial multihost prototyping
garethgeorge Nov 16, 2024
8a24c40
basic connectivity established
garethgeorge Nov 29, 2024
1f38c2c
handshake successful
garethgeorge Dec 6, 2024
a6685da
more proto changes and starting to introduce 'sync manager' as a coor…
garethgeorge Dec 7, 2024
3fff743
progress managing remote config sync and syncmanager impl
garethgeorge Dec 8, 2024
9a4cfde
implement client portion of operation diff protocol
garethgeorge Dec 8, 2024
d7c9cf0
fix build errors
garethgeorge Dec 8, 2024
01ccafe
fix crash on startup
garethgeorge Dec 8, 2024
1941b85
move proto to new file
garethgeorge Dec 9, 2024
e6a905c
config sync works & operation sync partially functional
garethgeorge Dec 10, 2024
78e94d1
fix some sync engine bugs
garethgeorge Dec 10, 2024
0c6dc16
test coverage
garethgeorge Dec 11, 2024
63091e5
expand test coverage to cover operation sync
garethgeorge Dec 11, 2024
247f4f5
more progress on sync tests
garethgeorge Dec 11, 2024
f071c64
normalize sqlite schema
garethgeorge Dec 13, 2024
3902723
more sqlite schema changes
garethgeorge Dec 14, 2024
87cf299
implement backup utility
garethgeorge Dec 14, 2024
bc76d60
sync tests passing now with flows properly propagated
garethgeorge Dec 14, 2024
ae8b5a8
add repo provider param
garethgeorge Dec 14, 2024
0a68168
support querying repo for its config
garethgeorge Dec 15, 2024
4d89fc1
add repo guid property
garethgeorge Dec 15, 2024
f392a73
implement migration + proper guid handling
garethgeorge Dec 15, 2024
59d8d78
more progress adding repo guid field
garethgeorge Dec 17, 2024
462cddb
backrest builds, tests not passing
garethgeorge Dec 17, 2024
6c08963
more progress on GUID plumbing
garethgeorge Dec 17, 2024
d5829a8
start working on test fixes
garethgeorge Dec 17, 2024
3924482
cont'd test fixes
garethgeorge Dec 18, 2024
e0176b1
use repo guid in sync protocol
garethgeorge Dec 23, 2024
66ab9ea
more bug fixes and update UI to use GUIDs
garethgeorge Dec 26, 2024
668786c
fix build errors in test, more fixes needed
garethgeorge Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix build errors
  • Loading branch information
garethgeorge committed Dec 8, 2024
commit d7c9cf02d4c3ec43bb55c209aecc6833e6d9e014
63 changes: 11 additions & 52 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {
if err != nil {
zap.S().Fatalf("error loading config: %v", err)
}
configMgr := &config.ConfigManager{Store: configStore}

var wg sync.WaitGroup

Expand Down Expand Up @@ -113,6 +114,7 @@ func main() {
}()

// Create orchestrator and start task loop.
// TODO: update the orchestrator to accept a configMgr and auto-refresh the config w/o explicit ApplyConfig call.
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore)
if err != nil {
zap.S().Fatalf("error creating orchestrator: %v", err)
Expand All @@ -125,65 +127,22 @@ func main() {
}()

// Create and serve the HTTP gateway
syncHandler := syncapi.NewBackrestSyncHandler(configStore, log)

var syncWg sync.WaitGroup
var syncCtxCancel context.CancelFunc

syncConfigHook := &configHookForsyncapi{
store: configStore,
onChange: func(cfg *v1.Config) {
if syncCtxCancel != nil {
zap.L().Info("cancelling existing sync context due to config change, preparing to reinitialize sync engine")
syncCtxCancel()
}
syncWg.Wait()

// Start running a sync loop with this context for each distinct URI
reposByURI := make(map[string][]*v1.Repo)

for _, repo := range cfg.Repos {
if !syncapi.IsBackrestRemoteRepoURI(repo.Uri) {
continue
}
zap.L()
reposByURI[repo.Uri] = append(reposByURI[repo.Uri], repo)
}

if len(reposByURI) == 0 {
return
}

ctx, cancel := context.WithCancel(context.Background())
syncCtxCancel = cancel

for uri, repos := range reposByURI {
syncWg.Add(1)

repoIDs := make([]string, len(repos))
for i, repo := range repos {
repoIDs[i] = repo.Id
}
zap.S().Infof("starting sync engine for URI %q for remote repos %v", uri, repoIDs)
syncMgr := syncapi.NewSyncManager(configMgr, log, orchestrator, filepath.Join(env.DataDir(), "sync"))
wg.Add(1)
go func() {
syncMgr.RunSync(ctx)
wg.Done()
}()

go func(uri string, repos []*v1.Repo) {
defer syncWg.Done()
// Create a new sync engine for this URI
syncClient := syncapi.NewSyncClient(cfg.Instance, uri, log)
syncClient.RunSyncForRepos(ctx, repos)
}(uri, repos)
}
},
}
syncConfigHook.onChange(cfg) // initialize sync engine
syncHandler := syncapi.NewBackrestSyncHandler(syncMgr)

apiBackrestHandler := api.NewBackrestHandler(
syncConfigHook,
configMgr,
orchestrator,
log,
logStore,
)
authenticator := auth.NewAuthenticator(getSecret(), configStore)
authenticator := auth.NewAuthenticator(getSecret(), configMgr)
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)

mux := http.NewServeMux()
Expand Down
79 changes: 52 additions & 27 deletions internal/api/syncapi/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return connect.NewError(connect.CodeInvalidArgument, errors.New("no packets received"))
}

didSubscribeOplog := false
haveRunSync := make(map[string]struct{}) // repo ID -> have run sync?

oplogSubscription := func(ops []*v1.Operation, event oplog.OperationEvent) {
var opsToForward []*v1.Operation
for _, op := range ops {
if connInfo, ok := c.connectedRepos[op.GetRepoId()]; ok && connInfo.ConnectionState == v1.SyncStreamItem_CONNECTION_STATE_CONNECTED {
if _, ok := haveRunSync[op.GetRepoId()]; ok {
opsToForward = append(opsToForward, op)
}
}
Expand Down Expand Up @@ -216,17 +216,30 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
cancelWithError(fmt.Errorf("operation send buffer overflow"))
}
}

haveRunSync := make(map[string]struct{}) // repo ID -> have run sync?
c.oplog.Subscribe(oplog.Query{}, &oplogSubscription)
defer c.oplog.Unsubscribe(&oplogSubscription)

handleSyncCommand := func(item *v1.SyncStreamItem) error {
switch action := item.Action.(type) {
case *v1.SyncStreamItem_SendConfig:
newRemoteConfig := action.SendConfig.Config
if err := c.mgr.remoteConfigStore.Update(c.peer.InstanceId); err != nil {
if err := c.mgr.remoteConfigStore.Update(c.peer.InstanceId, newRemoteConfig); err != nil {
return fmt.Errorf("update remote config store with latest config: %w", err)
}

// remove any repo IDs that are no longer in the remote config; their absence means our access has been revoked.
remoteRepoIDs := make(map[string]struct{})
for _, repo := range newRemoteConfig.GetRepos() {
remoteRepoIDs[repo.GetId()] = struct{}{}
}
for repoID := range haveRunSync {
if _, ok := remoteRepoIDs[repoID]; !ok {
delete(haveRunSync, repoID)
}
}

// load the local config so that we can index the remote repos into any local repos that reference their URIs
// e.g. backrest:<instance-id> format URI.
localConfig, err := c.mgr.configMgr.Get()
if err != nil {
return fmt.Errorf("get local config: %w", err)
Expand All @@ -242,7 +255,7 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
repo.Hooks = nil // we don't accept any hooks from the server. This could be used to execute arbitrary code on the client.
instanceID, err := InstanceForBackrestURI(localRepoConfig.Uri)
if err != nil || instanceID != c.peer.InstanceId {
c.l.Sugar().Debugf("ignoring remote repo config %q/%q because the local repo with the same name specifies URI %q which does not reference the peer providing this config", c.peer.InstanceId, repo.GetId(), localRepoConfig.BackrestURI)
c.l.Sugar().Debugf("ignoring remote repo config %q/%q because the local repo with the same name specifies URI %q which does not reference the peer providing this config", c.peer.InstanceId, repo.GetId(), localRepoConfig.Uri)
continue
}

Expand Down Expand Up @@ -281,6 +294,26 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
var deletedIDs []int64
var sendOps []*v1.Operation
var ops []*v1.Operation

// sendOpsFunc sends everything currently buffered in sendOps over the stream
// as a single CreatedOperations event. The buffer is emptied afterwards
// regardless of whether the send succeeded; a send failure is returned wrapped
// with context.
sendOpsFunc := func() error {
	created := &v1.OperationEvent_CreatedOperations{
		CreatedOperations: &v1.OperationList{Operations: sendOps},
	}
	item := &v1.SyncStreamItem{
		Action: &v1.SyncStreamItem_SendOperations{
			SendOperations: &v1.SyncStreamItem_SyncActionSendOperations{
				Event: &v1.OperationEvent{Event: created},
			},
		},
	}

	sendErr := stream.Send(item)
	sendOps = sendOps[:0] // clear the slice on both the success and failure paths
	if sendErr != nil {
		return fmt.Errorf("action diff operations: send create operations: %w", sendErr)
	}
	return nil
}

for _, opID := range requestedOperations {
op, err := c.oplog.Get(opID)
if err != nil {
Expand All @@ -294,11 +327,23 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {

_, ok := haveRunSync[op.GetRepoId()]
if !ok {
// this should never happen if sync is working correctly. Would probably indicate oplog corruption?
// this should never happen if sync is working correctly. Would probably indicate oplog corruption or that our access was revoked.
// Error out and re-initiate sync.
return fmt.Errorf("remote requested operation for repo %q for which sync was never initiated", op.GetRepoId())
}

sendOps = append(ops, op)
if len(sendOps) >= 128 {
if err := sendOpsFunc(); err != nil {
return err
}
}
}

if len(sendOps) > 0 {
if err := sendOpsFunc(); err != nil {
return err
}
}

if len(deletedIDs) > 0 {
Expand All @@ -316,22 +361,6 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return fmt.Errorf("action diff operations: send delete operations: %w", err)
}
}

if len(sendOps) > 0 {
if err := stream.Send(&v1.SyncStreamItem{
Action: &v1.SyncStreamItem_SendOperations{
SendOperations: &v1.SyncStreamItem_SyncActionSendOperations{
Event: &v1.OperationEvent{
Event: &v1.OperationEvent_CreatedOperations{
CreatedOperations: &v1.OperationList{Operations: sendOps},
},
},
},
},
}); err != nil {
return fmt.Errorf("action diff operations: send create operations: %w", err)
}
}
case *v1.SyncStreamItem_Throttle:
c.reconnectDelay = time.Duration(action.Throttle.GetDelayMs()) * time.Millisecond
default:
Expand All @@ -357,10 +386,6 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return err
}
case <-ctx.Done():
if didSubscribeOplog {
c.oplog.Unsubscribe(&oplogSubscription)
}

return ctx.Err()
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ConfigManager struct {
changeNotifyCh []chan struct{}
}

var _ ConfigStore = &ConfigManager{}

// Get returns the configuration held by the manager's underlying Store,
// passing through whatever error the store reports.
func (m *ConfigManager) Get() (*v1.Config, error) {
	cfg, err := m.Store.Get()
	return cfg, err
}
Expand Down
Loading