Skip to content

Commit

Permalink
runc-shim: handle pending execs as running
Browse files Browse the repository at this point in the history
This commit rewrites and simplifies a lot of this logic to reduce it's
complexity, and also handle the case where the container doesn't have
it's own pid-namespace, which means that we're not guaranteed to receive
the init exit last.

This is achieved by replacing `s.pendingExecs` with `s.runningExecs`,
for which both (previously) pending and de facto running execs are
considered.

The new exit handling logic can be summed up by:
- when we receive an init exit, stash it it in `s.containerInitExit`,
  and if a container's init process has exited, refuse new execs.
- (if the container does not have it's own pidns) kill all running
  processes (if the container has a private pid-namespace, then all
  processes will be dead already).
- wait for the container's running exec count (which includes execs
  which have been started but might still early exit) to get to 0.
- publish the stashed away init exit.

Signed-off-by: Laura Brehm <laurabrehm@hey.com>
  • Loading branch information
laurazard committed Sep 4, 2024
1 parent e735791 commit 421a4b5
Showing 1 changed file with 104 additions and 88 deletions.
192 changes: 104 additions & 88 deletions cmd/containerd-shim-runc-v2/task/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
execable: make(map[*runc.Container]bool),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
runningExecs: make(map[*runc.Container]int),
execCountSubscribers: make(map[*runc.Container]chan<- int),
containerInitExit: make(map[*runc.Container]runcC.Exit),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
runcC.Monitor = reaper.Default
Expand Down Expand Up @@ -118,13 +119,19 @@ type service struct {

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
// container -> execs can be started, guarded by lifecycleMu.
// Execs can be started if the container's init process (read: pid, not [process.Init])
// has been started and not yet reaped by the shim.
runningExecs map[*runc.Container]int // container -> num running execs, guarded by lifecycleMu
// container -> subscription to exec exits/changes to s.runningExecs[container],
// guarded by lifecycleMu
execCountSubscribers map[*runc.Container]chan<- int
// container -> init exits, guarded by lifecycleMu
// Used to stash container init process exits, so that we can hold them
// until after we've made sure to publish all the container's exec exits.
// Also used to prevent starting new execs from being started if the
// container's init process (read: pid, not [process.Init]) has already been
// reaped by the shim.
// Note that this flag gets updated before the container's [process.Init.Status]
// is transitioned to "stopped".
execable map[*runc.Container]bool
containerInitExit map[*runc.Container]runcC.Exit
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
Expand All @@ -145,8 +152,7 @@ type containerProcess struct {
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. In addition, if the process is an exec and
// its container's init process has already exited, that exit is also processed.
// it handles the exit immediately.
// handleStarted should be called after the event announcing the start of the
// process has been published. Note that s.lifecycleMu must not be held when
// calling handleStarted.
Expand Down Expand Up @@ -181,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
pid = p.Pid()
}

_, init := p.(*process.Init)
s.lifecycleMu.Lock()

var initExits []runcC.Exit
var initCps []containerProcess
if !init {
s.pendingExecs[c]--

initPid := c.Pid()
iExits, initExited := exits[initPid]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExits = iExits
var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}
if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}

ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
Expand All @@ -227,20 +197,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
for _, eee := range initExits {
for _, cp := range initCps {
s.handleProcessExit(eee, cp.Container, cp.Process)
}
}
} else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{
Container: c,
Process: p,
})
if init {
s.execable[c] = true
}
s.lifecycleMu.Unlock()
}
}
Expand Down Expand Up @@ -315,18 +277,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if r.ExecID == "" {
cinit = container
} else {
if !s.execable[container] {
if _, initExited := s.containerInitExit[container]; initExited {
s.lifecycleMu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
}
s.pendingExecs[container]++
s.runningExecs[container]++
}
handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
defer cleanup()

p, err := container.Start(ctx, r)
if err != nil {
// If we failed to even start the process, s.runningExecs
// won't get decremented in s.handleProcessExit. We still need
// to update it.
if r.ExecID != "" {
s.lifecycleMu.Lock()
s.runningExecs[container]--
if ch, ok := s.execCountSubscribers[container]; ok {
ch <- s.runningExecs[container]
}
s.lifecycleMu.Unlock()
}
handleStarted(container, p)
return nil, errdefs.ToGRPC(err)
}
Expand Down Expand Up @@ -691,31 +664,23 @@ func (s *service) processExits() {
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
var cps, skipped []containerProcess
var cps []containerProcess
for _, cp := range s.running[e.Pid] {
_, init := cp.Process.(*process.Init)
if init {
delete(s.execable, cp.Container)
}
if init && s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)
s.containerInitExit[cp.Container] = e
}
cps = append(cps, cp)
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
}
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
s.handleProcessExit(e, cp.Container, cp.Process)
if ip, ok := cp.Process.(*process.Init); ok {
s.handleInitExit(e, cp.Container, ip)
} else {
s.handleProcessExit(e, cp.Container, cp.Process)
}
}
}
}
Expand All @@ -724,17 +689,60 @@ func (s *service) send(evt interface{}) {
s.events <- evt
}

func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
// handleInitExit processes container init process exits.
// This is handled separately from non-init exits, because there
// are some extra invariants we want to ensure in this case, namely:
// - for a given container, the init process exit MUST be the last exit published
// This is achieved by:
// - killing all running container processes (if the container has a shared pid
// namespace, otherwise all other processes have been reaped already).
// - waiting for the container's running exec counter to reach 0.
// - finally, publishing the init exit.
func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
// kill all running container processes
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := p.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", p.ID()).
Error("failed to kill init's children")
}
}

s.lifecycleMu.Lock()
numRunningExecs := s.runningExecs[c]
if numRunningExecs == 0 {
delete(s.runningExecs, c)
s.lifecycleMu.Unlock()
s.handleProcessExit(e, c, p)
return
}

events := make(chan int, numRunningExecs)
s.execCountSubscribers[c] = events

s.lifecycleMu.Unlock()

go func() {
defer func() {
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
delete(s.execCountSubscribers, c)
delete(s.runningExecs, c)
}()

// wait for running processes to exit
for {
if runningExecs := <-events; runningExecs == 0 {
break
}
}

// all running processes have exited now, and no new
// ones can start, so we can publish the init exit
s.handleProcessExit(e, c, p)
}()
}

func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
Expand All @@ -743,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
if _, init := p.(*process.Init); !init {
s.lifecycleMu.Lock()
s.runningExecs[c]--
if ch, ok := s.execCountSubscribers[c]; ok {
ch <- s.runningExecs[c]
}
s.lifecycleMu.Unlock()
}
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
Expand Down

0 comments on commit 421a4b5

Please sign in to comment.