From 655cb60f193f6ada932750afb10783498c69a4f6 Mon Sep 17 00:00:00 2001 From: Laura Brehm Date: Wed, 28 Aug 2024 12:29:25 +0100 Subject: [PATCH] runc-shim: handle pending execs as running Signed-off-by: Laura Brehm --- cmd/containerd-shim-runc-v2/task/service.go | 229 ++++++++------------ 1 file changed, 94 insertions(+), 135 deletions(-) diff --git a/cmd/containerd-shim-runc-v2/task/service.go b/cmd/containerd-shim-runc-v2/task/service.go index b469e3410eaf7..81e06a24e8379 100644 --- a/cmd/containerd-shim-runc-v2/task/service.go +++ b/cmd/containerd-shim-runc-v2/task/service.go @@ -75,16 +75,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S } go ep.Run(ctx) s := &service{ - context: ctx, - events: make(chan interface{}, 128), - ec: reaper.Default.Subscribe(), - ep: ep, - shutdown: sd, - containers: make(map[string]*runc.Container), - running: make(map[int][]containerProcess), - pendingExecs: make(map[*runc.Container]int), - containerInitExit: make(map[*runc.Container]runcC.Exit), - exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}), + context: ctx, + events: make(chan interface{}, 128), + ec: reaper.Default.Subscribe(), + ep: ep, + shutdown: sd, + containers: make(map[string]*runc.Container), + running: make(map[int][]containerProcess), + runningExecs: make(map[*runc.Container]int), + execCountSubscribers: make(map[*runc.Container]chan<- int), + containerInitExit: make(map[*runc.Container]runcC.Exit), + exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}), } go s.processExits() runcC.Monitor = reaper.Default @@ -119,7 +120,10 @@ type service struct { lifecycleMu sync.Mutex running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu - pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu + runningExecs map[*runc.Container]int // container -> num running execs, guarded by lifecycleMu + // container -> subscription to exec exits/changes to s.runningExecs[container], + // guarded by lifecycleMu + execCountSubscribers map[*runc.Container]chan<- int // container -> init exits, guarded by lifecycleMu // Used to stash container init process exits, so that we can hold them // until after we've made sure to publish all the container's exec exits. @@ -149,8 +153,7 @@ type containerProcess struct { // // The returned handleStarted closure records that the process has started so // that its exit can be handled efficiently. If the process has already exited, -// it handles the exit immediately. In addition, if the process is an exec and -// its container's init process has already exited, that exit is also processed. +// it handles the exit immediately. // handleStarted should be called after the event announcing the start of the // process has been published. Note that s.lifecycleMu must not be held when // calling handleStarted. 
@@ -188,43 +191,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe _, init := p.(*process.Init) s.lifecycleMu.Lock() - var initCps []containerProcess - var initExit runcC.Exit - if !init { - s.pendingExecs[c]-- - - initPid := c.Pid() - iExit, initExited := s.containerInitExit[c] - if initExited && s.pendingExecs[c] == 0 { - // c's init process has exited before handleStarted was called and - // this is the last pending exec process start - we need to process - // the exit for the init process after processing this exec, so: - // - delete c from the s.pendingExecs map - // - keep the exits for the init pid to process later (after we process - // this exec's exits) - // - get the necessary containerProcesses for the init process (that we - // need to process the exits), and remove them from s.running (which we skipped - // doing in processExits). - delete(s.pendingExecs, c) - initExit = iExit - - var skipped []containerProcess - for _, initPidCp := range s.running[initPid] { - if initPidCp.Container == c { - initCps = append(initCps, initPidCp) - } else { - skipped = append(skipped, initPidCp) - } - } - - if len(skipped) == 0 { - delete(s.running, initPid) - } else { - s.running[initPid] = skipped - } - } - } - ees, exited := exits[pid] delete(s.exitSubscribers, &exits) exits = nil @@ -233,6 +199,14 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe for _, ee := range ees { s.handleProcessExit(ee, c, p) } + if !init { + s.lifecycleMu.Lock() + s.runningExecs[c]-- + if ch, ok := s.execCountSubscribers[c]; ok { + ch <- s.runningExecs[c] + } + s.lifecycleMu.Unlock() + } } else { // Process start was successful, add to `s.running`. s.running[pid] = append(s.running[pid], containerProcess{ @@ -241,10 +215,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe }) s.lifecycleMu.Unlock() } - - for _, cp := range initCps { - s.handleInitExit(initExit, cp.Container, cp.Process.(*process.Init)) - } } cleanup = func() { @@ -321,7 +291,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. s.lifecycleMu.Unlock() return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID) } - s.pendingExecs[container]++ + s.runningExecs[container]++ } handleStarted, cleanup := s.preStart(cinit) s.lifecycleMu.Unlock() @@ -693,27 +663,15 @@ func (s *service) processExits() { // Handle the exit for a created/started process. If there's more than // one, assume they've all exited. One of them will be the correct // process. - var cps, skipped []containerProcess + var cps []containerProcess for _, cp := range s.running[e.Pid] { _, init := cp.Process.(*process.Init) if init { s.containerInitExit[cp.Container] = e } - if init && s.pendingExecs[cp.Container] != 0 { - // This exit relates to a container for which we have pending execs. In - // order to ensure order between execs and the init process for a given - // container, skip processing this exit here and let the `handleStarted` - // closure for the pending exec publish it. 
- skipped = append(skipped, cp) - } else { - cps = append(cps, cp) - } - } - if len(skipped) > 0 { - s.running[e.Pid] = skipped - } else { - delete(s.running, e.Pid) + cps = append(cps, cp) } + delete(s.running, e.Pid) s.lifecycleMu.Unlock() for _, cp := range cps { @@ -735,13 +693,10 @@ func (s *service) send(evt interface{}) { // are some extra invariants we want to ensure in this case, namely: // - for a given container, the init process exit MUST be the last exit published // This is achieved by: -// - waiting for all pending execs to either successfully start or early-exit -// (this happens before handleInitExit is called) // - killing all running container processes -// - removing all processes from s.running so that we s.processExits doesn't -// publish their exits -// - subscribing to exits+waiting for the running process count (for a -// container) to go to 0, or timing out. +// - waiting for the container's running exec counter to reach 0, or timing out +// - removing all remaining processes from s.running so that s.processExits +// doesn't publish their exits // - publishing the received exec exits, and then publishing the init exit. func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) { // kill all running container processes @@ -752,70 +707,55 @@ func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Ini s.lifecycleMu.Lock() defer s.lifecycleMu.Unlock() - - // subscribe to exits so that we can receive exits for our running processes - exits := make(map[int][]runcC.Exit) - s.exitSubscribers[&exits] = struct{}{} - // remove all running processes from s.running, so that their exits don't get - // published by processExits - runningCps := make(map[int][]containerProcess) - for _, execProcess := range c.ExecdProcesses() { - pid := execProcess.Pid() - if _, running := s.running[execProcess.Pid()]; running { - var skipped []containerProcess - for _, cp := range s.running[pid] { - if cp.Container == c { - runningCps[pid] = append(runningCps[pid], cp) - } else { - skipped = append(skipped, cp) - } - } - - if len(skipped) == 0 { - delete(s.running, pid) - } else { - s.running[pid] = skipped - } - } + numRunningExecs := s.runningExecs[c] + if numRunningExecs == 0 { + s.handleProcessExit(e, c, p) + return } + events := make(chan int, numRunningExecs) + s.execCountSubscribers[c] = events + go func() { - execExits := make(map[int][]runcC.Exit) - timeout := 20 * time.Millisecond - timer := time.NewTimer(timeout) - defer timer.Stop() + defer func() { + s.lifecycleMu.Lock() + defer s.lifecycleMu.Unlock() + delete(s.execCountSubscribers, c) + }() + // wait for running processes to exit + timeout := 20 * time.Millisecond + watchdog := time.NewTimer(timeout) + defer watchdog.Stop() + WAIT: for { - <-timer.C - - var updated bool - for pid := range runningCps { - if e, exited := exits[pid]; exited { - if _, ok := execExits[pid]; !ok { - updated = true - } - execExits[pid] = append(execExits[pid], e...)
+ select { + case <-watchdog.C: + break WAIT + case runningExecs := <-events: + if runningExecs == 0 { + break WAIT } } - - if !updated { - break - } - - if len(execExits) == len(runningCps) { - break - } - - // timer channel has been drained, so it's safe - // to just call Reset - timer.Reset(timeout) + resetTimer(watchdog, timeout) } - // publish all received exec exits - for pid, execExits := range execExits { - for _, exit := range execExits { - for _, cp := range runningCps[pid] { - s.handleProcessExit(exit, cp.Container, cp.Process) + // wipe all remaining processes from `s.running` so that late-reaped + // exec exits won't be published after the init exit + for _, execProcess := range c.ExecdProcesses() { + pid := execProcess.Pid() + if _, running := s.running[execProcess.Pid()]; running { + var skipped []containerProcess + for _, cp := range s.running[pid] { + if cp.Container != c { + skipped = append(skipped, cp) + } + } + + if len(skipped) == 0 { + delete(s.running, pid) + } else { + s.running[pid] = skipped } } } @@ -823,13 +763,24 @@ func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Ini // all running processes have exited now, and no new // ones can start, so we can publish the init exit s.handleProcessExit(e, c, p) - - s.lifecycleMu.Lock() - delete(s.exitSubscribers, &exits) - s.lifecycleMu.Unlock() }() } +// resetTimer is a helper function that stops, drains and resets the timer. +// This is necessary in go versions <1.23, since the timer isn't stopped and +// the timer's channel isn't drained on timer.Reset. +// See: https://go-review.googlesource.com/c/go/+/568341 +// FIXME: remove/simplify this after we update to go1.23 +func resetTimer(t *time.Timer, d time.Duration) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(d) +} + func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) { p.SetExited(e.Status) s.send(&eventstypes.TaskExit{ @@ -839,6 +790,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P ExitStatus: uint32(e.Status), ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), }) + if _, init := p.(*process.Init); !init { + s.lifecycleMu.Lock() + s.runningExecs[c]-- + if ch, ok := s.execCountSubscribers[c]; ok { + ch <- s.runningExecs[c] + } + s.lifecycleMu.Unlock() + } } func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
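
The synchronization introduced by this patch can be read in isolation: each exec exit decrements a per-container counter and notifies a subscribed channel, while the init-exit handler waits for that counter to reach zero or for a watchdog timeout before publishing the init exit. The following is a minimal, self-contained sketch of that pattern under assumed names (the waiter type, execExited, and waitForExecs are hypothetical illustrations, not the shim's actual types or API):

package main

import (
	"fmt"
	"sync"
	"time"
)

type waiter struct {
	mu           sync.Mutex
	runningExecs int        // analogous to s.runningExecs[c]
	subscriber   chan<- int // analogous to s.execCountSubscribers[c]
}

// execExited mirrors the decrement-and-notify done at the end of
// handleProcessExit for non-init processes.
func (w *waiter) execExited() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.runningExecs--
	if w.subscriber != nil {
		w.subscriber <- w.runningExecs
	}
}

// waitForExecs mirrors the goroutine spawned by handleInitExit: it waits for
// the running exec count to reach zero, or gives up after the timeout.
func (w *waiter) waitForExecs(timeout time.Duration) {
	w.mu.Lock()
	n := w.runningExecs
	if n == 0 {
		w.mu.Unlock()
		return
	}
	// Buffered to the current exec count so notifications never block the
	// exiting execs, even if this waiter has already timed out.
	events := make(chan int, n)
	w.subscriber = events
	w.mu.Unlock()

	watchdog := time.NewTimer(timeout)
	defer watchdog.Stop()
	for {
		select {
		case <-watchdog.C:
			return // timed out waiting for exec exits
		case remaining := <-events:
			if remaining == 0 {
				return // all execs have exited
			}
			// Each exec exit pushes the deadline out again.
			resetTimer(watchdog, timeout)
		}
	}
}

// resetTimer stops, drains and resets the timer, as in the patch above.
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

func main() {
	w := &waiter{runningExecs: 2}
	go func() {
		time.Sleep(5 * time.Millisecond)
		w.execExited()
		w.execExited()
	}()
	w.waitForExecs(20 * time.Millisecond)
	fmt.Println("all execs exited (or timed out); safe to publish the init exit")
}

The buffered channel is the key design choice: because it has capacity for every outstanding exec, the exit path never blocks on a subscriber that has already timed out and stopped reading, which is why the real code can safely delete the subscription in a deferred cleanup.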