Skip to content

Commit

Permalink
Move ActivateComponents RPC from pipe to component method call. (Serv…
Browse files Browse the repository at this point in the history
…iceWeaver#705)

Continuing the work to remove pipe-based RPCs, the ActivateComponents
call to the deployer is now a component method call.

Also made NewEnvelope() responsible for serving calls to the deployer
component. (Callers that are directly using NewEnvelopeConn() still
have to manage such calls themselves.)
  • Loading branch information
ghemawat authored Jan 9, 2024
1 parent 7c8a152 commit d181e06
Show file tree
Hide file tree
Showing 22 changed files with 518 additions and 292 deletions.
7 changes: 6 additions & 1 deletion deployerControl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ func (local *localDeployerControl) Init(ctx context.Context) error {
return nil
}

// LogBatch logs a list of entries.
// LogBatch implements the control.DeployerControl interface.
func (local *localDeployerControl) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error {
for _, entry := range batch.Entries {
fmt.Fprintln(os.Stderr, local.pp.Format(entry))
}
return nil
}

// ActivateComponent implements the control.DeployerControl interface.
func (*localDeployerControl) ActivateComponent(context.Context, *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) {
return nil, fmt.Errorf("localDeployerControl.ActivateComponent not implemented")
}
1 change: 1 addition & 0 deletions examples/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestExamples(t *testing.T) {
// Build the weaver binary.
cmd := exec.Command("go", "build")
cmd.Dir = "../cmd/weaver"
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ github.com/ServiceWeaver/weaver/internal/tool/multi
errors
flag
fmt
github.com/ServiceWeaver/weaver/internal/control
github.com/ServiceWeaver/weaver/internal/metrics
github.com/ServiceWeaver/weaver/internal/must
github.com/ServiceWeaver/weaver/internal/proxy
Expand All @@ -563,7 +562,6 @@ github.com/ServiceWeaver/weaver/internal/tool/multi
github.com/ServiceWeaver/weaver/runtime/bin
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/graph
github.com/ServiceWeaver/weaver/runtime/logging
Expand Down Expand Up @@ -840,6 +838,7 @@ github.com/ServiceWeaver/weaver/runtime/envelope
golang.org/x/sync/errgroup
io
log/slog
net
os
strconv
sync
Expand Down Expand Up @@ -1005,6 +1004,7 @@ github.com/ServiceWeaver/weaver/weavertest
golang.org/x/exp/maps
golang.org/x/sync/errgroup
log/slog
net
os
reflect
regexp
Expand Down
10 changes: 10 additions & 0 deletions internal/control/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ import (
"github.com/ServiceWeaver/weaver/runtime/protos"
)

// DeployerPath is the path used for the deployer control component.
// It points to an internal type in a different package.
const DeployerPath = "github.com/ServiceWeaver/weaver/deployerControl"

// DeployerControl is the interface for the weaver.deployerControl component. It is
// present in its own package so other packages do not need to copy the interface
// definition.
//
// Arguments and results are protobufs to allow deployers to evolve independently
// of application binaries.
type DeployerControl interface {
// LogBatch logs the supplied batch of log entries.
LogBatch(context.Context, *protos.LogEntryBatch) error

// ActivateComponent ensures that the provided component is running
// somewhere. A call to ActivateComponent also implicitly signals that a
// weavelet is interested in receiving routing info for the component.
ActivateComponent(context.Context, *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error)
}
4 changes: 4 additions & 0 deletions internal/control/weavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/ServiceWeaver/weaver/runtime/protos"
)

// WeaveletPath is the path used for the weavelet control component.
// It points to an internal type in a different package.
const WeaveletPath = "github.com/ServiceWeaver/weaver/weaveletControl"

// WeaveletControl is the interface for the weaver.weaveletControl component. It is
// present in its own package so other packages do not need to copy the interface
// definition.
Expand Down
4 changes: 4 additions & 0 deletions internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (*handlerForTest) HandleTraceSpans(context.Context, *protos.TraceSpans) err
return nil
}

func (*handlerForTest) LogBatch(context.Context, *protos.LogEntryBatch) error {
return nil
}

func (*handlerForTest) HandleLogEntry(context.Context, *protos.LogEntry) error {
return nil
}
Expand Down
8 changes: 1 addition & 7 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type EnvelopeHandler interface {
GetSelfCertificate(context.Context, *protos.GetSelfCertificateRequest) (*protos.GetSelfCertificateReply, error)
VerifyClientCertificate(context.Context, *protos.VerifyClientCertificateRequest) (*protos.VerifyClientCertificateReply, error)
VerifyServerCertificate(context.Context, *protos.VerifyServerCertificateRequest) (*protos.VerifyServerCertificateReply, error)
LogBatch(context.Context, *protos.LogEntryBatch) error
HandleLogEntry(context.Context, *protos.LogEntry) error
HandleTraceSpans(context.Context, *protos.TraceSpans) error
}
Expand Down Expand Up @@ -196,13 +197,6 @@ func (e *EnvelopeConn) handleMessage(msg *protos.WeaveletMsg, h EnvelopeHandler)
}

