Skip to content

Commit

Permalink
intrenal/cri: optimize stdio create procedure
Browse files Browse the repository at this point in the history
When each container is created, the containerd process will create
three new threads and open fifos.stdin/fifo.stdout/fifo.stderr. The
three threads in the fifos pipeline file will be blocked, and these
blocked threads will only return from the fifos pipeline file is
opened after the corresponding shim process of the container is started.
If the container remains in a state of creation without being started,
the above three threads will be permanently blocked, and each created
container will consume three threads. The default number of containerd
threads is 10000. In extreme scenarios, creating only the container
without starting it will reach the upper limit of containerd threads,
ultimately causing containerd threads to exit.

Move the operation of opening the fifos.stdin/fifos.stdout/fifos.stderr
for each container to the StartContainer function, so that no new threads
are created in CreateContainer function. And these new threads are not
constantly blocked in opening fifos.stdin/fifos.stdout/fifos.stderr file
process due to the immediate started of the shim process. Thus reducing
the consumption of containerd threads in this way. Prevent containerd
processes from exiting due to reaching the maximum number of threads.

Signed-off-by: mingfukuang <kuang.mingfu@zte.com.cn>
  • Loading branch information
mingfukuang committed Dec 10, 2024
1 parent e514bae commit 7d1b865
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 27 deletions.
22 changes: 0 additions & 22 deletions internal/cri/server/container_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/internal/cri/annotations"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
cio "github.com/containerd/containerd/v2/internal/cri/io"
crilabels "github.com/containerd/containerd/v2/internal/cri/labels"
customopts "github.com/containerd/containerd/v2/internal/cri/opts"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
Expand Down Expand Up @@ -262,26 +261,6 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
sandboxConfig.GetLogDirectory(), config.GetLogPath())
}

var containerIO *cio.ContainerIO
switch ociRuntime.IOType {
case criconfig.IOTypeStreaming:
containerIO, err = cio.NewContainerIO(id,
cio.WithStreams(sandbox.Endpoint.Address, config.GetTty(), config.GetStdin()))
default:
containerIO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
}
if err != nil {
return nil, fmt.Errorf("failed to create container io: %w", err)
}
defer func() {
if retErr != nil {
if err := containerIO.Close(); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to close container io %q", id)
}
}
}()

specOpts, err := c.platformSpecOpts(platform, config, &image.ImageSpec.Config)
if err != nil {
return nil, fmt.Errorf("failed to get container spec opts: %w", err)
Expand Down Expand Up @@ -332,7 +311,6 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
container, err := containerstore.NewContainer(meta,
containerstore.WithStatus(status, containerRootDir),
containerstore.WithContainer(cntr),
containerstore.WithContainerIO(containerIO),
)
if err != nil {
return nil, fmt.Errorf("failed to create internal container object for %q: %w", id, err)
Expand Down
36 changes: 31 additions & 5 deletions internal/cri/server/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"

containerd "github.com/containerd/containerd/v2/client"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
cio "github.com/containerd/containerd/v2/internal/cri/io"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
Expand Down Expand Up @@ -102,6 +103,36 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
}
}

ociRuntime, err := c.config.GetSandboxRuntime(sandbox.Config, sandbox.Metadata.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}

switch ociRuntime.IOType {
case criconfig.IOTypeStreaming:
cntr.IO, err = cio.NewContainerIO(id,
cio.WithStreams(sandbox.Endpoint.Address, config.GetTty(), config.GetStdin()))
default:
volatileContainerRootDir := c.getVolatileContainerRootDir(id)
cntr.IO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
}
if err != nil {
return nil, fmt.Errorf("failed to create container io: %w", err)
}
defer func() {
if retErr != nil {
if err := cntr.IO.Close(); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to close container io %q", id)
}
}
}()

if err = c.containerStore.UpdateContainerStdio(id, cntr.IO); err != nil {
log.G(ctx).WithError(err).Errorf("Failed UpdateContainerStdio %q", id)
return nil, err
}

ioCreation := func(id string) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
Expand All @@ -112,11 +143,6 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
return cntr.IO, nil
}

ociRuntime, err := c.config.GetSandboxRuntime(sandbox.Config, sandbox.Metadata.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}

var taskOpts []containerd.NewTaskOpts
if ociRuntime.Path != "" {
taskOpts = append(taskOpts, containerd.WithRuntimePath(ociRuntime.Path))
Expand Down
21 changes: 21 additions & 0 deletions internal/cri/store/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,27 @@ func (s *Store) UpdateContainerStats(id string, newContainerStats *stats.Contain
return nil
}

func (s *Store) UpdateContainerStdio(id string, io *cio.ContainerIO) error {
s.lock.Lock()
defer s.lock.Unlock()
id, err := s.idIndex.Get(id)
if err != nil {
if err == truncindex.ErrNotExist {
err = errdefs.ErrNotFound
}
return err
}

if _, ok := s.containers[id]; !ok {
return errdefs.ErrNotFound
}

c := s.containers[id]
c.IO = io
s.containers[id] = c
return nil
}

// Delete deletes the container from store with specified id.
func (s *Store) Delete(id string) {
s.lock.Lock()
Expand Down

0 comments on commit 7d1b865

Please sign in to comment.