Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multihost synchronization #562

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d417df1
initial multihost prototyping
garethgeorge Nov 16, 2024
8a24c40
basic connectivity established
garethgeorge Nov 29, 2024
1f38c2c
handshake successful
garethgeorge Dec 6, 2024
a6685da
more proto changes and starting to introduce 'sync manager' as a coor…
garethgeorge Dec 7, 2024
3fff743
progress managing remote config sync and syncmanager impl
garethgeorge Dec 8, 2024
9a4cfde
implement client portion of operation diff protocol
garethgeorge Dec 8, 2024
d7c9cf0
fix build errors
garethgeorge Dec 8, 2024
01ccafe
fix crash on startup
garethgeorge Dec 8, 2024
1941b85
move proto to new file
garethgeorge Dec 9, 2024
e6a905c
config sync works & operation sync partially functional
garethgeorge Dec 10, 2024
78e94d1
fix some sync engine bugs
garethgeorge Dec 10, 2024
0c6dc16
test coverage
garethgeorge Dec 11, 2024
63091e5
expand test coverage to cover operation sync
garethgeorge Dec 11, 2024
247f4f5
more progress on sync tests
garethgeorge Dec 11, 2024
f071c64
normalize sqlite schema
garethgeorge Dec 13, 2024
3902723
more sqlite schema changes
garethgeorge Dec 14, 2024
87cf299
implement backup utility
garethgeorge Dec 14, 2024
bc76d60
sync tests passing now with flows properly propagated
garethgeorge Dec 14, 2024
ae8b5a8
add repo provider param
garethgeorge Dec 14, 2024
0a68168
support querying repo for its config
garethgeorge Dec 15, 2024
4d89fc1
add repo guid property
garethgeorge Dec 15, 2024
f392a73
implement migration + proper guid handling
garethgeorge Dec 15, 2024
59d8d78
more progress adding repo guid field
garethgeorge Dec 17, 2024
462cddb
backrest builds, tests not passing
garethgeorge Dec 17, 2024
6c08963
more progress on GUID plumbing
garethgeorge Dec 17, 2024
d5829a8
start working on test fixes
garethgeorge Dec 17, 2024
3924482
cont'd test fixes
garethgeorge Dec 18, 2024
e0176b1
use repo guid in sync protocol
garethgeorge Dec 23, 2024
66ab9ea
more bug fixes and update UI to use GUIDs
garethgeorge Dec 26, 2024
668786c
fix build errors in test, more fixes needed
garethgeorge Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
more progress on sync tests
  • Loading branch information
garethgeorge committed Dec 11, 2024
commit 247f4f59728e7f81848b1bf5a449e59c3d630875
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ func TestSimpleOperationSync(t *testing.T) {
DisplayMessage: "hostop1",
},
})...)
peerHost.oplog.Add(testutil.OperationsWithDefaults(basicClientOperationTempl, []*v1.Operation{
{
DisplayMessage: "clientop-missing",
OriginalId: 1234, // must be an ID that doesn't exist remotely
},
})...)

