Skip to content

Commit

Permalink
Merge pull request #111384 from harche/evented_pleg_pr
Browse files Browse the repository at this point in the history
Add Support for Evented PLEG
  • Loading branch information
k8s-ci-robot authored Nov 8, 2022
2 parents c36127e + 86284d4 commit 114594e
Show file tree
Hide file tree
Showing 18 changed files with 1,488 additions and 510 deletions.
10 changes: 10 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ const (
// Allows running an ephemeral container in pod namespaces to troubleshoot a running pod.
EphemeralContainers featuregate.Feature = "EphemeralContainers"

// owner: @harche
// kep: http://kep.k8s.io/3386
// alpha: v1.25
//
// Allows using event-driven PLEG (pod lifecycle event generator) through kubelet
// which avoids frequent relisting of containers which helps optimize performance.
EventedPLEG featuregate.Feature = "EventedPLEG"

// owner: @andrewsykim @SergeyKanzhelev
// GA: v1.20
//
Expand Down Expand Up @@ -943,6 +951,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

EphemeralContainers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27

EventedPLEG: {Default: false, PreRelease: featuregate.Alpha},

ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default and remove after v1.22 based on KEP #1972 update

ExpandCSIVolumes: {Default: true, PreRelease: featuregate.GA}, // remove in 1.26
Expand Down
46 changes: 42 additions & 4 deletions pkg/kubelet/container/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"time"

"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)

// Cache stores the PodStatus for the pods. It represents *all* the visible
Expand All @@ -36,7 +38,10 @@ import (
// cache entries.
type Cache interface {
Get(types.UID) (*PodStatus, error)
Set(types.UID, *PodStatus, error, time.Time)
// Set updates the cache by setting the PodStatus for the pod only
// if the data is newer than the cache based on the provided
// time stamp. Returns if the cache was updated.
Set(types.UID, *PodStatus, error, time.Time) (updated bool)
// GetNewerThan is a blocking call that only returns the status
// when it is newer than the given time.
GetNewerThan(types.UID, time.Time) (*PodStatus, error)
Expand Down Expand Up @@ -93,12 +98,22 @@ func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error
return d.status, d.err
}

// Set sets the PodStatus for the pod.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
// Set sets the PodStatus for the pod only if the data is newer than the cache
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
c.lock.Lock()
defer c.lock.Unlock()
defer c.notify(id, timestamp)

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Set the value in the cache only if it's not present already
// or the timestamp in the cache is older than the current update timestamp
if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
return false
}
}

c.pods[id] = &data{status: status, err: err, modified: timestamp}
c.notify(id, timestamp)
return true
}

// Delete removes the entry of the pod.
Expand Down Expand Up @@ -142,6 +157,29 @@ func (c *cache) get(id types.UID) *data {
// Otherwise, it returns nil. The caller should acquire the lock.
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
d, ok := c.pods[id]
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// Evented PLEG has CREATED, STARTED, STOPPED and DELETED events
// However if the container creation fails for some reason there is no
// CRI event received by the kubelet and that pod will get stuck a
// GetNewerThan call in the pod workers. This is reproducible with
// the node e2e test,
// https://github.com/kubernetes/kubernetes/blob/83415e5c9e6e59a3d60a148160490560af2178a1/test/e2e_node/pod_hostnamefqdn_test.go#L161
// which forces failure during pod creation. This issue also exists in
// Generic PLEG but since it updates global timestamp periodically
// the GetNewerThan call gets unstuck.

// During node e2e tests, it was observed this change does not have any
// adverse impact on the behaviour of the Generic PLEG as well.
switch {
case !ok:
return makeDefaultData(id)
case ok && (d.modified.After(minTime) || (c.timestamp != nil && c.timestamp.After(minTime))):
return d
default:
return nil
}
}

globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
if !ok && globalTimestampIsNewer {
// Status is not cached, but the global timestamp is newer than
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/container/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type Runtime interface {
// CheckpointContainer tells the runtime to checkpoint a container
// and store the resulting archive to the checkpoint directory.
CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
// Generate pod status from the CRI event
GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*PodStatus, error)
}

// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
Expand Down Expand Up @@ -305,6 +307,8 @@ type PodStatus struct {
// Status of the pod sandbox.
// Only for kuberuntime now, other runtime may keep it nil.
SandboxStatuses []*runtimeapi.PodSandboxStatus
// Timestamp at which container and pod statuses were recorded
TimeStamp time.Time
}

// Status represents the status of a container.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/container/testing/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*container.Po
return c.Get(id)
}

func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) {
func (c *fakeCache) Set(id types.UID, status *container.PodStatus, err error, timestamp time.Time) (updated bool) {
return true
}

func (c *fakeCache) Delete(id types.UID) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/kubelet/container/testing/fake_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,15 @@ func (f *FakeRuntime) KillContainerInPod(container v1.Container, pod *v1.Pod) er
return f.Err
}

func (f *FakeRuntime) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
f.Lock()
defer f.Unlock()

f.CalledFunctions = append(f.CalledFunctions, "GeneratePodStatus")
status := f.PodStatus
return &status, f.Err
}

func (f *FakeRuntime) GetPodStatus(_ context.Context, uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
f.Lock()
defer f.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions pkg/kubelet/container/testing/runtime_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion pkg/kubelet/cri/remote/remote_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

Expand Down Expand Up @@ -792,5 +793,25 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
}

func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
return nil
containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
if err != nil {
klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
return err
}

for {
resp, err := containerEventsStreamingClient.Recv()
if err == io.EOF {
klog.ErrorS(err, "container events stream is closed")
return err
}
if err != nil {
klog.ErrorS(err, "failed to receive streaming container event")
return err
}
if resp != nil {
containerEventsCh <- resp
klog.V(4).InfoS("container event received", "resp", resp)
}
}
}
47 changes: 45 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ const (
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
genericPlegRelistPeriod = time.Second * 1
genericPlegRelistThreshold = time.Minute * 3

// Generic PLEG relist period and threshold when used with Evented PLEG.
eventedPlegRelistPeriod = time.Second * 300
eventedPlegRelistThreshold = time.Minute * 10
eventedPlegMaxStreamRetries = 5

// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
Expand Down Expand Up @@ -699,9 +705,37 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}

klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// adjust Generic PLEG relisting period and threshold to higher value when Evented PLEG is turned on
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.eventedPleg = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
} else {
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}

klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
}
if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
Expand Down Expand Up @@ -1062,6 +1096,9 @@ type Kubelet struct {
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator

// Evented PLEG
eventedPleg pleg.PodLifecycleEventGenerator

// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache

Expand Down Expand Up @@ -1485,6 +1522,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

// Start the pod lifecycle event generator.
kl.pleg.Start()

// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}

kl.syncLoop(ctx, updates, kl)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func newTestKubeletWithImageList(
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, kubelet.podCache, clock.RealClock{})
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
kubelet.clock = fakeClock

nodeRef := &v1.ObjectReference{
Expand Down
43 changes: 24 additions & 19 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,29 @@ func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string)
return buf.String()
}

func (m *kubeGenericRuntimeManager) convertToKubeContainerStatus(status *runtimeapi.ContainerStatus) (cStatus *kubecontainer.Status) {
cStatus = toKubeContainerStatus(status, m.runtimeName)
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
// Populate the termination message if needed.
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
}
// Enrich the termination message written by the application is not empty
if len(tMessage) != 0 {
if len(cStatus.Message) != 0 {
cStatus.Message += ": "
}
cStatus.Message += tMessage
}
}
return cStatus
}

// getPodContainerStatuses gets all containers' statuses for the pod.
func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context, uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {
// Select all containers of the given pod.
Expand Down Expand Up @@ -521,25 +544,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context,
if status == nil {
return nil, remote.ErrContainerStatusNil
}
cStatus := toKubeContainerStatus(status, m.runtimeName)
if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
// Populate the termination message if needed.
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
}
// Enrich the termination message written by the application is not empty
if len(tMessage) != 0 {
if len(cStatus.Message) != 0 {
cStatus.Message += ": "
}
cStatus.Message += tMessage
}
}
cStatus := m.convertToKubeContainerStatus(status)
statuses = append(statuses, cStatus)
}

Expand Down
Loading

0 comments on commit 114594e

Please sign in to comment.