runc-shim: fix races/prevent init exits from being dropped #10651
@@ -74,15 +74,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:         ctx,
-		events:          make(chan interface{}, 128),
-		ec:              reaper.Default.Subscribe(),
-		ep:              ep,
-		shutdown:        sd,
-		containers:      make(map[string]*runc.Container),
-		running:         make(map[int][]containerProcess),
-		pendingExecs:    make(map[*runc.Container]int),
-		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
+		context:              ctx,
+		events:               make(chan interface{}, 128),
+		ec:                   reaper.Default.Subscribe(),
+		ep:                   ep,
+		shutdown:             sd,
+		containers:           make(map[string]*runc.Container),
+		running:              make(map[int][]containerProcess),
+		runningExecs:         make(map[*runc.Container]int),
+		execCountSubscribers: make(map[*runc.Container]chan<- int),
+		containerInitExit:    make(map[*runc.Container]runcC.Exit),
+		exitSubscribers:      make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -117,7 +119,19 @@ type service struct {
 
 	lifecycleMu  sync.Mutex
 	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
-	pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
+	runningExecs map[*runc.Container]int    // container -> num running execs, guarded by lifecycleMu
+	// container -> subscription to exec exits/changes to s.runningExecs[container],
+	// guarded by lifecycleMu
+	execCountSubscribers map[*runc.Container]chan<- int
+	// container -> init exits, guarded by lifecycleMu
+	// Used to stash container init process exits, so that we can hold them
+	// until after we've made sure to publish all the container's exec exits.
+	// Also used to prevent new execs from being started if the
+	// container's init process (read: pid, not [process.Init]) has already been
+	// reaped by the shim.
+	// Note that this flag gets updated before the container's [process.Init.Status]
+	// is transitioned to "stopped".
+	containerInitExit map[*runc.Container]runcC.Exit
 	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
 	// dereferencing the subscription pointers must only be done while holding
 	// lifecycleMu.
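As an orientation aid before the remaining hunks, here is a heavily condensed, hypothetical sketch of how these three new fields are meant to interact. The names (shimState, container, startExec, recordInitExit) are invented for illustration and this is not the shim's real code, which guards this state with lifecycleMu and spreads it across Start, processExits, and handleInitExit.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the shim's real types, only so this sketch compiles on its own.
type container struct{ id string }
type exitInfo struct{ status int }

// shimState is a condensed model of the new bookkeeping fields.
type shimState struct {
	mu                   sync.Mutex
	runningExecs         map[*container]int        // execs currently running, per container
	execCountSubscribers map[*container]chan<- int // notified on every exec-count change
	containerInitExit    map[*container]exitInfo   // set once the container's init pid is reaped
}

// startExec mirrors the new gate in Start: once the init exit has been recorded,
// new execs are refused; otherwise the running-exec counter is bumped before the
// exec is actually started.
func (s *shimState) startExec(c *container) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, initExited := s.containerInitExit[c]; initExited {
		return fmt.Errorf("container %s init process is not running", c.id)
	}
	s.runningExecs[c]++
	return nil
}

// recordInitExit mirrors processExits: stash the init exit so the gate above
// starts failing, before the exit itself is published.
func (s *shimState) recordInitExit(c *container, e exitInfo) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.containerInitExit[c] = e
}

func main() {
	s := &shimState{
		runningExecs:         make(map[*container]int),
		execCountSubscribers: make(map[*container]chan<- int),
		containerInitExit:    make(map[*container]exitInfo),
	}
	c := &container{id: "demo"}
	fmt.Println(s.startExec(c))        // <nil>: init still running
	s.recordInitExit(c, exitInfo{137}) // init pid reaped
	fmt.Println(s.startExec(c))        // error: init is gone
}
```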
@@ -138,8 +152,7 @@ type containerProcess struct {
 //
 // The returned handleStarted closure records that the process has started so
 // that its exit can be handled efficiently. If the process has already exited,
-// it handles the exit immediately. In addition, if the process is an exec and
-// its container's init process has already exited, that exit is also processed.
+// it handles the exit immediately.
 // handleStarted should be called after the event announcing the start of the
 // process has been published. Note that s.lifecycleMu must not be held when
 // calling handleStarted.
@@ -174,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 			pid = p.Pid()
 		}
 
-		_, init := p.(*process.Init)
 		s.lifecycleMu.Lock()
-
-		var initExits []runcC.Exit
-		var initCps []containerProcess
-		if !init {
-			s.pendingExecs[c]--
-
-			initPid := c.Pid()
-			iExits, initExited := exits[initPid]
-			if initExited && s.pendingExecs[c] == 0 {
-				// c's init process has exited before handleStarted was called and
-				// this is the last pending exec process start - we need to process
-				// the exit for the init process after processing this exec, so:
-				// - delete c from the s.pendingExecs map
-				// - keep the exits for the init pid to process later (after we process
-				//   this exec's exits)
-				// - get the necessary containerProcesses for the init process (that we
-				//   need to process the exits), and remove them from s.running (which we skipped
-				//   doing in processExits).
-				delete(s.pendingExecs, c)
-				initExits = iExits
-				var skipped []containerProcess
-				for _, initPidCp := range s.running[initPid] {
-					if initPidCp.Container == c {
-						initCps = append(initCps, initPidCp)
-					} else {
-						skipped = append(skipped, initPidCp)
-					}
-				}
-				if len(skipped) == 0 {
-					delete(s.running, initPid)
-				} else {
-					s.running[initPid] = skipped
-				}
-			}
-		}
 
 		ees, exited := exits[pid]
 		delete(s.exitSubscribers, &exits)
 		exits = nil
@@ -220,11 +197,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 			for _, ee := range ees {
 				s.handleProcessExit(ee, c, p)
 			}
-			for _, eee := range initExits {
-				for _, cp := range initCps {
-					s.handleProcessExit(eee, cp.Container, cp.Process)
-				}
-			}
 		} else {
 			// Process start was successful, add to `s.running`.
 			s.running[pid] = append(s.running[pid], containerProcess{
@@ -305,14 +277,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 	if r.ExecID == "" {
 		cinit = container
 	} else {
-		s.pendingExecs[container]++
+		if _, initExited := s.containerInitExit[container]; initExited {
+			s.lifecycleMu.Unlock()
+			return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
+		}
+		s.runningExecs[container]++
 	}
 	handleStarted, cleanup := s.preStart(cinit)
 	s.lifecycleMu.Unlock()
 	defer cleanup()
 
 	p, err := container.Start(ctx, r)
 	if err != nil {
+		// If we failed to even start the process, s.runningExecs
+		// won't get decremented in s.handleProcessExit. We still need
+		// to update it.
+		if r.ExecID != "" {
+			s.lifecycleMu.Lock()
+			s.runningExecs[container]--
+			if ch, ok := s.execCountSubscribers[container]; ok {
+				ch <- s.runningExecs[container]
+			}
+			s.lifecycleMu.Unlock()
+		}
 		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}

Review discussion on ch <- s.runningExecs[container]:

Seems like this could block if the for loop in the goroutine in …

Let me try to think through this: I'm struggling to think of a situation where: …

Hmm, the channel created in …

It seems feasible that the shim is not even notified that the container init exited until after we have gotten the failure to start the exec here but before we take the lock at L296?

Following that thread: … Or: …
@@ -677,28 +664,23 @@ func (s *service) processExits() {
 			// Handle the exit for a created/started process. If there's more than
 			// one, assume they've all exited. One of them will be the correct
 			// process.
-			var cps, skipped []containerProcess
+			var cps []containerProcess
 			for _, cp := range s.running[e.Pid] {
 				_, init := cp.Process.(*process.Init)
-				if init && s.pendingExecs[cp.Container] != 0 {
-					// This exit relates to a container for which we have pending execs. In
-					// order to ensure order between execs and the init process for a given
-					// container, skip processing this exit here and let the `handleStarted`
-					// closure for the pending exec publish it.
-					skipped = append(skipped, cp)
-				} else {
-					cps = append(cps, cp)
+				if init {
+					s.containerInitExit[cp.Container] = e
 				}
+				cps = append(cps, cp)
 			}
-			if len(skipped) > 0 {
-				s.running[e.Pid] = skipped
-			} else {
-				delete(s.running, e.Pid)
-			}
+			delete(s.running, e.Pid)
 			s.lifecycleMu.Unlock()
 
 			for _, cp := range cps {
-				s.handleProcessExit(e, cp.Container, cp.Process)
+				if ip, ok := cp.Process.(*process.Init); ok {
+					s.handleInitExit(e, cp.Container, ip)
+				} else {
+					s.handleProcessExit(e, cp.Container, cp.Process)
+				}
 			}
@@ -707,18 +689,60 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-// s.mu must be locked when calling handleProcessExit
-func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
-	if ip, ok := p.(*process.Init); ok {
-		// Ensure all children are killed
-		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
-			if err := ip.KillAll(s.context); err != nil {
-				log.G(s.context).WithError(err).WithField("id", ip.ID()).
-					Error("failed to kill init's children")
-			}
-		}
-	}
-
+// handleInitExit processes container init process exits.
+// This is handled separately from non-init exits, because there
+// are some extra invariants we want to ensure in this case, namely:
+// - for a given container, the init process exit MUST be the last exit published
+// This is achieved by:
+// - killing all running container processes (if the container has a shared pid
+//   namespace, otherwise all other processes have been reaped already).
+// - waiting for the container's running exec counter to reach 0.
+// - finally, publishing the init exit.
+func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
+	// kill all running container processes
+	if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+		if err := p.KillAll(s.context); err != nil {
+			log.G(s.context).WithError(err).WithField("id", p.ID()).
+				Error("failed to kill init's children")
+		}
+	}
+
+	s.lifecycleMu.Lock()
+	numRunningExecs := s.runningExecs[c]
+	if numRunningExecs == 0 {
+		delete(s.runningExecs, c)
+		s.lifecycleMu.Unlock()
+		s.handleProcessExit(e, c, p)
+		return
+	}
+
+	events := make(chan int, numRunningExecs)
+	s.execCountSubscribers[c] = events
+
+	s.lifecycleMu.Unlock()
+
+	go func() {
+		defer func() {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.execCountSubscribers, c)
+			delete(s.runningExecs, c)
+		}()
+
+		// wait for running processes to exit
+		for {
+			if runningExecs := <-events; runningExecs == 0 {
+				break
+			}
+		}
+
+		// all running processes have exited now, and no new
+		// ones can start, so we can publish the init exit
+		s.handleProcessExit(e, c, p)
+	}()
+}
+
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	p.SetExited(e.Status)
 	s.send(&eventstypes.TaskExit{
 		ContainerID: c.ID,
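Because the exec-counter handshake introduced here is split between handleInitExit (the waiter) and handleProcessExit (the notifier, shown in the next hunk), the pattern can be easier to see in isolation. The following standalone sketch uses invented names (tracker, execExited, waitForExecs) and only illustrates the buffered-channel subscription; it is not the shim's actual code. The point it mirrors is that the channel is buffered with one slot per outstanding exec, so the sender never blocks.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for the shim's per-container bookkeeping:
// a running-exec counter plus an optional subscriber channel notified on every decrement.
type tracker struct {
	mu         sync.Mutex
	running    int
	subscriber chan<- int // nil until a waiter subscribes
}

// execExited plays the role of the decrement in handleProcessExit: drop the
// counter and, if a waiter has subscribed, send it the new value.
func (t *tracker) execExited() {
	t.mu.Lock()
	t.running--
	if t.subscriber != nil {
		t.subscriber <- t.running
	}
	t.mu.Unlock()
}

// waitForExecs plays the role of handleInitExit: snapshot the counter and, if it
// is non-zero, subscribe with a channel buffered for one notification per
// outstanding exec (so senders can never block), then wait for it to reach zero.
func (t *tracker) waitForExecs(done func()) {
	t.mu.Lock()
	n := t.running
	if n == 0 {
		t.mu.Unlock()
		done()
		return
	}
	events := make(chan int, n)
	t.subscriber = events
	t.mu.Unlock()

	go func() {
		defer func() {
			t.mu.Lock()
			t.subscriber = nil
			t.mu.Unlock()
		}()
		for v := range events {
			if v == 0 {
				break
			}
		}
		done()
	}()
}

func main() {
	t := &tracker{running: 2}
	var wg sync.WaitGroup
	wg.Add(1)
	t.waitForExecs(func() {
		fmt.Println("all execs have exited; safe to publish the init exit")
		wg.Done()
	})
	t.execExited()
	t.execExited()
	wg.Wait()
}
```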
@@ -727,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
 		ExitStatus: uint32(e.Status),
 		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
 	})
+	if _, init := p.(*process.Init); !init {
+		s.lifecycleMu.Lock()
+		s.runningExecs[c]--
+		if ch, ok := s.execCountSubscribers[c]; ok {
+			ch <- s.runningExecs[c]
+		}
+		s.lifecycleMu.Unlock()
+	}
 }
 
 func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
This may be better as a cond var with lifecycleMu to avoid more data races.

There'd still need to be some way for handleInitExit to be notified to check the count. That could be with a chan<- struct{}, but it seems cheaper to send the value over the chan<- int and avoid acquiring s.lifecycleMu if the value is > 0.

Right, I did have a branch somewhere with a sync.Cond, but I ended up going with Cory's suggestion for a map of channels since it seemed simpler/more verifiably correct. In general I think I prefer this approach since it reduces lock contention over s.lifecycleMu inside the goroutine, and we only have to lock it to set the channel and then remove it when the goroutine is done, but let me know if you have a strong opinion/have spotted any race condition in particular!
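For comparison with the discussion above, here is a rough sketch of what a sync.Cond-based variant might look like; the names are invented and this is not taken from any actual branch. The trade-off it illustrates is that every decrement and every re-check of the counter happens under the mutex, which is the extra lock traffic the channel-based approach avoids.

```go
package main

import (
	"fmt"
	"sync"
)

// condTracker is a hypothetical condition-variable version of the same
// bookkeeping: the waiter blocks on cond until the running-exec count hits zero.
type condTracker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running int
}

func newCondTracker(running int) *condTracker {
	t := &condTracker{running: running}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// execExited decrements the counter and wakes the waiter. The waiter then
// re-acquires the mutex to re-check the count, which is the extra lock
// contention the channel-based approach sidesteps.
func (t *condTracker) execExited() {
	t.mu.Lock()
	t.running--
	t.mu.Unlock()
	t.cond.Broadcast()
}

// waitForExecs blocks in a goroutine until the count reaches zero, then runs done.
func (t *condTracker) waitForExecs(done func()) {
	go func() {
		t.mu.Lock()
		for t.running > 0 {
			t.cond.Wait()
		}
		t.mu.Unlock()
		done()
	}()
}

func main() {
	t := newCondTracker(2)
	var wg sync.WaitGroup
	wg.Add(1)
	t.waitForExecs(func() {
		fmt.Println("all execs gone; safe to publish the init exit")
		wg.Done()
	})
	t.execExited()
	t.execExited()
	wg.Wait()
}
```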