switch {
case msg.ActivateComponentRequest != nil:
reply, err := h.ActivateComponent(e.ctx, msg.ActivateComponentRequest)
return e.conn.send(&protos.EnvelopeMsg{
Id: -msg.Id,
Error: errstring(err),
ActivateComponentReply: reply,
})
case msg.GetListenerAddressRequest != nil:
reply, err := h.GetListenerAddress(e.ctx, msg.GetListenerAddressRequest)
return e.conn.send(&protos.EnvelopeMsg{
Expand Down
4 changes: 4 additions & 0 deletions internal/envelope/conn/trace_readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (p *pipeForTest) HandleTraceSpans(_ context.Context, spans *protos.TraceSpa
return nil
}

func (*pipeForTest) LogBatch(context.Context, *protos.LogEntryBatch) error {
return nil
}

func (*pipeForTest) HandleLogEntry(context.Context, *protos.LogEntry) error {
return nil
}
Expand Down
14 changes: 0 additions & 14 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,6 @@ func (w *WeaveletConn) Listener() net.Listener {
return w.lis
}

// ActivateComponentRPC ensures that the provided component is running
// somewhere. A call to ActivateComponentRPC also implicitly signals that a
// weavelet is interested in receiving routing info for the component.
func (w *WeaveletConn) ActivateComponentRPC(req *protos.ActivateComponentRequest) error {
reply, err := w.rpc(&protos.WeaveletMsg{ActivateComponentRequest: req})
if err != nil {
return err
}
if reply.ActivateComponentReply == nil {
return fmt.Errorf("nil ActivateComponentReply received from envelope")
}
return nil
}

// GetListenerAddressRPC returns the address the weavelet should listen on for
// a particular listener.
func (w *WeaveletConn) GetListenerAddressRPC(req *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error) {
Expand Down
34 changes: 34 additions & 0 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package testdeployer
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"sync"
"testing"
"time"

"github.com/ServiceWeaver/weaver/internal/control"
"github.com/ServiceWeaver/weaver/internal/envelope/conn"
"github.com/ServiceWeaver/weaver/internal/reflection"
"github.com/ServiceWeaver/weaver/internal/weaver"
Expand Down Expand Up @@ -139,6 +141,7 @@ type deployer struct {
ctx context.Context // context used to spawn weavelets
cancel context.CancelFunc // shuts down the deployer and all weavelets
logger *logging.TestLogger // logger
threads *errgroup.Group // background threads
placement map[string][]string // weavelet -> components
placedAt map[string][]string // component -> weavelets
weavelets map[string]*weavelet // weavelets
Expand Down Expand Up @@ -189,6 +192,11 @@ func deploy(t *testing.T, ctx context.Context, placement map[string][]string) *d
// argument.
func deployWithInfo(t *testing.T, ctx context.Context, placement map[string][]string, info *protos.EnvelopeInfo) *deployer {
t.Helper()
udsPath := deployers.NewUnixSocketPath(t.TempDir())
uds, err := net.Listen("unix", udsPath)
if err != nil {
t.Fatal(err)
}

// Invert placement.
placedAt := map[string][]string{}
Expand All @@ -200,21 +208,39 @@ func deployWithInfo(t *testing.T, ctx context.Context, placement map[string][]st

// Create the deployer.
ctx, cancel := context.WithCancel(ctx)
threads, ctx := errgroup.WithContext(ctx)
d := &deployer{
t: t,
ctx: ctx,
cancel: cancel,
logger: logging.NewTestLogger(t, testing.Verbose()),
threads: threads,
placement: placement,
placedAt: placedAt,
weavelets: map[string]*weavelet{},
}

// Handle calls from weavelets.
logger := slog.New(&logging.LogHandler{Write: d.logger.Log})
threads.Go(func() error {
return deployers.ServeComponents(ctx, uds, logger, map[string]any{
control.DeployerPath: d,
})
})

// Spawn the weavelets.
for name := range placement {
info := protomsg.Clone(info)
info.Id = uuid.New().String()
info.ControlSocket = deployers.NewUnixSocketPath(t.TempDir())
info.Redirects = []*protos.EnvelopeInfo_Redirect{
// Supply custom deployer control component
{
Component: control.DeployerPath,
Target: control.DeployerPath,
Address: "unix://" + udsPath,
},
}
weavelet, err := spawn(ctx, info, d)
if err != nil {
t.Fatal(err)
Expand All @@ -234,6 +260,14 @@ func (d *deployer) shutdown() {
}
}

// LogBatch implements the control.DeployerControl interface.
func (d *deployer) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error {
for _, entry := range batch.Entries {
d.logger.Log(entry)
}
return nil
}

// ActivateComponent implements the EnvelopeHandler interface.
func (d *deployer) ActivateComponent(ctx context.Context, req *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) {
d.mu.Lock()
Expand Down
49 changes: 8 additions & 41 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"syscall"
"time"

"github.com/ServiceWeaver/weaver/internal/control"
imetrics "github.com/ServiceWeaver/weaver/internal/metrics"
"github.com/ServiceWeaver/weaver/internal/proxy"
"github.com/ServiceWeaver/weaver/internal/routing"
Expand All @@ -38,7 +37,6 @@ import (
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/bin"
"github.com/ServiceWeaver/weaver/runtime/colors"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/envelope"
"github.com/ServiceWeaver/weaver/runtime/graph"
"github.com/ServiceWeaver/weaver/runtime/logging"
Expand All @@ -55,16 +53,12 @@ import (
// The default number of times a component is replicated.
const defaultReplication = 2

// Path name for the deployer control component we implement.
const deployerControlPath = "github.com/ServiceWeaver/weaver/deployerControl"

// A deployer manages an application deployment.
type deployer struct {
ctx context.Context
ctxCancel context.CancelFunc
deploymentId string
tmpDir string // Private directory for this weavelet/envelope
udsPath string // Path to Unix domain socket
config *MultiConfig
started time.Time
logger *slog.Logger
Expand All @@ -82,11 +76,8 @@ type deployer struct {
err error // error that stopped the babysitter
groups map[string]*group // groups, by component name
proxies map[string]*proxyInfo // proxies, by listener name

}

var _ control.DeployerControl = &deployer{}

// A group contains information about a co-location group.
type group struct {
name string // group name
Expand Down Expand Up @@ -151,19 +142,11 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
return nil, fmt.Errorf("cannot open Perfetto database: %w", err)
}

// Make Unix domain socket listener for serving hosted system components.
udsPath := deployers.NewUnixSocketPath(tmpDir)
uds, err := net.Listen("unix", udsPath)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
d := &deployer{
ctx: ctx,
ctxCancel: cancel,
tmpDir: tmpDir,
udsPath: udsPath,
logger: logger,
caCert: caCert,
caKey: caKey,
Expand Down Expand Up @@ -197,15 +180,6 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
return err
})

// Start a goroutine that serves calls to system components like deployerControl.
d.running.Go(func() error {
err := deployers.ServeComponents(d.ctx, uds, d.logger, map[string]any{
deployerControlPath: d,
})
d.stop(err)
return err
})

return d, nil
}

Expand Down Expand Up @@ -355,14 +329,6 @@ func (d *deployer) startColocationGroup(g *group) error {
RunMain: g.started[runtime.Main],
Mtls: d.config.Mtls,
InternalAddress: "localhost:0",
Redirects: []*protos.EnvelopeInfo_Redirect{
// Supply custom deployer control component
{
Component: deployerControlPath,
Target: deployerControlPath,
Address: "unix://" + d.udsPath,
},
},
}
e, err := envelope.NewEnvelope(d.ctx, info, d.config.App, envelope.Options{
Logger: d.logger,
Expand All @@ -375,13 +341,14 @@ func (d *deployer) startColocationGroup(g *group) error {
// compiled binary.
wlet := e.WeaveletInfo()

h := &handler{
deployer: d,
g: g,
subscribed: map[string]bool{},
envelope: e,
}

d.running.Go(func() error {
h := &handler{
deployer: d,
g: g,
subscribed: map[string]bool{},
envelope: e,
}
err := e.Serve(h)
d.stop(err)
return err
Expand All @@ -403,7 +370,7 @@ func (d *deployer) startMain() error {
})
}

// ActivateComponent implements the envelope.EnvelopeHandler interface.
// ActivateComponent implements the control.DeployerControl interface.
func (h *handler) ActivateComponent(_ context.Context, req *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) {
if err := h.subscribeTo(req); err != nil {
return nil, err
Expand Down
16 changes: 16 additions & 0 deletions internal/tool/ssh/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,22 @@ func (b *babysitter) watchComponents() {
}
}

// LogBatch implements the protos.EnvelopeHandler interface.
func (b *babysitter) LogBatch(_ context.Context, req *protos.LogEntryBatch) error {
// TODO: Support batched log delivery
for _, entry := range req.Entries {
if err := protomsg.Call(b.ctx, protomsg.CallArgs{
Client: http.DefaultClient,
Addr: b.info.ManagerAddr,
URLPath: recvLogEntryURL,
Request: entry,
}); err != nil {
return err
}
}
return nil
}

// HandleLogEntry implements the protos.EnvelopeHandler interface.
func (b *babysitter) HandleLogEntry(_ context.Context, req *protos.LogEntry) error {
return protomsg.Call(b.ctx, protomsg.CallArgs{
Expand Down
Loading

0 comments on commit d181e06

Please sign in to comment.