runc-shim: fix races/prevent init exits from being dropped #10651

Merged: 3 commits, Sep 5, 2024
188 changes: 110 additions & 78 deletions cmd/containerd-shim-runc-v2/task/service.go
@@ -74,15 +74,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
runningExecs: make(map[*runc.Container]int),
execCountSubscribers: make(map[*runc.Container]chan<- int),
containerInitExit: make(map[*runc.Container]runcC.Exit),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
runcC.Monitor = reaper.Default
@@ -117,7 +119,19 @@ type service struct {

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
runningExecs map[*runc.Container]int // container -> num running execs, guarded by lifecycleMu
// container -> subscription to exec exits/changes to s.runningExecs[container],
// guarded by lifecycleMu
execCountSubscribers map[*runc.Container]chan<- int
Member: This may be better as a cond var with lifecycleMu to avoid more data races.

Member: There'd still need to be some way for handleInitExit to be notified to check the count. That could be with a chan<- struct{}, but it seems cheaper to send the value over the chan<- int and avoid acquiring s.lifecycleMu if the value is > 0.

Member Author: Right, I did have a branch somewhere with a sync.Cond, but I ended up going with Cory's suggestion of a map of channels since it seemed simpler and more verifiably correct. In general I prefer this approach since it reduces lock contention on s.lifecycleMu inside the goroutine: we only have to lock it to set the channel and then to remove it when the goroutine is done. Let me know if you have a strong opinion or have spotted any race condition in particular!

// container -> init exits, guarded by lifecycleMu
// Used to stash container init process exits, so that we can hold them
// until after we've made sure to publish all the container's exec exits.
	// Also used to prevent new execs from being started if the
// container's init process (read: pid, not [process.Init]) has already been
// reaped by the shim.
// Note that this flag gets updated before the container's [process.Init.Status]
// is transitioned to "stopped".
containerInitExit map[*runc.Container]runcC.Exit
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
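
For comparison with the thread above, here is a minimal sketch of what the suggested sync.Cond alternative might look like. This is not the approach this PR takes; the names service, lifecycleMu, runningExecs, and container only loosely mirror the diff, and everything else is hypothetical and stripped down to show the shape of the idea.

```go
package main

import (
	"fmt"
	"sync"
)

type container struct{ id string }

type service struct {
	lifecycleMu  sync.Mutex
	execsChanged *sync.Cond         // signalled whenever runningExecs changes
	runningExecs map[*container]int // container -> number of running execs
}

func newService() *service {
	s := &service{runningExecs: make(map[*container]int)}
	s.execsChanged = sync.NewCond(&s.lifecycleMu)
	return s
}

// execStarted would be called from Start for each exec.
func (s *service) execStarted(c *container) {
	s.lifecycleMu.Lock()
	s.runningExecs[c]++
	s.lifecycleMu.Unlock()
}

// execExited would be called from each exec's exit path
// (or when an exec fails to start).
func (s *service) execExited(c *container) {
	s.lifecycleMu.Lock()
	s.runningExecs[c]--
	// Wake every waiter; each one re-checks its own container's count.
	s.execsChanged.Broadcast()
	s.lifecycleMu.Unlock()
}

// waitForExecs is roughly what handleInitExit would do before publishing
// the init exit: block until no execs remain for this container.
func (s *service) waitForExecs(c *container) {
	s.lifecycleMu.Lock()
	for s.runningExecs[c] > 0 {
		s.execsChanged.Wait() // releases lifecycleMu while blocked
	}
	delete(s.runningExecs, c)
	s.lifecycleMu.Unlock()
}

func main() {
	s := newService()
	c := &container{id: "demo"}
	s.execStarted(c)
	go s.execExited(c) // simulate the exec exiting concurrently
	s.waitForExecs(c)
	fmt.Println("no execs left; init exit can be published")
}
```

The trade-off is the one the author points out: the cond-var version keeps all the state under lifecycleMu, but every exec exit broadcasts to all waiters and each waiter re-checks its count while holding the lock, whereas the per-container buffered channel in this PR only touches lifecycleMu to install and later remove the subscription.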
@@ -138,8 +152,7 @@ type containerProcess struct {
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. In addition, if the process is an exec and
// its container's init process has already exited, that exit is also processed.
// it handles the exit immediately.
// handleStarted should be called after the event announcing the start of the
// process has been published. Note that s.lifecycleMu must not be held when
// calling handleStarted.
@@ -174,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
pid = p.Pid()
}

_, init := p.(*process.Init)
s.lifecycleMu.Lock()

var initExits []runcC.Exit
var initCps []containerProcess
if !init {
s.pendingExecs[c]--

initPid := c.Pid()
iExits, initExited := exits[initPid]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExits = iExits
var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}
if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}

ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
@@ -220,11 +197,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
for _, eee := range initExits {
for _, cp := range initCps {
s.handleProcessExit(eee, cp.Container, cp.Process)
}
}
} else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{
@@ -305,14 +277,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if r.ExecID == "" {
cinit = container
} else {
s.pendingExecs[container]++
if _, initExited := s.containerInitExit[container]; initExited {
s.lifecycleMu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
}
s.runningExecs[container]++
}
handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
defer cleanup()

p, err := container.Start(ctx, r)
if err != nil {
// If we failed to even start the process, s.runningExecs
// won't get decremented in s.handleProcessExit. We still need
// to update it.
if r.ExecID != "" {
s.lifecycleMu.Lock()
s.runningExecs[container]--
if ch, ok := s.execCountSubscribers[container]; ok {
ch <- s.runningExecs[container]
Member: Seems like this could block if the for loop in the goroutine in handleInitExit has already received an event but hasn't run the defer.

Member: Let me try to think through this:

  • Line 284 is s.runningExecs[container]++, protected by s.lifecycleMu. We can have two calls to Start that both (sequentially) invoke line 284.
  • Now assume the init process exits, SIGCHLD is received by the shim, and processExits reads the exit, acquires s.lifecycleMu, and stores s.containerInitExit[container]. No new execs can actually start after this, because of the check on line 280.
  • Then, outside s.lifecycleMu, handleInitExit is invoked. We lock s.lifecycleMu, find s.runningExecs[container] is > 0, and make a 2-length channel in s.execCountSubscribers[container].
  • Both of the execs call container.Start, invoke runc, and fail because the container is no longer running.
  • Each one acquires s.lifecycleMu and sends the count of runningExecs to the subscriber on line 299, first sending 1 and then 0.
  • handleInitExit reads from the channel, finds a 0, and breaks out of the loop.
  • A new Start call comes in and acquires s.lifecycleMu ahead of handleInitExit's defer, but fails to make progress because of line 280.

I'm struggling to think of a situation where:

  • handleInitExit is already in the channel loop (which means both (a) the init process has exited and we've stored it into s.containerInitExit already, and (b) s.runningExecs[container] is at least 1), and
  • another exec can come in and increment s.runningExecs further after a 0 has been sent to ch.

Member Author: Hmm, the channel created in s.handleInitExit has a buffer of size s.runningExecs[c], and we block new execs from being started before s.handleInitExit is called, so at most we'll process s.runningExecs[c] exits – so I don't think this would ever block.

Member: It seems feasible that the shim is not even notified that the container init exited until after we have gotten the failure to start the exec here, but before we take the lock at L296?

Member Author: Following that thread:

  • container init is started
  • (1) a container exec is started
    • we increment s.runningExecs at L284, then start the exec
  • (1) the exec start call fails (L290)
  • (2) container init exits and s.processExits acquires s.lifecycleMu and processes it
    • the init exit gets stashed in s.containerInitExit, and s.handleInitExit gets called (and (2) unlocks s.lifecycleMu)
  • (1) acquires the lock at L296 and decrements s.runningExecs
  • (2) s.handleInitExit locks s.lifecycleMu, checks s.runningExecs, finds 0, and immediately processes the init exit

Or:

  • container init is started
  • (1) a container exec is started
    • we increment s.runningExecs at L284, then start the exec
  • (1) the exec start call fails (L290)
  • (2) container init exits and s.processExits acquires s.lifecycleMu and processes it
    • the init exit gets stashed in s.containerInitExit, and s.handleInitExit gets called (and (2) unlocks s.lifecycleMu)
  • (2) s.handleInitExit locks s.lifecycleMu, checks s.runningExecs, finds 1, creates a buffered channel with length 1, launches the goroutine, and unlocks s.lifecycleMu
  • (1) acquires the lock at L296, decrements s.runningExecs, and sends (a 0) on the channel
  • (2) the goroutine receives on the channel, finds a 0, and processes the init exit.
}
s.lifecycleMu.Unlock()
}
handleStarted(container, p)
return nil, errdefs.ToGRPC(err)
}
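
To make the non-blocking argument in the thread above concrete, here is a small, self-contained Go sketch of the buffered-channel handshake. The names (waiter, initExited, execExited) are hypothetical stand-ins, not the shim's actual types; the point is that the buffer is sized to the exec count observed under the mutex, and because the init-exit check stops new execs from being counted afterwards, at most that many sends can ever happen, so none of them block.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter is a stand-in for the per-container bookkeeping in the shim.
type waiter struct {
	mu      sync.Mutex
	running int        // execs still running for this container
	events  chan<- int // set once the init exit is being handled
}

// initExited is called once, after the container's init process has been
// reaped. It returns a channel the caller drains until it sees 0.
// (The PR handles the running == 0 case separately and publishes the init
// exit immediately; this sketch assumes at least one exec is still running.)
func (w *waiter) initExited() <-chan int {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Buffer equals the count observed under the lock, so the sends in
	// execExited can never block even if nobody is receiving yet.
	ch := make(chan int, w.running)
	w.events = ch
	return ch
}

// execExited is called for each exec exit (or failed exec start).
func (w *waiter) execExited() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.running--
	if w.events != nil {
		w.events <- w.running // buffered send; cannot block
	}
}

func main() {
	w := &waiter{running: 2}
	done := w.initExited()
	go w.execExited()
	go w.execExited()
	for n := range done {
		if n == 0 {
			fmt.Println("all execs gone; safe to publish the init exit")
			break
		}
	}
}
```

Sending the remaining count (rather than an empty struct{}) lets the waiter break out of the loop without re-acquiring the mutex, which matches the reasoning given earlier in the review.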
@@ -677,28 +664,23 @@ func (s *service) processExits() {
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
var cps, skipped []containerProcess
var cps []containerProcess
for _, cp := range s.running[e.Pid] {
_, init := cp.Process.(*process.Init)
if init && s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)
if init {
s.containerInitExit[cp.Container] = e
}
cps = append(cps, cp)
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
}
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
s.handleProcessExit(e, cp.Container, cp.Process)
if ip, ok := cp.Process.(*process.Init); ok {
s.handleInitExit(e, cp.Container, ip)
} else {
s.handleProcessExit(e, cp.Container, cp.Process)
}
}
}
}
@@ -707,18 +689,60 @@ func (s *service) send(evt interface{}) {
s.events <- evt
}

// s.mu must be locked when calling handleProcessExit
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
// handleInitExit processes container init process exits.
// This is handled separately from non-init exits, because there
// are some extra invariants we want to ensure in this case, namely:
// - for a given container, the init process exit MUST be the last exit published
// This is achieved by:
// - killing all running container processes (if the container has a shared pid
// namespace, otherwise all other processes have been reaped already).
// - waiting for the container's running exec counter to reach 0.
// - finally, publishing the init exit.
func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
// kill all running container processes
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := p.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", p.ID()).
Error("failed to kill init's children")
}
}

s.lifecycleMu.Lock()
numRunningExecs := s.runningExecs[c]
if numRunningExecs == 0 {
delete(s.runningExecs, c)
s.lifecycleMu.Unlock()
s.handleProcessExit(e, c, p)
return
}

events := make(chan int, numRunningExecs)
s.execCountSubscribers[c] = events

s.lifecycleMu.Unlock()

go func() {
defer func() {
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
delete(s.execCountSubscribers, c)
delete(s.runningExecs, c)
}()

// wait for running processes to exit
for {
if runningExecs := <-events; runningExecs == 0 {
break
}
}

// all running processes have exited now, and no new
// ones can start, so we can publish the init exit
s.handleProcessExit(e, c, p)
}()
}

func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
@@ -727,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
if _, init := p.(*process.Init); !init {
s.lifecycleMu.Lock()
s.runningExecs[c]--
if ch, ok := s.execCountSubscribers[c]; ok {
ch <- s.runningExecs[c]
}
s.lifecycleMu.Unlock()
}
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {