Skip to content

Commit

Permalink
config sync works & operation sync partially functional
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Dec 10, 2024
1 parent 1941b85 commit e6a905c
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 324 deletions.
19 changes: 0 additions & 19 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,3 @@ func migrateBboltOplog(logstore oplog.OpStore) {
}
zap.S().Infof("migrated %d operations from old bbolt oplog to sqlite", count)
}

// configHookForsyncapi decorates a config.ConfigStore with a change callback:
// reads are forwarded to the wrapped store, and onChange is invoked after
// every Update that the wrapped store accepts without error.
type configHookForsyncapi struct {
// store is the underlying config store that every call is delegated to.
store config.ConfigStore
// onChange receives the config that was just written by a successful Update.
onChange func(*v1.Config)
}

// Compile-time assertion that *configHookForsyncapi satisfies config.ConfigStore.
var _ config.ConfigStore = &configHookForsyncapi{}

// Get forwards to the wrapped store and returns its result untouched.
// Reading the config never invokes the onChange callback.
func (c *configHookForsyncapi) Get() (*v1.Config, error) {
	current, err := c.store.Get()
	return current, err
}

// Update writes cfg to the wrapped store. When the store rejects the write
// its error is returned as-is and onChange is not called; otherwise onChange
// is invoked with cfg before returning nil.
func (c *configHookForsyncapi) Update(cfg *v1.Config) error {
	if storeErr := c.store.Update(cfg); storeErr != nil {
		return storeErr
	}
	// Only notify once the new config has actually been persisted.
	c.onChange(cfg)
	return nil
}
559 changes: 279 additions & 280 deletions gen/go/v1/config.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/api/syncapi/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestIdentity(t *testing.T) {
t.Fatalf("failed to create identity: %v", err)
}

signed, err := ident.SignMessage([]byte("hello world!"))
fmt.Println("signed message: %x", signed)
signature, err := ident.SignMessage([]byte("hello world!"))
fmt.Printf("signed message: %x\n", signature)

// Load and print identity file
bytes, _ := os.ReadFile(filepath.Join(dir, "myidentity.pem"))
Expand Down
35 changes: 28 additions & 7 deletions internal/api/syncapi/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"sync"
Expand Down Expand Up @@ -66,13 +67,14 @@ func NewSyncClient(mgr *SyncManager, localInstanceID string, peer *v1.Multihost_
reconnectDelay: 60 * time.Second,
client: client,
oplog: oplog,
l: zap.L().Named("syncclient").With(zap.String("peer", peer.GetInstanceUrl())),
l: zap.L().Named(fmt.Sprintf("syncclient %q", peer.GetInstanceId())),
}, nil
}

// setConnectionState records the client's connection state together with a
// human-readable message. Both fields are written while holding c.mu so they
// are always updated as a pair.
func (c *SyncClient) setConnectionState(state v1.SyncConnectionState, message string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.connectionStatus, c.connectionStatusMessage = state, message
}

Expand All @@ -93,7 +95,7 @@ func (c *SyncClient) RunSync(ctx context.Context) {
c.setConnectionState(v1.SyncConnectionState_CONNECTION_STATE_PENDING, "connection pending")

if err := c.runSyncInternal(ctx); err != nil {
c.l.Error("sync error", zap.Error(err))
c.l.Sugar().Errorf("sync error: %v", err)
c.setConnectionState(v1.SyncConnectionState_CONNECTION_STATE_DISCONNECTED, err.Error())
}

Expand All @@ -113,15 +115,15 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {

ctx, cancelWithError := context.WithCancelCause(ctx)

receiveError := make(chan error, 1)
receive := make(chan *v1.SyncStreamItem, 1)
send := make(chan *v1.SyncStreamItem, 100)

go func() {
for {
item, err := stream.Receive()
if err != nil {
c.l.Debug("receive error from sync stream, this is typically due to connection loss", zap.Error(err))
close(receive)
receiveError <- err
return
}
receive <- item
Expand All @@ -142,7 +144,14 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
},
},
}); err != nil {
return err
// note: the error checking w/streams in connectrpc is fairly awkward.
// If write returns an EOF error, we are expected to call stream.Receive()
// to get the unmarshalled network failure.
if !errors.Is(err, io.EOF) {
return err
}
_, err2 := stream.Receive()
return err2
}

// Wait for the handshake packet from the server.
Expand All @@ -165,6 +174,10 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return connect.NewError(connect.CodeInvalidArgument, errors.New("no packets received"))
}

if serverInstanceID != c.peer.InstanceId {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("server instance ID %q does not match expected peer instance ID %q", serverInstanceID, c.peer.InstanceId))
}

