runc-shim: handle pending execs as running
Signed-off-by: Laura Brehm <laurabrehm@hey.com>
laurazard committed Aug 28, 2024
1 parent 3e7ad97 commit 655cb60
Showing 1 changed file with 94 additions and 135 deletions.
229 changes: 94 additions & 135 deletions cmd/containerd-shim-runc-v2/task/service.go
@@ -75,16 +75,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
containerInitExit: make(map[*runc.Container]runcC.Exit),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
runningExecs: make(map[*runc.Container]int),
execCountSubscribers: make(map[*runc.Container]chan<- int),
containerInitExit: make(map[*runc.Container]runcC.Exit),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
runcC.Monitor = reaper.Default
@@ -119,7 +120,10 @@ type service struct {

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
runningExecs map[*runc.Container]int // container -> num running execs, guarded by lifecycleMu
// container -> subscription to exec exits/changes to s.runningExecs[container],
// guarded by lifecycleMu
execCountSubscribers map[*runc.Container]chan<- int
// container -> init exits, guarded by lifecycleMu
// Used to stash container init process exits, so that we can hold them
// until after we've made sure to publish all the container's exec exits.
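The two fields added in this hunk implement a small count-and-notify scheme: runningExecs counts the execs currently in flight for each container, while execCountSubscribers holds an optional per-container channel on which every decrement is announced, letting the init-exit path wait for the count to reach zero. A minimal standalone sketch of the pattern for a single container (illustrative names, not the shim's API):

package main

import (
	"fmt"
	"sync"
)

// counter mimics s.runningExecs plus s.execCountSubscribers for one
// container: a guarded count, and an optional channel that receives
// the new value after every decrement.
type counter struct {
	mu  sync.Mutex
	n   int
	sub chan<- int // nil when nobody is waiting
}

func (c *counter) inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

func (c *counter) dec() {
	c.mu.Lock()
	c.n--
	if c.sub != nil {
		c.sub <- c.n // safe: the subscriber's channel is buffered
	}
	c.mu.Unlock()
}

// subscribe installs a channel buffered to the current count, so that
// dec can send without blocking while holding the lock.
func (c *counter) subscribe() <-chan int {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := make(chan int, c.n)
	c.sub = ch
	return ch
}

func main() {
	var c counter
	c.inc()
	c.inc() // two "execs" running
	done := c.subscribe()
	go func() { c.dec(); c.dec() }() // both exit
	for n := range done {
		if n == 0 {
			fmt.Println("all execs have exited")
			return
		}
	}
}

Note that the subscriber's channel is buffered with the count observed at subscription time, which is why, in the shim code below, sending on it while holding lifecycleMu cannot block.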
@@ -149,8 +153,7 @@ type containerProcess struct {
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. In addition, if the process is an exec and
// its container's init process has already exited, that exit is also processed.
// it handles the exit immediately.
// handleStarted should be called after the event announcing the start of the
// process has been published. Note that s.lifecycleMu must not be held when
// calling handleStarted.
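The calling convention this comment describes, condensed from the Start() hunk further down (a sketch with error handling and event publishing trimmed; r.ExecID, cinit, preStart and the errdefs call appear in the surrounding file, the rest is paraphrase):

	s.lifecycleMu.Lock()
	if r.ExecID != "" {
		s.runningExecs[container]++ // count the exec before it can possibly exit
	}
	handleStarted, cleanup := s.preStart(cinit)
	s.lifecycleMu.Unlock() // lifecycleMu must be released before handleStarted runs

	p, err := container.Start(ctx, r)
	if err != nil {
		cleanup() // drops the exit subscription preStart installed
		return nil, errdefs.ToGRPC(err)
	}
	// publish the start event first, then let handleStarted either record
	// the process as running or handle an exit that was reaped meanwhile
	handleStarted(container, p)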
@@ -188,43 +191,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
_, init := p.(*process.Init)
s.lifecycleMu.Lock()

var initCps []containerProcess
var initExit runcC.Exit
if !init {
s.pendingExecs[c]--

initPid := c.Pid()
iExit, initExited := s.containerInitExit[c]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExit = iExit

var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}

if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}

ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
@@ -233,6 +199,14 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
if !init {
s.lifecycleMu.Lock()
s.runningExecs[c]--
if ch, ok := s.execCountSubscribers[c]; ok {
ch <- s.runningExecs[c]
}
s.lifecycleMu.Unlock()
}
} else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{
@@ -241,10 +215,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
})
s.lifecycleMu.Unlock()
}

for _, cp := range initCps {
s.handleInitExit(initExit, cp.Container, cp.Process.(*process.Init))
}
}

cleanup = func() {
@@ -321,7 +291,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.lifecycleMu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
}
s.pendingExecs[container]++
s.runningExecs[container]++
}
handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
@@ -693,27 +663,15 @@ func (s *service) processExits() {
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
var cps, skipped []containerProcess
var cps []containerProcess
for _, cp := range s.running[e.Pid] {
_, init := cp.Process.(*process.Init)
if init {
s.containerInitExit[cp.Container] = e
}
if init && s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)
}
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
cps = append(cps, cp)
}
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
@@ -735,13 +693,10 @@ func (s *service) send(evt interface{}) {
// are some extra invariants we want to ensure in this case, namely:
// - for a given container, the init process exit MUST be the last exit published
// This is achieved by:
// - waiting for all pending execs to either successfully start or early-exit
// (this happens before handleInitExit is called)
// - killing all running container processes
// - removing all processes from s.running so that s.processExits doesn't
// publish their exits
// - subscribing to exits + waiting for the running process count (for a
// container) to go to 0, or timing out.
// - waiting for the container's running exec counter to reach 0, or timing out
// - removing all remaining processes from s.running so that s.processExits
// doesn't publish their exits
// - publishing the received exec exits, and then publishing the init exit.
func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
// kill all running container processes
@@ -752,84 +707,80 @@ func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Ini

s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()

// subscribe to exits so that we can receive exits for our running processes
exits := make(map[int][]runcC.Exit)
s.exitSubscribers[&exits] = struct{}{}
// remove all running processes from s.running, so that their exits don't get
// published by processExits
runningCps := make(map[int][]containerProcess)
for _, execProcess := range c.ExecdProcesses() {
pid := execProcess.Pid()
if _, running := s.running[execProcess.Pid()]; running {
var skipped []containerProcess
for _, cp := range s.running[pid] {
if cp.Container == c {
runningCps[pid] = append(runningCps[pid], cp)
} else {
skipped = append(skipped, cp)
}
}

if len(skipped) == 0 {
delete(s.running, pid)
} else {
s.running[pid] = skipped
}
}
numRunningExecs := s.runningExecs[c]
if numRunningExecs == 0 {
s.handleProcessExit(e, c, p)
return
}

events := make(chan int, numRunningExecs)
s.execCountSubscribers[c] = events

go func() {
execExits := make(map[int][]runcC.Exit)
timeout := 20 * time.Millisecond
timer := time.NewTimer(timeout)
defer timer.Stop()
defer func() {
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
delete(s.execCountSubscribers, c)
}()

// wait for running processes to exit
timeout := 20 * time.Millisecond
watchdog := time.NewTimer(timeout)
defer watchdog.Stop()
WAIT:
for {
<-timer.C

var updated bool
for pid := range runningCps {
if e, exited := exits[pid]; exited {
if _, ok := execExits[pid]; !ok {
updated = true
}
execExits[pid] = append(execExits[pid], e...)
select {
case <-watchdog.C:
break WAIT
case runningExecs := <-events:
if runningExecs == 0 {
break WAIT
}
}

if !updated {
break
}

if len(execExits) == len(runningCps) {
break
}

// timer channel has been drained, so it's safe
// to just call Reset
timer.Reset(timeout)
resetTimer(watchdog, timeout)
}

// publish all received exec exits
for pid, execExits := range execExits {
for _, exit := range execExits {
for _, cp := range runningCps[pid] {
s.handleProcessExit(exit, cp.Container, cp.Process)
// wipe all remaining processes from `s.running` so that late-reaped
// exec exits won't be published after the init exit
for _, execProcess := range c.ExecdProcesses() {
pid := execProcess.Pid()
if _, running := s.running[execProcess.Pid()]; running {
var skipped []containerProcess
for _, cp := range s.running[pid] {
if cp.Container != c {
skipped = append(skipped, cp)
}
}

if len(skipped) == 0 {
delete(s.running, pid)
} else {
s.running[pid] = skipped
}
}
}

// all running processes have exited now, and no new
// ones can start, so we can publish the init exit
s.handleProcessExit(e, c, p)

s.lifecycleMu.Lock()
delete(s.exitSubscribers, &exits)
s.lifecycleMu.Unlock()
}()
}
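The wait in the goroutine above reduces to a simple loop: finish when the exec count reaches zero, give up when the watchdog fires, and re-arm the watchdog after every update so that each exec exit extends the grace period. Distilled into a standalone helper (same logic, simplified signature; waitForZero is hypothetical):

	// waitForZero reports whether the count reached zero before the
	// watchdog expired. counts is the buffered channel handleInitExit
	// registers in s.execCountSubscribers.
	func waitForZero(counts <-chan int, timeout time.Duration) bool {
		watchdog := time.NewTimer(timeout)
		defer watchdog.Stop()
		for {
			select {
			case <-watchdog.C:
				return false // timed out waiting for exec exits
			case n := <-counts:
				if n == 0 {
					return true // last exec has exited
				}
			}
			// an exec exited but others remain: re-arm the watchdog
			resetTimer(watchdog, timeout)
		}
	}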

// resetTimer is a helper function that stops, drains and resets the timer.
// This is necessary in go versions <1.23, since the timer isn't stopped and
// the timer's channel isn't drained on timer.Reset.
// See: https://go-review.googlesource.com/c/go/+/568341
// FIXME: remove/simplify this after we update to go1.23
func resetTimer(t *time.Timer, d time.Duration) {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(d)
}
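As an illustration of the race being guarded against (assuming pre-go1.23 semantics, where a fired timer's tick stays buffered in t.C):

	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(20 * time.Millisecond) // the timer fires; its tick sits unread in t.C
	resetTimer(t, time.Second)        // Stop + drain discards the stale tick first
	<-t.C                             // waits the full second; a bare t.Reset would
	                                  // have let this receive return immediately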

func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
@@ -839,6 +790,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
if _, init := p.(*process.Init); !init {
s.lifecycleMu.Lock()
s.runningExecs[c]--
if ch, ok := s.execCountSubscribers[c]; ok {
ch <- s.runningExecs[c]
}
s.lifecycleMu.Unlock()
}
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