peerClient.oplog.Add(testutil.OperationsWithDefaults(basicClientOperationTempl, []*v1.Operation{
{
Expand All @@ -240,6 +246,19 @@ func TestSimpleOperationSync(t *testing.T) {
tryConnect(t, ctx, peerClient, defaultHostID)

tryExpectOperationsSynced(t, ctx, peerHost, peerClient, oplog.Query{RepoID: defaultRepoID, InstanceID: defaultClientID})
tryExpectExactOperations(t, ctx, peerHost, oplog.Query{RepoID: defaultRepoID, InstanceID: defaultClientID},
testutil.OperationsWithDefaults(basicClientOperationTempl, []*v1.Operation{
{
Id:             3, // presumably b/c the host oplog already holds hostop1 (id 1) and clientop-missing (id 2) -- TODO confirm
OriginalId: 1,
DisplayMessage: "clientop1",
},
{
Id:             4, // next id after clientop1; presumably ids 1 and 2 are taken by hostop1 and clientop-missing -- TODO confirm
OriginalId: 2,
DisplayMessage: "clientop2",
},
}))
}

func getOperations(t *testing.T, oplog *oplog.OpLog, query oplog.Query) []*v1.Operation {
Expand Down Expand Up @@ -272,6 +291,17 @@ func tryExpectOperationsSynced(t *testing.T, ctx context.Context, peer1 *peerUnd
err := testutil.Retry(t, ctx, func() error {
peer1Ops := getOperations(t, peer1.oplog, query)
peer2Ops := getOperations(t, peer2.oplog, query)
// clear fields that we expect will be re-mapped
for _, op := range peer1Ops {
op.Id = 0
op.FlowId = 0
op.OriginalId = 0
}
for _, op := range peer2Ops {
op.Id = 0
op.FlowId = 0
op.OriginalId = 0
}

sortFn := func(a, b *v1.Operation) int {
if a.DisplayMessage < b.DisplayMessage {
Expand Down
6 changes: 4 additions & 2 deletions internal/api/syncapi/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
}
case *v1.SyncStreamItem_DiffOperations:
requestedOperations := action.DiffOperations.GetRequestOperations()
c.l.Sugar().Debugf("received operation request for operations: %v", requestedOperations)

var deletedIDs []int64
var sendOps []*v1.Operation
var ops []*v1.Operation

sendOpsFunc := func() error {
if err := stream.Send(&v1.SyncStreamItem{
Expand All @@ -337,6 +337,7 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
sendOps = sendOps[:0]
return fmt.Errorf("action diff operations: send create operations: %w", err)
}
c.l.Sugar().Debugf("sent %d operations", len(sendOps))
sendOps = sendOps[:0]
return nil
}
Expand All @@ -353,6 +354,7 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
continue // skip this operation
}
if op.GetInstanceId() != c.localInstanceID {
c.l.Sugar().Warnf("action diff operations, requested operation %d is not from this instance, this shouldn't happen with a wellbehaved server", opID)
continue // skip operations that are not from this instance e.g. an "index snapshot" picking up snapshots created by another instance.
}

Expand All @@ -363,7 +365,7 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return fmt.Errorf("remote requested operation for repo %q for which sync was never initiated", op.GetRepoId())
}

sendOps = append(ops, op)
sendOps = append(sendOps, op)
sentOps += 1
if len(sendOps) >= 256 {
if err := sendOpsFunc(); err != nil {
Expand Down
26 changes: 25 additions & 1 deletion internal/api/syncapi/synchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
// The diff selector _must_ specify a repo the client has access to
repo := config.FindRepo(initialConfig, diffSel.GetRepoId())
if repo == nil {
zap.S().Warnf("syncserver action DiffOperations: client %q tried to diff with repo %q that does not exist", clientInstanceID, diffSel.GetRepoId())
return connect.NewError(connect.CodePermissionDenied, fmt.Errorf("action DiffOperations: repo %q not found", diffSel.GetRepoId()))
}
if !slices.Contains(repo.GetAllowedPeerInstanceIds(), clientInstanceID) {
zap.S().Warnf("syncserver action DiffOperations: client %q tried to diff with repo %q that they are not allowed to access", clientInstanceID, diffSel.GetRepoId())
return connect.NewError(connect.CodePermissionDenied, fmt.Errorf("action DiffOperations: client is not allowed to access repo %q", diffSel.GetRepoId()))
}

Expand All @@ -192,6 +194,9 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre

localMetadata := []oplog.OpMetadata{}
if err := h.mgr.oplog.QueryMetadata(diffSelQuery, func(metadata oplog.OpMetadata) error {
if metadata.OriginalID == 0 {
return nil // skip operations that didn't come from a remote
}
localMetadata = append(localMetadata, metadata)
return nil
}); err != nil {
Expand All @@ -212,6 +217,9 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
return remoteMetadata[i].ID < remoteMetadata[j].ID
})

requestDueToModno := 0
requestMissingRemote := 0
requestMissingLocal := 0
requestIDs := []int64{}

// This is a simple O(n) diff algorithm that compares the local and remote metadata vectors.
Expand All @@ -227,26 +235,39 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}
localIndex++
remoteIndex++
requestDueToModno++
} else if local.OriginalID < remote.ID {
// the ID is found locally but not remotely; request it and see if we get a delete event back
// from the client indicating that the operation was deleted.
requestIDs = append(requestIDs, local.OriginalID)
localIndex++
requestMissingLocal++
} else {
// the ID is found remotely but not locally; request it for initial sync.
requestIDs = append(requestIDs, remote.ID)
remoteIndex++
requestMissingRemote++
}
}
for localIndex < len(localMetadata) {
requestIDs = append(requestIDs, localMetadata[localIndex].OriginalID)
localIndex++
requestMissingLocal++
}
for remoteIndex < len(remoteMetadata) {
requestIDs = append(requestIDs, remoteMetadata[remoteIndex].ID)
remoteIndex++
requestMissingRemote++
}

zap.L().Debug("syncserver diff operations with client metadata",
zap.String("client_instance_id", clientInstanceID),
zap.Any("query", diffSelQuery),
zap.Int("request_due_to_modno", requestDueToModno),
zap.Int("request_local_but_not_remote", requestMissingLocal),
zap.Int("request_remote_but_not_local", requestMissingRemote),
zap.Int("request_ids_total", len(requestIDs)),
)
if len(requestIDs) > 0 {
if err := stream.Send(&v1.SyncStreamItem{
Action: &v1.SyncStreamItem_DiffOperations{
Expand All @@ -263,18 +284,21 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
case *v1.SyncStreamItem_SendOperations:
switch event := action.SendOperations.GetEvent().Event.(type) {
case *v1.OperationEvent_CreatedOperations:
zap.L().Debug("syncserver received created operations", zap.Any("operations", event.CreatedOperations.GetOperations()))
for _, op := range event.CreatedOperations.GetOperations() {
if err := insertOrUpdate(op); err != nil {
return fmt.Errorf("action SendOperations: operation event create: %w", err)
}
}
case *v1.OperationEvent_UpdatedOperations:
zap.L().Debug("syncserver received update operations", zap.Any("operations", event.UpdatedOperations.GetOperations()))
for _, op := range event.UpdatedOperations.GetOperations() {
if err := insertOrUpdate(op); err != nil {
return fmt.Errorf("action SendOperations: operation event update: %w", err)
}
}
case *v1.OperationEvent_DeletedOperations:
zap.L().Debug("syncserver received delete operations", zap.Any("operations", event.DeletedOperations.GetValues()))
for _, id := range event.DeletedOperations.GetValues() {
if err := deleteByOriginalID(id); err != nil {
return fmt.Errorf("action SendOperations: operation event delete %d: %w", id, err)
Expand All @@ -285,7 +309,7 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
return connect.NewError(connect.CodeInvalidArgument, errors.New("action SendOperations: unknown event type"))
}
default:
return connect.NewError(connect.CodeInvalidArgument, errors.New("Unknown action type"))
return connect.NewError(connect.CodeInvalidArgument, errors.New("unknown action type"))
}

return nil
Expand Down
Loading