Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update reaper for multiple subscribers #1452

Merged
merged 1 commit into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions linux/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id")
}

ec := reaper.Default.Subscribe()
defer reaper.Default.Unsubscribe(ec)

bundle, err := newBundle(
namespace, id,
filepath.Join(r.state, namespace),
Expand Down Expand Up @@ -177,7 +180,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true)
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec)
if err == nil {
r.tasks.Delete(ctx, lc)
} else {
Expand Down Expand Up @@ -320,7 +323,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
"namespace": ns,
}).Error("connecting to shim")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile))
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false)
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
Expand All @@ -336,18 +339,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}

func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error {
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error {
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
return errors.New("failed to terminate task, leaving bundle for debugging")
}

if reap {
if ec != nil {
// if sub-reaper is set, reap our new child
if v, err := sys.GetSubreaper(); err == nil && v == 1 {
reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})})
reaper.Default.WaitPid(pid)
reaper.Default.Delete(pid)
for e := range ec {
if e.Pid == pid {
break
}
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions linux/shim/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
defer f.Close()

cmd := newCommand(binary, address, debug, config, f)
if err := reaper.Default.Start(cmd); err != nil {
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
defer func() {
Expand All @@ -53,8 +54,7 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
}
}()
go func() {
reaper.Default.Wait(cmd)
reaper.Default.Delete(cmd.Process.Pid)
reaper.Default.Wait(cmd, ec)
exitHandler()
}()
log.G(ctx).WithFields(logrus.Fields{
Expand Down
6 changes: 2 additions & 4 deletions linux/shim/init_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"

"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -345,10 +346,7 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}

func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()

return s.p.kill(ctx, sig, all)
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use failed preconditions here? Although I guess not found make it easier, only need to check against one type of error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its this way because we map no such process syscall error to NotFound

}

func (s *stoppedState) SetExited(status int) {
Expand Down
49 changes: 26 additions & 23 deletions linux/shim/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -48,7 +49,9 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S
namespace: namespace,
context: context,
workDir: workDir,
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
Expand All @@ -70,31 +73,27 @@ type Service struct {
mu sync.Mutex
processes map[string]process
events chan interface{}
eventsMu sync.Mutex
deferredEvent interface{}
namespace string
context context.Context
ec chan runc.Exit

workDir string
platform platform
}

func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use defer s.mu.Lock() now?

Otherwise, move it back after newInitProcess or unlock it in the error path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

defer s.mu.Unlock()
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
s.mu.Unlock()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Expand All @@ -108,7 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
Expand All @@ -129,11 +127,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
}
} else {
pid := p.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
go s.waitExit(p, pid, cmd)
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
Expand Down Expand Up @@ -392,17 +385,27 @@ func (s *Service) deleteProcess(id string) {
s.mu.Unlock()
}

func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
status, _ := reaper.Default.WaitPid(pid)
p.SetExited(status)
func (s *Service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}

reaper.Default.Delete(pid)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
func (s *Service) checkProcesses(e runc.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
if p.Pid() == e.Pid {
p.SetExited(e.Status)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
}
}

Expand Down
126 changes: 45 additions & 81 deletions reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,45 @@ import (
"bytes"
"os/exec"
"sync"
"time"

"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)

var (
ErrNoSuchProcess = errors.New("no such process")
)
var ErrNoSuchProcess = errors.New("no such process")

const bufferSize = 2048

// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
for _, e := range exits {
Default.Lock()
c, ok := Default.cmds[e.Pid]
if !ok {
Default.unknown[e.Pid] = e.Status
Default.Unlock()
continue
}
Default.Unlock()
if c.c != nil {
// after we get an exit, call wait on the go process to make sure all
// pipes are closed and finalizers are run on the process
c.c.Wait()
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
c.exitStatus = e.Status
close(c.ExitCh)

}
Default.Unlock()
return err
}

var Default = &Monitor{
cmds: make(map[int]*Cmd),
unknown: make(map[int]int),
subscribers: make(map[chan runc.Exit]struct{}),
}

type Monitor struct {
sync.Mutex

cmds map[int]*Cmd
unknown map[int]int
subscribers map[chan runc.Exit]struct{}
}

func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
Expand All @@ -69,82 +65,50 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
}

// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) error {
rc := &Cmd{
c: c,
ExitCh: make(chan struct{}),
}
// start the process
m.Lock()
err := c.Start()
if c.Process != nil {
m.RegisterNL(c.Process.Pid, rc)
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
m.Unlock()
return err
return ec, nil
}

// Run runs and waits for the command to finish
func (m *Monitor) Run(c *exec.Cmd) error {
if err := m.Start(c); err != nil {
ec, err := m.Start(c)
if err != nil {
return err
}
_, err := m.Wait(c)
_, err = m.Wait(c, ec)
return err
}

func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
return m.WaitPid(c.Process.Pid)
}

func (m *Monitor) Register(pid int, c *Cmd) {
m.Lock()
m.RegisterNL(pid, c)
m.Unlock()
}

// RegisterNL does not grab the lock internally
// the caller is responsible for locking the monitor
func (m *Monitor) RegisterNL(pid int, c *Cmd) {
if status, ok := m.unknown[pid]; ok {
delete(m.unknown, pid)
m.cmds[pid] = c
c.exitStatus = status
close(c.ExitCh)
return
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
m.cmds[pid] = c
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}

func (m *Monitor) WaitPid(pid int) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
rc, ok := m.cmds[pid]
m.subscribers[c] = struct{}{}
m.Unlock()
if !ok {
return 255, errors.Wrapf(ErrNoSuchProcess, "pid %d", pid)
}
<-rc.ExitCh
if rc.exitStatus != 0 {
return rc.exitStatus, errors.Errorf("exit status %d", rc.exitStatus)
}
return rc.exitStatus, nil
return c
}

// Command returns the registered pid for the command created
func (m *Monitor) Command(pid int) *Cmd {
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
defer m.Unlock()
return m.cmds[pid]
}

func (m *Monitor) Delete(pid int) {
m.Lock()
delete(m.cmds, pid)
delete(m.subscribers, c)
close(c)
m.Unlock()
}

type Cmd struct {
c *exec.Cmd
ExitCh chan struct{}
exitStatus int
}
2 changes: 1 addition & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc b85ac701de5065a66918203dd18f057433290807
github.com/containerd/go-runc e103f453ff3db23ec69d31371cadc1ea0ce87ec0
github.com/containerd/console 76d18fd1d66972718ab2284449591db0b3cdb4de
github.com/containerd/cgroups e6d1aa8c71c6103624b2c6e6f4be0863b67027f1
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87
Expand Down
Loading