Skip to content

Commit

Permalink
Update reaper for multipe subscribers
Browse files Browse the repository at this point in the history
Depends on containerd/go-runc#24

The is currently a race with the reaper where you could miss some exit
events from processes.

The problem before and why the reaper was so complex was because
processes could fork, getting a pid, and then fail on an execve before
we would have time to register the process with the reaper.  This could
cause pids to fill up in a map as a way to reduce the race.

This changes makes the reaper handle multiple subscribers so that the
caller can handle locking, for when they want to wait for a specific
pid, without affecting other callers using the reaper code.

Exit events are broadcast to multiple subscribers, in the case, the runc
commands and container pids that we get from a pid-file.  Locking while
the entire container stats no longs affects runc commands where you want
to call `runc create` and wait until that has been completed.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
  • Loading branch information
crosbymichael committed Aug 31, 2017
1 parent c2e894c commit 6b4c4a2
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 150 deletions.
19 changes: 12 additions & 7 deletions linux/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id")
}

ec := reaper.Default.Subscribe()
defer reaper.Default.Unsubscribe(ec)

bundle, err := newBundle(
namespace, id,
filepath.Join(r.state, namespace),
Expand Down Expand Up @@ -177,7 +180,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true)
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec)
if err == nil {
r.tasks.Delete(ctx, lc)
} else {
Expand Down Expand Up @@ -320,7 +323,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
"namespace": ns,
}).Error("connecting to shim")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile))
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false)
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
Expand All @@ -336,18 +339,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}

func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error {
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error {
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
return errors.New("failed to terminate task, leaving bundle for debugging")
}

if reap {
if ec != nil {
// if sub-reaper is set, reap our new child
if v, err := sys.GetSubreaper(); err == nil && v == 1 {
reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})})
reaper.Default.WaitPid(pid)
reaper.Default.Delete(pid)
for e := range ec {
if e.Pid == pid {
break
}
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions linux/shim/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
defer f.Close()

cmd := newCommand(binary, address, debug, config, f)
if err := reaper.Default.Start(cmd); err != nil {
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
defer func() {
Expand All @@ -53,8 +54,7 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
}
}()
go func() {
reaper.Default.Wait(cmd)
reaper.Default.Delete(cmd.Process.Pid)
reaper.Default.Wait(cmd, ec)
exitHandler()
}()
log.G(ctx).WithFields(logrus.Fields{
Expand Down
6 changes: 2 additions & 4 deletions linux/shim/init_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"

"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -345,10 +346,7 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}

func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()

return s.p.kill(ctx, sig, all)
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
}

func (s *stoppedState) SetExited(status int) {
Expand Down
49 changes: 26 additions & 23 deletions linux/shim/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -48,7 +49,9 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S
namespace: namespace,
context: context,
workDir: workDir,
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
Expand All @@ -70,31 +73,27 @@ type Service struct {
mu sync.Mutex
processes map[string]process
events chan interface{}
eventsMu sync.Mutex
deferredEvent interface{}
namespace string
context context.Context
ec chan runc.Exit

workDir string
platform platform
}

func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
s.mu.Unlock()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Expand All @@ -108,7 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
Expand All @@ -129,11 +127,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
}
} else {
pid := p.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
go s.waitExit(p, pid, cmd)
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
Expand Down Expand Up @@ -392,17 +385,27 @@ func (s *Service) deleteProcess(id string) {
s.mu.Unlock()
}

func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
status, _ := reaper.Default.WaitPid(pid)
p.SetExited(status)
func (s *Service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}

reaper.Default.Delete(pid)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
func (s *Service) checkProcesses(e runc.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
if p.Pid() == e.Pid {
p.SetExited(e.Status)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
}
}

Expand Down
126 changes: 45 additions & 81 deletions reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,45 @@ import (
"bytes"
"os/exec"
"sync"
"time"

"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)

var (
ErrNoSuchProcess = errors.New("no such process")
)
var ErrNoSuchProcess = errors.New("no such process")

const bufferSize = 2048

// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
for _, e := range exits {
Default.Lock()
c, ok := Default.cmds[e.Pid]
if !ok {
Default.unknown[e.Pid] = e.Status
Default.Unlock()
continue
}
Default.Unlock()
if c.c != nil {
// after we get an exit, call wait on the go process to make sure all
// pipes are closed and finalizers are run on the process
c.c.Wait()
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
c.exitStatus = e.Status
close(c.ExitCh)

}
Default.Unlock()
return err
}

var Default = &Monitor{
cmds: make(map[int]*Cmd),
unknown: make(map[int]int),
subscribers: make(map[chan runc.Exit]struct{}),
}

type Monitor struct {
sync.Mutex

cmds map[int]*Cmd
unknown map[int]int
subscribers map[chan runc.Exit]struct{}
}

func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
Expand All @@ -69,82 +65,50 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
}

// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) error {
rc := &Cmd{
c: c,
ExitCh: make(chan struct{}),
}
// start the process
m.Lock()
err := c.Start()
if c.Process != nil {
m.RegisterNL(c.Process.Pid, rc)
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
m.Unlock()
return err
return ec, nil
}

// Run runs and waits for the command to finish
func (m *Monitor) Run(c *exec.Cmd) error {
if err := m.Start(c); err != nil {
ec, err := m.Start(c)
if err != nil {
return err
}
_, err := m.Wait(c)
_, err = m.Wait(c, ec)
return err
}

func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
return m.WaitPid(c.Process.Pid)
}

func (m *Monitor) Register(pid int, c *Cmd) {
m.Lock()
m.RegisterNL(pid, c)
m.Unlock()
}

// RegisterNL does not grab the lock internally
// the caller is responsible for locking the monitor
func (m *Monitor) RegisterNL(pid int, c *Cmd) {
if status, ok := m.unknown[pid]; ok {
delete(m.unknown, pid)
m.cmds[pid] = c
c.exitStatus = status
close(c.ExitCh)
return
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
m.cmds[pid] = c
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}

func (m *Monitor) WaitPid(pid int) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
rc, ok := m.cmds[pid]
m.subscribers[c] = struct{}{}
m.Unlock()
if !ok {
return 255, errors.Wrapf(ErrNoSuchProcess, "pid %d", pid)
}
<-rc.ExitCh
if rc.exitStatus != 0 {
return rc.exitStatus, errors.Errorf("exit status %d", rc.exitStatus)
}
return rc.exitStatus, nil
return c
}

// Command returns the registered pid for the command created
func (m *Monitor) Command(pid int) *Cmd {
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
defer m.Unlock()
return m.cmds[pid]
}

func (m *Monitor) Delete(pid int) {
m.Lock()
delete(m.cmds, pid)
delete(m.subscribers, c)
close(c)
m.Unlock()
}

type Cmd struct {
c *exec.Cmd
ExitCh chan struct{}
exitStatus int
}
2 changes: 1 addition & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc b85ac701de5065a66918203dd18f057433290807
github.com/containerd/go-runc e103f453ff3db23ec69d31371cadc1ea0ce87ec0
github.com/containerd/console 76d18fd1d66972718ab2284449591db0b3cdb4de
github.com/containerd/cgroups e6d1aa8c71c6103624b2c6e6f4be0863b67027f1
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87
Expand Down
Loading

0 comments on commit 6b4c4a2

Please sign in to comment.