Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multihost synchronization #562

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d417df1
initial multihost prototyping
garethgeorge Nov 16, 2024
8a24c40
basic connectivity established
garethgeorge Nov 29, 2024
1f38c2c
handshake successful
garethgeorge Dec 6, 2024
a6685da
more proto changes and starting to introduce 'sync manager' as a coor…
garethgeorge Dec 7, 2024
3fff743
progress managing remote config sync and syncmanager impl
garethgeorge Dec 8, 2024
9a4cfde
implement client portion of operation diff protocol
garethgeorge Dec 8, 2024
d7c9cf0
fix build errors
garethgeorge Dec 8, 2024
01ccafe
fix crash on startup
garethgeorge Dec 8, 2024
1941b85
move proto to new file
garethgeorge Dec 9, 2024
e6a905c
config sync works & operation sync partially functional
garethgeorge Dec 10, 2024
78e94d1
fix some sync engine bugs
garethgeorge Dec 10, 2024
0c6dc16
test coverage
garethgeorge Dec 11, 2024
63091e5
expand test coverage to cover operation sync
garethgeorge Dec 11, 2024
247f4f5
more progress on sync tests
garethgeorge Dec 11, 2024
f071c64
normalize sqlite schema
garethgeorge Dec 13, 2024
3902723
more sqlite schema changes
garethgeorge Dec 14, 2024
87cf299
implement backup utility
garethgeorge Dec 14, 2024
bc76d60
sync tests passing now with flows properly propagated
garethgeorge Dec 14, 2024
ae8b5a8
add repo provider param
garethgeorge Dec 14, 2024
0a68168
support querying repo for its config
garethgeorge Dec 15, 2024
4d89fc1
add repo guid property
garethgeorge Dec 15, 2024
f392a73
implement migration + proper guid handling
garethgeorge Dec 15, 2024
59d8d78
more progress adding repo guid field
garethgeorge Dec 17, 2024
462cddb
backrest builds, tests not passing
garethgeorge Dec 17, 2024
6c08963
more progress on GUID plumbing
garethgeorge Dec 17, 2024
d5829a8
start working on test fixes
garethgeorge Dec 17, 2024
3924482
cont'd test fixes
garethgeorge Dec 18, 2024
e0176b1
use repo guid in sync protocol
garethgeorge Dec 23, 2024
66ab9ea
more bug fixes and update UI to use GUIDs
garethgeorge Dec 26, 2024
668786c
fix build errors in test, more fixes needed
garethgeorge Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
handshake successful
  • Loading branch information
garethgeorge committed Dec 6, 2024
commit 1f38c2c83a5e97d8c91c1c887cbfea01113f4298
7 changes: 6 additions & 1 deletion cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,15 @@ func installLoggers() {
c := zap.NewDevelopmentEncoderConfig()
c.EncodeLevel = zapcore.CapitalColorLevelEncoder
c.EncodeTime = zapcore.ISO8601TimeEncoder

debugLevel := zapcore.InfoLevel
if version == "unknown" { // dev build
debugLevel = zapcore.DebugLevel
}
pretty := zapcore.NewCore(
zapcore.NewConsoleEncoder(c),
zapcore.AddSync(colorable.NewColorableStdout()),
zapcore.InfoLevel,
debugLevel,
)

// JSON logging to log directory
Expand Down
27 changes: 17 additions & 10 deletions internal/api/syncapi/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type SyncClient struct {
client v1connect.BackrestSyncServiceClient
reconnectDelay time.Duration
connectedRepos map[string]*v1.SyncConnectionInfo
l *zap.Logger
}

func newInsecureClient() *http.Client {
Expand All @@ -47,19 +48,19 @@ func newInsecureClient() *http.Client {

func NewSyncClient(localInstanceID, remoteInstanceURL string, oplog *oplog.OpLog) *SyncClient {
urlToUse := backrestRemoteUrlToHTTPUrl(remoteInstanceURL)
zap.L().Info("Connecting to sync server", zap.String("url", urlToUse))

client := v1connect.NewBackrestSyncServiceClient(
newInsecureClient(),
urlToUse,
)

return &SyncClient{
reconnectDelay: 60 * time.Second,
client: client,
oplog: oplog,
localInstanceID: localInstanceID,
reconnectDelay: 60 * time.Second,
client: client,
oplog: oplog,

connectedRepos: make(map[string]*v1.SyncConnectionInfo),
l: zap.L().Named("syncclient").With(zap.String("peer", remoteInstanceURL)),
}
}

Expand All @@ -77,23 +78,30 @@ func (c *SyncClient) setStatusForRepos(repos []*v1.Repo, state v1.SyncStreamItem
func (c *SyncClient) RunSyncForRepos(ctx context.Context, repos []*v1.Repo) {
for {
if ctx.Err() != nil {
break // context is done
return
}

lastConnect := time.Now()
c.setStatusForRepos(repos, v1.SyncStreamItem_CONNECTION_STATE_PENDING, "connecting")
if err := c.runSyncForReposInternal(ctx, repos); err != nil {
c.l.Warn("sync error", zap.Error(err))
c.setStatusForRepos(repos, v1.SyncStreamItem_CONNECTION_STATE_DISCONNECTED, err.Error())
} else {
c.setStatusForRepos(repos, v1.SyncStreamItem_CONNECTION_STATE_DISCONNECTED, "connection closed")
}

time.Sleep(c.reconnectDelay - time.Since(lastConnect))
delay := c.reconnectDelay - time.Since(lastConnect)
c.l.Info("lost connection, retrying after delay", zap.Duration("delay", delay))
select {
case <-time.After(delay):
case <-ctx.Done():
return
}
}
}

func (c *SyncClient) runSyncForReposInternal(ctx context.Context, repos []*v1.Repo) error {
zap.L().Info("Connecting to server for sync")
c.l.Info("connecting to sync server")
stream := c.client.Sync(ctx)

ctx, cancelWithError := context.WithCancelCause(ctx)
Expand All @@ -105,7 +113,7 @@ func (c *SyncClient) runSyncForReposInternal(ctx context.Context, repos []*v1.Re
for {
item, err := stream.Receive()
if err != nil {
zap.L().Error("Error receiving from sync stream", zap.Error(err))
c.l.Debug("receive error from sync stream, this is typically due to connection loss", zap.Error(err))
close(receive)
return
}
Expand All @@ -114,7 +122,6 @@ func (c *SyncClient) runSyncForReposInternal(ctx context.Context, repos []*v1.Re
}()

// Broadcast initial packet containing the protocol version and instance ID.
zap.S().Infof("Broadcast handshake as %v", c.localInstanceID)
if err := stream.Send(&v1.SyncStreamItem{
Action: &v1.SyncStreamItem_Handshake{
Handshake: &v1.SyncStreamItem_SyncActionHandshake{
Expand Down
4 changes: 3 additions & 1 deletion internal/api/syncapi/synchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
return connect.NewError(connect.CodeInvalidArgument, errors.New("no packets received"))
}

zap.S().Infof("Client connected with instance ID %s", clientInstanceID)
zap.S().Infof("syncserver client connected with instance ID %s", clientInstanceID)

// After receiving handshake packet, start processing commands
connectedRepos := make(map[string]struct{})
Expand Down Expand Up @@ -126,6 +126,8 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}); err != nil {
return fmt.Errorf("action ConnectRepo: send connection state reply: %w", err)
}

zap.S().Debugf("syncserver client %q connected to repo %q", clientInstanceID, action.ConnectRepo.RepoId)
case *v1.SyncStreamItem_DiffOperations:
diffSel := action.DiffOperations.GetHaveOperationsSelector()

Expand Down
Loading