haveRunSync := make(map[string]struct{}) // repo ID -> have run sync?

oplogSubscription := func(ops []*v1.Operation, event oplog.OperationEvent) {
Expand Down Expand Up @@ -222,6 +235,7 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
handleSyncCommand := func(item *v1.SyncStreamItem) error {
switch action := item.Action.(type) {
case *v1.SyncStreamItem_SendConfig:
c.l.Sugar().Debugf("received remote config update")
newRemoteConfig := action.SendConfig.Config
if err := c.mgr.remoteConfigStore.Update(c.peer.InstanceId, newRemoteConfig); err != nil {
return fmt.Errorf("update remote config store with latest config: %w", err)
Expand Down Expand Up @@ -252,10 +266,14 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
}
haveRunSync[repo.GetId()] = struct{}{}
localRepoConfig := config.FindRepo(localConfig, repo.GetId())
if localRepoConfig == nil {
c.l.Sugar().Debugf("ignoring remote repo config %q/%q because the local repo with the same name does not exist", c.peer.InstanceId, repo.GetId())
continue
}
repo.Hooks = nil // we don't accept any hooks from the server. This could be used to execute arbitrary code on the client.
instanceID, err := InstanceForBackrestURI(localRepoConfig.Uri)
if err != nil || instanceID != c.peer.InstanceId {
c.l.Sugar().Debugf("ignoring remote repo config %q/%q because the local repo with the same name specifies URI %q which does not reference the peer providing this config", c.peer.InstanceId, repo.GetId(), localRepoConfig.Uri)
c.l.Sugar().Debugf("ignoring remote repo config %q/%q because the local repo with the same name specifies URI %q (instance ID %q) which does not reference the peer providing this config", c.peer.InstanceId, repo.GetId(), localRepoConfig.Uri, instanceID)
continue
}

Expand All @@ -273,6 +291,8 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return fmt.Errorf("action sync config: query oplog for repo %q: %w", repo.GetId(), err)
}

c.l.Sugar().Infof("initiating operation history sync for repo %q", repo.GetId())

if err := stream.Send(&v1.SyncStreamItem{
Action: &v1.SyncStreamItem_DiffOperations{
DiffOperations: &v1.SyncStreamItem_SyncActionDiffOperations{
Expand Down Expand Up @@ -371,6 +391,8 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {

for {
select {
case err := <-receiveError:
return fmt.Errorf("receive error: %w", err)
case item, ok := <-receive:
if !ok {
return nil
Expand All @@ -389,5 +411,4 @@ func (c *SyncClient) runSyncInternal(ctx context.Context) error {
return ctx.Err()
}
}

}
5 changes: 4 additions & 1 deletion internal/api/syncapi/synchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
const SyncProtocolVersion = 1

type BackrestSyncHandler struct {
v1connect.UnimplementedBackrestSyncServiceHandler
mgr *SyncManager
}

Expand Down Expand Up @@ -53,7 +54,7 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}()

// Broadcast initial packet containing the protocol version and instance ID.
zap.S().Infof("Client connected, broadcast handshake as %v", initialConfig.Instance)
zap.S().Debugf("syncserver a client connected, broadcast handshake as %v", initialConfig.Instance)
if err := stream.Send(&v1.SyncStreamItem{
Action: &v1.SyncStreamItem_Handshake{
Handshake: &v1.SyncStreamItem_SyncActionHandshake{
Expand Down Expand Up @@ -125,6 +126,7 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
if slices.Contains(repo.AllowedPeerInstanceIds, clientInstanceID) {
repoCopy := proto.Clone(repo).(*v1.Repo)
repoCopy.AllowedPeerInstanceIds = nil
repoCopy.Hooks = nil
remoteConfig.Repos = append(remoteConfig.Repos, repoCopy)
}
}
Expand Down Expand Up @@ -306,6 +308,7 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}
sendConfigToClient(newConfig)
case <-ctx.Done():
zap.S().Infof("syncserver client %q disconnected", authorizedClientPeer.InstanceId)
return ctx.Err()
}
}
Expand Down
28 changes: 18 additions & 10 deletions internal/api/syncapi/syncmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ func NewSyncManager(configMgr *config.ConfigManager, oplog *oplog.OpLog, orchest
orchestrator: orchestrator,
oplog: oplog,
remoteConfigStore: remoteConfigStore,

syncClients: make(map[string]*SyncClient),
}
}

// Note: top level function will be called holding the lock, must kick off goroutines and then return.
func (m *SyncManager) RunSync(ctx context.Context) {
var syncWg sync.WaitGroup
var cancelLastSync context.CancelFunc

configWatchCh := m.configMgr.Watch()
Expand All @@ -50,6 +53,8 @@ func (m *SyncManager) RunSync(ctx context.Context) {
// TODO: rather than cancel the top level context, something clever e.g. diffing the set of peers could be done here.
if cancelLastSync != nil {
cancelLastSync()
zap.L().Info("syncmanager applying new config, waiting for existing sync goroutines to exit")
syncWg.Wait()
}
syncCtx, cancel := context.WithCancel(ctx)
cancelLastSync = cancel
Expand All @@ -65,9 +70,15 @@ func (m *SyncManager) RunSync(ctx context.Context) {
return
}

zap.S().Info("syncmanager applying new config, starting sync goroutines for %d known peers", len(config.Multihost.GetKnownHosts()))
zap.S().Infof("syncmanager applying new config, starting sync goroutines for %d known peers", len(config.Multihost.GetKnownHosts()))
for _, knownHostPeer := range config.Multihost.KnownHosts {
if knownHostPeer.InstanceId == "" {
continue
}

syncWg.Add(1)
go func(knownHostPeer *v1.Multihost_Peer) {
defer syncWg.Done()
zap.S().Debugf("syncmanager starting sync goroutine with peer %q", knownHostPeer.InstanceId)
err := m.runSyncWithPeerInternal(syncCtx, config, knownHostPeer)
if err != nil {
Expand Down Expand Up @@ -96,16 +107,13 @@ func (m *SyncManager) runSyncWithPeerInternal(ctx context.Context, config *v1.Co
return errors.New("local instance must set instance name before peersync can be enabled")
}

client, ok := m.syncClients[knownHostPeer.InstanceId]
if !ok {
newClient, err := NewSyncClient(m, config.Instance, knownHostPeer, m.oplog)
if err != nil {
return fmt.Errorf("creating sync client: %w", err)
}
m.syncClients[knownHostPeer.InstanceId] = client
client = newClient
newClient, err := NewSyncClient(m, config.Instance, knownHostPeer, m.oplog)
if err != nil {
return fmt.Errorf("creating sync client: %w", err)
}
m.syncClients[knownHostPeer.InstanceId] = newClient

go newClient.RunSync(ctx)

go client.RunSync(ctx)
return nil
}
15 changes: 15 additions & 0 deletions internal/api/syncapi/uriutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net/url"
)

var ErrNotBackrestURI = errors.New("not a backrest URI")

func CreateRemoteRepoURI(instanceUrl string) (string, error) {
u, err := url.Parse(instanceUrl)
if err != nil {
Expand Down Expand Up @@ -43,3 +45,16 @@ func InstanceForBackrestURI(repoUri string) (string, error) {

return u.Hostname(), nil
}

// RepoForBackrestURI parses repoUri and returns its path component exactly as
// url.Parse reports it.
//
// It returns the url.Parse error when repoUri cannot be parsed, and a
// "not a backrest URI" error when the parsed scheme is not "backrest".
//
// NOTE(review): this file also declares the ErrNotBackrestURI sentinel with
// the same message; consider returning it here so callers can use errors.Is.
func RepoForBackrestURI(repoUri string) (string, error) {
	parsed, parseErr := url.Parse(repoUri)
	switch {
	case parseErr != nil:
		return "", parseErr
	case parsed.Scheme != "backrest":
		return "", errors.New("not a backrest URI")
	default:
		return parsed.Path, nil
	}
}
4 changes: 2 additions & 2 deletions proto/v1/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ message Config {
repeated Repo repos = 3 [json_name="repos"];
repeated Plan plans = 4 [json_name="plans"];
Auth auth = 5 [json_name="auth"];
Multihost multihost = 7 [json_name="multihost"];
Multihost multihost = 7 [json_name="sync"];
}

message Multihost {
Expand Down Expand Up @@ -58,7 +58,7 @@ message Repo {
repeated Hook hooks = 7 [json_name="hooks"]; // hooks to run on events for this repo.
bool auto_unlock = 8 [json_name="autoUnlock"]; // automatically unlock the repo when needed.
CommandPrefix command_prefix = 10 [json_name="commandPrefix"]; // modifiers for the restic commands
repeated string allowed_peer_instance_ids = 100 [json_name="allowedPeerInstanceIDs"]; // list of peer instance IDs allowed to access this repo.
repeated string allowed_peer_instance_ids = 100 [json_name="allowedPeers"]; // list of peer instance IDs allowed to access this repo.
}

message Plan {
Expand Down
Loading

0 comments on commit e6a905c

Please sign in to comment.