Skip to content

Commit

Permalink
Move local sandbox controller under plugins package
Browse files Browse the repository at this point in the history
Add options to sandbox controller interface.
Update sandbox controller interface to fully utilize sandbox controller
interface.
Move grpc error conversion to service.

Signed-off-by: Derek McGowan <derek@mcg.dev>
  • Loading branch information
dmcgowan committed Feb 7, 2023
1 parent 2717685 commit a788f6c
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 269 deletions.
2 changes: 1 addition & 1 deletion pkg/cri/sbserver/podsandbox/sandbox_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
return
}

func (c *Controller) Create(ctx context.Context, _id string) error {
func (c *Controller) Create(ctx context.Context, _id string, _ ...sandbox.CreateOpt) error {
// Not used by pod-sandbox implementation as there is no need to split pause containers logic.
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cri/sbserver/podsandbox/sandbox_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
)

func (c *Controller) Stop(ctx context.Context, sandboxID string) error {
func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error {
sandbox, err := c.sandboxStore.Get(sandboxID)
if err != nil {
return fmt.Errorf("an error occurred when try to find sandbox %q: %w",
Expand Down
2 changes: 2 additions & 0 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const (
TransferPlugin Type = "io.containerd.transfer.v1"
// SandboxStorePlugin implements a sandbox store
SandboxStorePlugin Type = "io.containerd.sandbox.store.v1"
// SandboxControllerPlugin implements a sandbox controller
SandboxControllerPlugin Type = "io.containerd.sandbox.controller.v1"
)

const (
Expand Down
263 changes: 263 additions & 0 deletions plugins/sandbox/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sandbox

import (
"context"
"fmt"

runtimeAPI "github.com/containerd/containerd/api/runtime/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
v2 "github.com/containerd/containerd/runtime/v2"
"github.com/containerd/containerd/sandbox"

"google.golang.org/protobuf/types/known/anypb"
)

func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SandboxControllerPlugin,
ID: "local",
Requires: []plugin.Type{
plugin.RuntimePluginV2,
plugin.EventPlugin,
plugin.SandboxStorePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimPlugin, err := ic.GetByID(plugin.RuntimePluginV2, "shim")
if err != nil {
return nil, err
}

exchangePlugin, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}

sbPlugin, err := ic.GetByID(plugin.SandboxStorePlugin, "local")
if err != nil {
return nil, err
}

var (
shims = shimPlugin.(*v2.ShimManager)
publisher = exchangePlugin.(*exchange.Exchange)
store = sbPlugin.(sandbox.Store)
)

return &controllerLocal{
shims: shims,
store: store,
publisher: publisher,
}, nil
},
})
}

type controllerLocal struct {
shims *v2.ShimManager
store sandbox.Store
publisher events.Publisher
}

var _ sandbox.Controller = (*controllerLocal)(nil)

func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ...sandbox.CreateOpt) error {
var coptions sandbox.CreateOptions
for _, opt := range opts {
opt(&coptions)
}

if _, err := c.shims.Get(ctx, sandboxID); err == nil {
return fmt.Errorf("sandbox %s already running: %w", sandboxID, errdefs.ErrAlreadyExists)
}

info, err := c.store.Get(ctx, sandboxID)
if err != nil {
return fmt.Errorf("failed to query sandbox metadata from store: %w", err)
}

shim, err := c.shims.Start(ctx, sandboxID, runtime.CreateOpts{
Spec: info.Spec,
RuntimeOptions: info.Runtime.Options,
Runtime: info.Runtime.Name,
TaskOptions: nil,
})

if err != nil {
return fmt.Errorf("failed to start new sandbox: %w", err)
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())

var options *anypb.Any
if coptions.Options != nil {
options = &anypb.Any{
TypeUrl: coptions.Options.GetTypeUrl(),
Value: coptions.Options.GetValue(),
}
}

if _, err := svc.CreateSandbox(ctx, &runtimeAPI.CreateSandboxRequest{
SandboxID: sandboxID,
BundlePath: shim.Bundle(),
Rootfs: coptions.Rootfs,
Options: options,
}); err != nil {
// TODO: Delete sandbox shim here.
return fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}

return nil
}

func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox.ControllerInstance, error) {
shim, err := c.shims.Get(ctx, sandboxID)
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID)
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID})
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}

return sandbox.ControllerInstance{
SandboxID: sandboxID,
Pid: resp.GetPid(),
CreatedAt: resp.GetCreatedAt().AsTime(),
}, nil
}

func (c *controllerLocal) Platform(ctx context.Context, sandboxID string) (platforms.Platform, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return platforms.Platform{}, err
}

response, err := svc.Platform(ctx, &runtimeAPI.PlatformRequest{SandboxID: sandboxID})
if err != nil {
return platforms.Platform{}, fmt.Errorf("failed to get sandbox platform: %w", errdefs.FromGRPC(err))
}

