kubelet: migrate pleg to contextual logging
Signed-off-by: Oksana Baranova <oksana.baranova@intel.com>
oxxenix committed Sep 13, 2024
1 parent 19e8e59 commit 2474369
Showing 8 changed files with 50 additions and 39 deletions.
1 change: 1 addition & 0 deletions hack/golangci-hints.yaml
@@ -152,6 +152,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/golangci-strict.yaml
@@ -198,6 +198,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/golangci.yaml
@@ -201,6 +201,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
1 change: 1 addition & 0 deletions hack/logcheck.conf
@@ -48,6 +48,7 @@ contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
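For context on what the "contextual" directive buys: logcheck then flags free-standing klog calls inside the listed packages and expects the code to log through a klog.Logger handed in by the caller (directly or via a context). Below is a minimal sketch of the two styles with made-up function names (relistGlobal, relistContextual); as the comment above notes, WithName/WithValues-style setup still goes through klog helpers (klog.LoggerWithName, klog.LoggerWithValues) while contextual logging is alpha/beta.

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// Old style: a free-standing klog call; flagged by logcheck in "contextual" mode.
func relistGlobal() {
	klog.V(5).InfoS("GenericPLEG: Relisting")
}

// Contextual style: the logger is taken from the caller's context and used directly.
func relistContextual(ctx context.Context) {
	logger := klog.LoggerWithName(klog.FromContext(ctx), "pleg")
	logger.V(5).Info("GenericPLEG: Relisting")
}

func main() {
	relistGlobal()
	relistContextual(context.Background())
}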
6 changes: 3 additions & 3 deletions pkg/kubelet/kubelet.go
@@ -738,15 +738,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
-klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
-klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
+klet.eventedPleg, err = pleg.NewEventedPLEG(logger, klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
if err != nil {
return nil, err
@@ -756,7 +756,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
-klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+klet.pleg = pleg.NewGenericPLEG(logger, klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}

klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
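The kubelet.go side is plain constructor injection: NewGenericPLEG and NewEventedPLEG now receive the logger they will log with instead of calling the klog globals; where exactly the logger variable comes from inside NewMainKubelet is outside this hunk. A minimal, self-contained sketch of the pattern with illustrative names (relister, newRelister), not the real PLEG types:

package main

import "k8s.io/klog/v2"

// relister stands in for a component like GenericPLEG that keeps the logger it was given.
type relister struct {
	logger klog.Logger
}

func newRelister(logger klog.Logger) *relister {
	return &relister{logger: logger}
}

func (r *relister) relist() {
	// Logs through the injected logger, never through the klog package-level functions.
	r.logger.V(5).Info("Relisting")
}

func main() {
	// klog.Background() returns the logger behind the global klog calls;
	// a real caller could instead derive one from a context with klog.FromContext(ctx).
	newRelister(klog.Background()).relist()
}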
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_test.go
@@ -334,7 +334,7 @@ func newTestKubeletWithImageList(
kubelet.resyncInterval = 10 * time.Second
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
// Relist period does not affect the tests.
-kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
+kubelet.pleg = pleg.NewGenericPLEG(logger, fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
kubelet.clock = fakeClock

nodeRef := &v1.ObjectReference{
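The test only needs some klog.Logger for the new parameter. In-tree tests commonly create one per test with the ktesting package, which routes output through the testing.T; a minimal sketch under that assumption:

package pleg_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestRelistLogging(t *testing.T) {
	// ktesting ties log output to this test, so captured lines surface with test failures.
	logger, _ := ktesting.NewTestContext(t)
	logger.V(5).Info("GenericPLEG: Relisting")
}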
35 changes: 19 additions & 16 deletions pkg/kubelet/pleg/evented.go
@@ -83,10 +83,12 @@ type EventedPLEG struct {
stopCacheUpdateCh chan struct{}
// Locks the start/stop operation of the Evented PLEG.
runningMu sync.Mutex
+// logger is used for contextual logging
+logger klog.Logger
}

// NewEventedPLEG instantiates a new EventedPLEG object and return it.
-func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
+func NewEventedPLEG(logger klog.Logger, runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
relistDuration *RelistDuration, clock clock.Clock) (PodLifecycleEventGenerator, error) {
handler, ok := genericPleg.(podLifecycleEventGeneratorHandler)
@@ -102,6 +104,7 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru
eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
relistDuration: relistDuration,
clock: clock,
+logger: logger,
}, nil
}

@@ -184,7 +187,7 @@ func (e *EventedPLEG) watchEventsChannel() {
if numAttempts >= e.eventedPlegMaxStreamRetries {
if isEventedPLEGInUse() {
// Fall back to Generic PLEG relisting since Evented PLEG is not working.
klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
e.logger.V(4).Info("Fall back to Generic PLEG relisting since Evented PLEG is not working")
e.Stop()
e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
@@ -200,7 +203,7 @@ func (e *EventedPLEG) watchEventsChannel() {
metrics.EventedPLEGConnErr.Inc()
numAttempts++
e.Relist() // Force a relist to get the latest container and pods running metric.
klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err)
e.logger.V(4).Info("Evented PLEG: Failed to get container events, retrying: ", "err", err)
}
}
}()
@@ -221,7 +224,7 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
// b) in worst case, a relist will eventually sync the pod status.
// TODO(#114371): Figure out a way to handle this case instead of ignoring.
if event.PodSandboxStatus == nil || event.PodSandboxStatus.Metadata == nil {
klog.ErrorS(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
e.logger.Error(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
continue
}

@@ -234,15 +237,15 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
// if branch is okay, we just use it to determine whether the
// additional "podStatus" key and its value should be added.
if klog.V(6).Enabled() {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
e.logger.Error(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
}
} else {
if klogV := klog.V(6); klogV.Enabled() {
klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
if klogV := e.logger.V(6); klogV.Enabled() {
e.logger.Info("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
e.logger.V(4).Info("Evented PLEG: Generated pod status from the received event", "podUID", podID)
}
// Preserve the pod IP across cache updates if the new IP is empty.
// When a pod is torn down, kubelet may race with PLEG and retrieve
@@ -282,23 +285,23 @@ func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse)
switch event.ContainerEventType {
case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
e.logger.V(4).Info("Received Container Stopped Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
// We only need to update the pod status on container create.
// But we don't have to generate any PodLifeCycleEvent. Container creation related
// PodLifeCycleEvent is ignored by the existing Generic PLEG as well.
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L88 and
// https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L273
klog.V(4).InfoS("Received Container Created Event", "event", event.String())
e.logger.V(4).Info("Received Container Created Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Started Event", "event", event.String())
e.logger.V(4).Info("Received Container Started Event", "event", event.String())
case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
// In case the pod is deleted it is safe to generate both ContainerDied and ContainerRemoved events, just like in the case of
// Generic PLEG. https://github.com/kubernetes/kubernetes/blob/24753aa8a4df8d10bfd6330e0f29186000c018be/pkg/kubelet/pleg/generic.go#L169
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
klog.V(4).InfoS("Received Container Deleted Event", "event", event)
e.logger.V(4).Info("Received Container Deleted Event", "event", event)
}
}

@@ -330,7 +333,7 @@ func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
default:
// record how many events were discarded due to channel out of capacity
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
e.logger.Error(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
}
}

@@ -356,7 +359,7 @@ func getPodSandboxState(podStatus *kubecontainer.PodStatus) kubecontainer.State
func (e *EventedPLEG) updateRunningPodMetric(podStatus *kubecontainer.PodStatus) {
cachedPodStatus, err := e.cache.Get(podStatus.ID)
if err != nil {
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
}
// cache miss condition: The pod status object will have empty state if missed in cache
if len(cachedPodStatus.SandboxStatuses) < 1 {
@@ -387,7 +390,7 @@ func getContainerStateCount(podStatus *kubecontainer.PodStatus) map[kubecontaine
func (e *EventedPLEG) updateRunningContainerMetric(podStatus *kubecontainer.PodStatus) {
cachedPodStatus, err := e.cache.Get(podStatus.ID)
if err != nil {
klog.ErrorS(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
e.logger.Error(err, "Evented PLEG: Get cache", "podID", podStatus.ID)
}

// cache miss condition: The pod status object will have empty state if missed in cache
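A note on the error-path conversions in this file: klog.ErrorS(err, msg, kv...) maps to logger.Error(err, msg, kv...), and a nil error is still accepted when the call only flags a condition (the full-channel and nil-metadata cases above). A small sketch of that mapping with stand-in values:

package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	logger := klog.Background()

	// With a real error plus structured key/value pairs.
	err := errors.New("connection reset")
	logger.Error(err, "Evented PLEG: Failed to get container events, retrying", "attempt", 1)

	// With a nil error, used purely to record an abnormal condition.
	logger.Error(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")

	// Leveled info calls replace klog.V(4).InfoS one-for-one.
	logger.V(4).Info("Received Container Stopped Event", "event", "sandbox-1/container-1")
}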
42 changes: 23 additions & 19 deletions pkg/kubelet/pleg/generic.go
@@ -78,6 +78,8 @@ type GenericPLEG struct {
relistDuration *RelistDuration
// Mutex to serialize updateCache called by relist vs UpdateCache interface
podCacheMutex sync.Mutex
+// logger is used for contextual logging
+logger klog.Logger
}

// plegContainerState has a one-to-one mapping to the
@@ -116,10 +118,11 @@ type podRecord struct {
type podRecords map[types.UID]*podRecord

// NewGenericPLEG instantiates a new GenericPLEG object and return it.
-func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
+func NewGenericPLEG(logger klog.Logger, runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
relistDuration *RelistDuration, cache kubecontainer.Cache,
clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
+logger: logger,
relistDuration: relistDuration,
runtime: runtime,
eventChannel: eventChannel,
@@ -176,12 +179,12 @@ func (g *GenericPLEG) Healthy() (bool, error) {
return true, nil
}

-func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
+func generateEvents(logger klog.Logger, podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
if newState == oldState {
return nil
}

-klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
+logger.V(4).Info("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
switch newState {
case plegContainerRunning:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
@@ -221,7 +224,8 @@ func (g *GenericPLEG) Relist() {
defer g.relistLock.Unlock()

ctx := context.Background()
klog.V(5).InfoS("GenericPLEG: Relisting")

g.logger.V(5).Info("GenericPLEG: Relisting")

if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
@@ -235,7 +239,7 @@ func (g *GenericPLEG) Relist() {
// Get all the pods.
podList, err := g.runtime.GetPods(ctx, true)
if err != nil {
klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
g.logger.Error(err, "GenericPLEG: Unable to retrieve pods")
return
}

@@ -254,7 +258,7 @@ func (g *GenericPLEG) Relist() {
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
-events := computeEvents(oldPod, pod, &container.ID)
+events := computeEvents(g.logger, oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
@@ -282,7 +286,7 @@ func (g *GenericPLEG) Relist() {
// parallelize if needed.
if err, updated := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
g.logger.V(4).Error(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
@@ -315,7 +319,7 @@ func (g *GenericPLEG) Relist() {
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
g.logger.Error(nil, "Event channel is full, discard this relist() cycle event")
}
// Log exit code of containers when they finished in a particular event
if events[i].Type == ContainerDied {
@@ -331,7 +335,7 @@ func (g *GenericPLEG) Relist() {
}
if containerID, ok := events[i].Data.(string); ok {
if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
g.logger.V(2).Info("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
}
}
}
@@ -341,11 +345,11 @@ func (g *GenericPLEG) Relist() {
if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
g.logger.V(5).Info("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err, _ := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
g.logger.V(5).Error(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
}
}
@@ -386,7 +390,7 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe
return containers
}

-func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
+func computeEvents(logger klog.Logger, oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
var pid types.UID
if oldPod != nil {
pid = oldPod.ID
@@ -395,7 +399,7 @@ func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.Contain
}
oldState := getContainerState(oldPod, cid)
newState := getContainerState(newPod, cid)
-return generateEvents(pid, cid.ID, oldState, newState)
+return generateEvents(logger, pid, cid.ID, oldState, newState)
}

func (g *GenericPLEG) cacheEnabled() bool {
@@ -433,7 +437,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
if pod == nil {
// The pod is missing in the current relist. This means that
// the pod has no visible (active or inactive) containers.
klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
g.logger.V(4).Info("PLEG: Delete status for pod", "podUID", string(pid))
g.cache.Delete(pid)
return nil, true
}
@@ -448,15 +452,15 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
// if branch is okay, we just use it to determine whether the
// additional "podStatus" key and its value should be added.
if klog.V(6).Enabled() {
klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
} else {
klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
g.logger.Error(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
}
} else {
if klogV := klog.V(6); klogV.Enabled() {
klogV.InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
if klogV := g.logger.V(6); klogV.Enabled() {
g.logger.Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
} else {
klog.V(4).InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
g.logger.V(4).Info("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
}
// Preserve the pod IP across cache updates if the new IP is empty.
// When a pod is torn down, kubelet may race with PLEG and retrieve

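The klogV idiom from the original code (check the verbosity level once, then log through the returned value) carries over to a stored logger: logger.V(6) returns a logger whose Enabled() can be tested, so an expensive value like podStatus is only attached when that level is active. A minimal sketch with placeholder values:

package main

import "k8s.io/klog/v2"

func main() {
	logger := klog.Background()
	podStatus := map[string]string{"phase": "Running"} // stand-in for the real status object

	if loggerV := logger.V(6); loggerV.Enabled() {
		// Verbose path: include the potentially large status value.
		loggerV.Info("PLEG: Write status", "pod", "kube-system/example", "podStatus", podStatus)
	} else {
		// Default path: log at lower verbosity without the heavyweight value.
		logger.V(4).Info("PLEG: Write status", "pod", "kube-system/example")
	}
}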