PLEG: reinspect pods that failed prior inspections
Fix a bug caused by the following sequence of events:

1. relist call 1 successfully inspects a pod (it has just the infra container)
2. relist call 2 gets an error inspecting the same pod (it now has the infra container plus a transient
container that failed to create) and doesn't update the old/new pod records
3. relist calls 3+ never inspect the pod again (it once more has just the infra container, so nothing
appears to have changed)

This change adds a new piece of bookkeeping, a map of pods that failed inspection, and retries those
pods the next time relist is called. Without this change, a pod in this state would never be inspected
again, its entry in the status cache would never be updated, and the pod worker would never call
syncPod again, because the most recent entry in the status cache carries an error. Such pods would be
stuck Terminating forever unless the user issued a deletion with a grace period of 0.
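
For illustration, here is a condensed, self-contained sketch of the bookkeeping described above. The
names UID, Pod, pleg, inspect, and the inspectOK flag are simplified stand-ins invented for this
sketch, not the kubelet's real API; the actual change is in the diff below.

package main

import "fmt"

// UID and Pod are minimal stand-ins for the kubelet's types.UID and kubecontainer.Pod.
type UID string

type Pod struct {
	ID   UID
	Name string
}

// pleg is a stripped-down stand-in for GenericPLEG that keeps only the piece this commit adds:
// the set of pods whose most recent status inspection failed.
type pleg struct {
	podsToReinspect map[UID]*Pod
}

// inspect stands in for GenericPLEG.updateCache; the ok flag simulates whether the runtime
// inspection succeeds.
func (g *pleg) inspect(pod *Pod, ok bool) error {
	if !ok {
		return fmt.Errorf("inspection error for pod %s", pod.Name)
	}
	return nil
}

// relist mirrors the flow described above: pods whose inspection fails are remembered and
// retried on the next relist, so their cache entries cannot stay stale forever.
func (g *pleg) relist(podsWithEvents map[UID]*Pod, inspectOK bool) {
	needsReinspection := make(map[UID]*Pod)

	// Pods with events are inspected as before; failures are recorded for retry.
	for pid, pod := range podsWithEvents {
		if err := g.inspect(pod, inspectOK); err != nil {
			needsReinspection[pid] = pod
			continue
		}
		// Already inspected in this pass; don't reinspect it a second time below.
		delete(g.podsToReinspect, pid)
	}

	// Retry pods that failed inspection during the previous relist.
	for pid, pod := range g.podsToReinspect {
		if err := g.inspect(pod, inspectOK); err != nil {
			needsReinspection[pid] = pod
		}
	}

	// Carry the failures forward to the next relist call.
	g.podsToReinspect = needsReinspection
}

func main() {
	g := &pleg{}
	pod := &Pod{ID: "p1", Name: "test-pod"}

	// relist 1: inspection fails, so the pod is queued for reinspection.
	g.relist(map[UID]*Pod{pod.ID: pod}, false)
	fmt.Println(len(g.podsToReinspect)) // prints 1

	// relist 2: no events for the pod, but it is reinspected and succeeds this time.
	g.relist(nil, true)
	fmt.Println(len(g.podsToReinspect)) // prints 0
}

Running the sketch prints 1 and then 0: the pod queued after the failed inspection is retried and
cleared on the following relist, which is the behavior the real change restores.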
Andy Goldstein committed May 3, 2016
1 parent 9289907 commit 3a87bfb
Showing 2 changed files with 100 additions and 0 deletions.
31 changes: 31 additions & 0 deletions pkg/kubelet/pleg/generic.go
@@ -60,6 +60,9 @@ type GenericPLEG struct {
	cache kubecontainer.Cache
	// For testability.
	clock util.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[types.UID]*kubecontainer.Pod
}

// plegContainerState has a one-to-one mapping to the
@@ -210,6 +213,11 @@ func (g *GenericPLEG) relist() {
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
@@ -226,7 +234,16 @@
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else if _, found := g.podsToReinspect[pid]; found {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
Expand All @@ -241,10 +258,24 @@ func (g *GenericPLEG) relist() {
}

if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err := g.updateCache(pod, pid); err != nil {
glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
needsReinspection[pid] = pod
}
}
}

// Update the cache timestamp. This needs to happen *after*
// all pods have been properly updated in the cache.
g.cache.UpdateTime(timestamp)
}

// make sure we retain the list of pods that need reinspecting the next time relist is called
g.podsToReinspect = needsReinspection
}

func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
69 changes: 69 additions & 0 deletions pkg/kubelet/pleg/generic_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package pleg

import (
"errors"
"fmt"
"reflect"
"sort"
@@ -356,3 +357,71 @@ func TestHealthy(t *testing.T) {
	ok, _ = pleg.Healthy()
	assert.True(t, ok, "pleg should be healthy")
}

func TestRelistWithReinspection(t *testing.T) {
	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
	ch := pleg.Watch()

	infraContainer := createTestContainer("infra", kubecontainer.ContainerStateRunning)

	podID := types.UID("test-pod")
	pods := []*kubecontainer.Pod{{
		ID:         podID,
		Containers: []*kubecontainer.Container{infraContainer},
	}}
	runtimeMock.On("GetPods", true).Return(pods, nil).Once()

	goodStatus := &kubecontainer.PodStatus{
		ID:                podID,
		ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: infraContainer.ID, State: infraContainer.State}},
	}
	runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()

	goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}

	// listing 1 - everything ok, infra container set up for pod
	pleg.relist()
	actualEvents := getEventsFromChannel(ch)
	actualStatus, actualErr := pleg.cache.Get(podID)
	assert.Equal(t, goodStatus, actualStatus)
	assert.Equal(t, nil, actualErr)
	assert.Exactly(t, []*PodLifecycleEvent{goodEvent}, actualEvents)

	// listing 2 - pretend runtime was in the middle of creating the non-infra container for the pod
	// and return an error during inspection
	transientContainer := createTestContainer("transient", kubecontainer.ContainerStateUnknown)
	podsWithTransientContainer := []*kubecontainer.Pod{{
		ID:         podID,
		Containers: []*kubecontainer.Container{infraContainer, transientContainer},
	}}
	runtimeMock.On("GetPods", true).Return(podsWithTransientContainer, nil).Once()

	badStatus := &kubecontainer.PodStatus{
		ID:                podID,
		ContainerStatuses: []*kubecontainer.ContainerStatus{},
	}
	runtimeMock.On("GetPodStatus", podID, "", "").Return(badStatus, errors.New("inspection error")).Once()

	pleg.relist()
	actualEvents = getEventsFromChannel(ch)
	actualStatus, actualErr = pleg.cache.Get(podID)
	assert.Equal(t, badStatus, actualStatus)
	assert.Equal(t, errors.New("inspection error"), actualErr)
	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)

	// listing 3 - pretend the transient container has now disappeared, leaving just the infra
	// container. Make sure the pod is reinspected for its status and the cache is updated.
	runtimeMock.On("GetPods", true).Return(pods, nil).Once()
	runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()

	pleg.relist()
	actualEvents = getEventsFromChannel(ch)
	actualStatus, actualErr = pleg.cache.Get(podID)
	assert.Equal(t, goodStatus, actualStatus)
	assert.Equal(t, nil, actualErr)
	// no events are expected because relist #1 set the old pod record which has the infra container
	// running. relist #2 had the inspection error and therefore didn't modify either old or new.
	// relist #3 forced the reinspection of the pod to retrieve its status, but because the list of
	// containers was the same as relist #1, nothing "changed", so there are no new events.
	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
}