var platform platforms.Platform
if p := response.GetPlatform(); p != nil {
platform.OS = p.OS
platform.Architecture = p.Architecture
platform.Variant = p.Variant
}
return platform, nil
}

func (c *controllerLocal) Stop(ctx context.Context, sandboxID string, opts ...sandbox.StopOpt) error {
var soptions sandbox.StopOptions
for _, opt := range opts {
opt(&soptions)
}
req := &runtimeAPI.StopSandboxRequest{SandboxID: sandboxID}
if soptions.Timeout != nil {
req.TimeoutSecs = uint32(soptions.Timeout.Seconds())
}

svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return err
}

if _, err := svc.StopSandbox(ctx, req); err != nil {
return fmt.Errorf("failed to stop sandbox: %w", errdefs.FromGRPC(err))
}

return nil
}

func (c *controllerLocal) Shutdown(ctx context.Context, sandboxID string) error {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return err
}

_, err = svc.ShutdownSandbox(ctx, &runtimeAPI.ShutdownSandboxRequest{SandboxID: sandboxID})
if err != nil {
return fmt.Errorf("failed to shutdown sandbox: %w", errdefs.FromGRPC(err))
}

if err := c.shims.Delete(ctx, sandboxID); err != nil {
return fmt.Errorf("failed to delete sandbox shim: %w", err)
}

return nil
}

func (c *controllerLocal) Wait(ctx context.Context, sandboxID string) (sandbox.ExitStatus, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return sandbox.ExitStatus{}, err
}

resp, err := svc.WaitSandbox(ctx, &runtimeAPI.WaitSandboxRequest{
SandboxID: sandboxID,
})

if err != nil {
return sandbox.ExitStatus{}, fmt.Errorf("failed to wait sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}

return sandbox.ExitStatus{
ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt().AsTime(),
}, nil
}

func (c *controllerLocal) Status(ctx context.Context, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return sandbox.ControllerStatus{}, err
}

resp, err := svc.SandboxStatus(ctx, &runtimeAPI.SandboxStatusRequest{
SandboxID: sandboxID,
Verbose: verbose,
})
if err != nil {
return sandbox.ControllerStatus{}, fmt.Errorf("failed to query sandbox %s status: %w", sandboxID, err)
}

return sandbox.ControllerStatus{
SandboxID: resp.GetSandboxID(),
Pid: resp.GetPid(),
State: resp.GetState(),
ExitedAt: resp.GetCreatedAt().AsTime(),
Extra: resp.GetExtra(),
}, nil
}

func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI.TTRPCSandboxService, error) {
shim, err := c.shims.Get(ctx, id)
if err != nil {
return nil, errdefs.ErrNotFound
}

svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
return svc, nil
}
40 changes: 37 additions & 3 deletions sandbox/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,57 @@ import (
"context"
"time"

"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/platforms"
"github.com/containerd/typeurl"
)

type CreateOptions struct {
Rootfs []*types.Mount
Options typeurl.Any
}

type CreateOpt func(*CreateOptions)

// WithRootFS is used to create a sandbox with the provided rootfs mount
// TODO: Switch to mount.Mount once target added
func WithRootFS(m []*types.Mount) CreateOpt {
return func(co *CreateOptions) {
co.Rootfs = m
}
}

func WithOptions(a typeurl.Any) CreateOpt {
return func(co *CreateOptions) {
co.Options = a
}
}

type StopOptions struct {
Timeout *time.Duration
}

type StopOpt func(*StopOptions)

func WithTimeout(timeout time.Duration) StopOpt {
return func(so *StopOptions) {
so.Timeout = &timeout
}
}

// Controller is an interface to manage sandboxes at runtime.
// When running in sandbox mode, shim expected to implement `SandboxService`.
// Shim lifetimes are now managed manually via sandbox API by the containerd's client.
type Controller interface {
// Create is used to initialize sandbox environment.
Create(ctx context.Context, sandboxID string) error
// Create is used to initialize sandbox environment. (mounts, any)
Create(ctx context.Context, sandboxID string, opts ...CreateOpt) error
// Start will start previously created sandbox.
Start(ctx context.Context, sandboxID string) (ControllerInstance, error)
// Platform returns target sandbox OS that will be used by Controller.
// containerd will rely on this to generate proper OCI spec.
Platform(_ctx context.Context, _sandboxID string) (platforms.Platform, error)
// Stop will stop sandbox instance
Stop(ctx context.Context, sandboxID string) error
Stop(ctx context.Context, sandboxID string, opts ...StopOpt) error
// Wait blocks until sandbox process exits.
Wait(ctx context.Context, sandboxID string) (ExitStatus, error)
// Status will query sandbox process status. It is heavier than Ping call and must be used whenever you need to
Expand Down
Loading

0 comments on commit a788f6c

Please sign in to comment.