Skip to content

Commit

Permalink
Wire up client bridges
Browse files Browse the repository at this point in the history
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
  • Loading branch information
mxpv committed Feb 11, 2023
1 parent 4b1ebef commit 9e5c207
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 20 deletions.
14 changes: 10 additions & 4 deletions plugins/sandbox/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ...
return fmt.Errorf("failed to start new sandbox: %w", err)
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
svc, err := sandbox.NewClient(shim.Client())
if err != nil {
return err
}

var options *anypb.Any
if coptions.Options != nil {
Expand Down Expand Up @@ -136,7 +139,11 @@ func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox.
return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID)
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
svc, err := sandbox.NewClient(shim.Client())
if err != nil {
return sandbox.ControllerInstance{}, err
}

resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID})
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
Expand Down Expand Up @@ -258,6 +265,5 @@ func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI
return nil, errdefs.ErrNotFound
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
return svc, nil
return sandbox.NewClient(shim.Client())
}
2 changes: 1 addition & 1 deletion runtime/v2/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewTaskClient(client interface{}) (v2.TaskService, error) {
return &grpcBridge{client}, nil
}

return nil, fmt.Errorf("unsupported client type %T", client)
return nil, fmt.Errorf("unsupported shim client type %T", client)
}

// grpcBridge implements `v2.TaskService` interface for GRPC shim server.
Expand Down
26 changes: 18 additions & 8 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,11 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr

// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shimTask := newShimTask(shim)
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}

t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
Expand All @@ -443,7 +447,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
}

shimTask.Shutdown(dctx)
shimTask.Client().Close()
shimTask.Close()
}

return nil, fmt.Errorf("failed to create shim task: %w", err)
Expand All @@ -458,7 +462,7 @@ func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error)
if err != nil {
return nil, err
}
return newShimTask(shim), nil
return newShimTask(shim)
}

// Tasks lists all tasks
Expand All @@ -469,7 +473,11 @@ func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, erro
}
out := make([]runtime.Task, len(shims))
for i := range shims {
out[i] = newShimTask(shims[i])
newClient, err := newShimTask(shims[i])
if err != nil {
return nil, err
}
out[i] = newClient
}
return out, nil
}
Expand All @@ -486,10 +494,12 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
return nil, err
}

var (
sandboxed = container.SandboxID != ""
shimTask = newShimTask(shim)
)
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}

sandboxed := container.SandboxID != ""

exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
Expand Down
29 changes: 23 additions & 6 deletions runtime/v2/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
}
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel()

// Check connectivity, TaskService is the only required service, so create a temp one to check connection.
s := newShimTask(shim)
s, err := newShimTask(shim)
if err != nil {
return nil, err
}

if _, err := s.PID(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -176,14 +181,16 @@ func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[Shim

// ShimInstance represents running shim process managed by ShimManager.
type ShimInstance interface {
io.Closer

// ID of the shim.
ID() string
// Namespace of this shim.
Namespace() string
// Bundle is a file system path to shim's bundle.
Bundle() string
// Client returns the underlying TTRPC client for this shim.
Client() *ttrpc.Client
Client() interface{}
// Delete will close the client and remove bundle from disk.
Delete(ctx context.Context) error
}
Expand All @@ -208,10 +215,15 @@ func (s *shim) Bundle() string {
return s.bundle.Path
}

func (s *shim) Client() *ttrpc.Client {
func (s *shim) Client() interface{} {
return s.client
}

// Close closes the underlying client connection.
func (s *shim) Close() error {
return s.client.Close()
}

func (s *shim) Delete(ctx context.Context) error {
var (
result *multierror.Error
Expand Down Expand Up @@ -241,11 +253,16 @@ type shimTask struct {
task task.TaskService
}

func newShimTask(shim ShimInstance) *shimTask {
func newShimTask(shim ShimInstance) (*shimTask, error) {
taskClient, err := NewTaskClient(shim.Client())
if err != nil {
return nil, err
}

return &shimTask{
ShimInstance: shim,
task: task.NewTaskClient(shim.Client()),
}
task: taskClient,
}, nil
}

func (s *shimTask) Shutdown(ctx context.Context) error {
Expand Down
5 changes: 4 additions & 1 deletion runtime/v2/shim_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall)
continue
}
shim := newShimTask(instance)
shim, err := newShimTask(instance)
if err != nil {
return err
}

// There are 3 possibilities for the loaded shim here:
// 1. It could be a shim that is running a task.
Expand Down

0 comments on commit 9e5c207

Please sign in to comment.