Merge pull request kubernetes#25038 from rphillips/cherry-pick-24841-to-release-4.2

Bug 1840872: UPSTREAM: 88440: Report deleted pod status correctly

Origin-commit: 77869edca67678ff9f6575094bae0ede1314f989
k8s-publishing-bot committed Jun 4, 2020
2 parents 8da5699 + 56b309d commit 5485ed2
Showing 7 changed files with 307 additions and 25 deletions.
4 changes: 4 additions & 0 deletions pkg/kubelet/container/helpers.go
@@ -59,6 +59,10 @@ type RuntimeHelper interface {
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
+	// Once a pod has been marked deleted, it should not be restarted
+	if pod.DeletionTimestamp != nil {
+		return false
+	}
// Get latest container status.
status := podStatus.FindContainerStatusByName(container.Name)
// If the container was never started before, we should start it.
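The four added lines are the core of the fix: deletion now preempts every restart policy. Below is a minimal standalone sketch of that behavior, assuming the k8s.io/api and k8s.io/apimachinery modules are available; shouldRestart is a hypothetical stand-in for the real helper, with the per-container status checks elided:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shouldRestart mirrors the new early return: a deleted pod is never
// restarted, regardless of its restart policy.
func shouldRestart(pod *v1.Pod) bool {
	if pod.DeletionTimestamp != nil {
		return false
	}
	// ...the real helper continues with per-container status checks...
	return pod.Spec.RestartPolicy != v1.RestartPolicyNever
}

func main() {
	pod := &v1.Pod{}
	pod.Spec.RestartPolicy = v1.RestartPolicyAlways
	fmt.Println(shouldRestart(pod)) // true: live pod with RestartPolicyAlways

	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
	fmt.Println(shouldRestart(pod)) // false: deletion overrides the policy
}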
24 changes: 24 additions & 0 deletions pkg/kubelet/container/helpers_test.go
@@ -19,6 +19,7 @@ package container
import (
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"

@@ -377,6 +378,8 @@ func TestShouldContainerBeRestarted(t *testing.T) {
v1.RestartPolicyOnFailure,
v1.RestartPolicyAlways,
	}
+
+	// test policies
expected := map[string][]bool{
"no-history": {true, true, true},
"alive": {false, false, false},
@@ -395,6 +398,27 @@
}
}
	}
+
+	// test deleted pod
+	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+	expected = map[string][]bool{
+		"no-history": {false, false, false},
+		"alive":      {false, false, false},
+		"succeed":    {false, false, false},
+		"failed":     {false, false, false},
+		"unknown":    {false, false, false},
+	}
+	for _, c := range pod.Spec.Containers {
+		for i, policy := range policies {
+			pod.Spec.RestartPolicy = policy
+			e := expected[c.Name][i]
+			r := ShouldContainerBeRestarted(&c, pod, podStatus)
+			if r != e {
+				t.Errorf("Restart for container %q with restart policy %q expected %t, got %t",
+					c.Name, policy, e, r)
+			}
+		}
+	}
}

func TestHasPrivilegedContainer(t *testing.T) {
23 changes: 13 additions & 10 deletions pkg/kubelet/kubelet.go
@@ -1988,19 +1988,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
-// If the pod is terminated, dispatchWork will perform no action.
+// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
-	if kl.podIsTerminated(pod) {
-		klog.V(4).Infof("Pod %q is terminated, ignoring remaining sync work: %s", format.Pod(pod), syncType)
-		if pod.DeletionTimestamp != nil {
-			// If the pod is in a terminated state, there is no pod worker to
-			// handle the work item. Check if the DeletionTimestamp has been
-			// set, and force a status update to trigger a pod deletion request
-			// to the apiserver.
-			kl.statusManager.TerminatePod(pod)
-		}
+	// check whether we are ready to delete the pod from the API server (all status up to date)
+	containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
+	if pod.DeletionTimestamp != nil && containersTerminal {
+		klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
+		kl.statusManager.TerminatePod(pod)
		return
	}
+
+	// optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
+	if podWorkerTerminal {
+		klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType)
+		return
+	}

// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
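dispatchWork now works from two separate terminal signals rather than one. Here is a sketch of the resulting decision order, using plain booleans in place of the kubelet's internal state; dispatchDecision is illustrative, not a kubelet function:

package main

import "fmt"

// dispatchDecision models the order of checks in the new dispatchWork:
// a deleted pod whose containers are all terminal gets a forced status
// update (so the apiserver can finish deletion), a terminal pod worker
// is skipped, and everything else is handed to the pod worker.
func dispatchDecision(deleted, containersTerminal, podWorkerTerminal bool) string {
	if deleted && containersTerminal {
		return "TerminatePod: force final status so the apiserver can delete the pod"
	}
	if podWorkerTerminal {
		return "skip: no further changes can affect the pod"
	}
	return "dispatch to pod worker"
}

func main() {
	// A deleted pod whose containers have not finished reporting keeps
	// syncing instead of being force-terminated early.
	fmt.Println(dispatchDecision(true, false, false)) // dispatch to pod worker
	fmt.Println(dispatchDecision(true, true, true))   // TerminatePod: force final status...
}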
28 changes: 23 additions & 5 deletions pkg/kubelet/kubelet_pods.go
@@ -868,8 +868,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
return pullSecrets
}

-// podIsTerminated returns true if pod is in the terminated state ("Failed" or "Succeeded").
-func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
+// podAndContainersAreTerminal reports whether the specified pod has no running containers and whether
+// the pod is no longer accepting spec changes.
+func (kl *Kubelet) podAndContainersAreTerminal(pod *v1.Pod) (containersTerminal, podWorkerTerminal bool) {
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
@@ -878,11 +879,28 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
// restarted.
status = pod.Status
}
-	return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
+	// A pod transitions into failed or succeeded from either container lifecycle (RestartNever container
+	// fails) or due to external events like deletion or eviction. A terminal pod *should* have no running
+	// containers, but to know that the pod has completed its lifecycle you must wait for containers to also
+	// be terminal.
+	containersTerminal = notRunning(status.ContainerStatuses)
+	// The kubelet must accept config changes from the pod spec until it has reached a point where changes would
+	// have no effect on any running container.
+	podWorkerTerminal = status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && containersTerminal)
+	return
}

+// podIsTerminated returns true if the provided pod is in a terminal phase ("Failed", "Succeeded") or
+// has been deleted and has no running containers. Until then, the pod must accept changes to its pod
+// spec (e.g. terminating containers allow the grace period to be shortened).
+func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
+	_, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
+	return podWorkerTerminal
+}

-// IsPodTerminated returns true if the pod with the provided UID is in a terminated state ("Failed" or "Succeeded")
-// or if the pod has been deleted or removed
+// IsPodTerminated returns true if the pod with the provided UID is in a terminal phase ("Failed",
+// "Succeeded") or has been deleted and has no running containers. Until then, the pod must accept
+// changes to its pod spec (e.g. terminating containers allow the grace period to be shortened).
func (kl *Kubelet) IsPodTerminated(uid types.UID) bool {
pod, podFound := kl.podManager.GetPodByUID(uid)
if !podFound {
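How the two signals are derived can be read from the hunk above. A simplified sketch follows, reading pod.Status directly where the real code prefers the status manager's cached status, and assuming notRunning counts a container as stopped only when it is Terminated or Waiting; signals is a hypothetical stand-in for podAndContainersAreTerminal:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// signals derives both terminal conditions from a pod's status.
func signals(pod *v1.Pod) (containersTerminal, podWorkerTerminal bool) {
	containersTerminal = true
	for _, cs := range pod.Status.ContainerStatuses {
		// Mirrors notRunning: anything not Terminated or Waiting may still be running.
		if cs.State.Terminated == nil && cs.State.Waiting == nil {
			containersTerminal = false
		}
	}
	phase := pod.Status.Phase
	podWorkerTerminal = phase == v1.PodFailed || phase == v1.PodSucceeded ||
		(pod.DeletionTimestamp != nil && containersTerminal)
	return
}

func main() {
	pod := &v1.Pod{}
	pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
	pod.Status.ContainerStatuses = []v1.ContainerStatus{
		{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
	}
	fmt.Println(signals(pod)) // false false: deleted, but a container still runs
}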
25 changes: 16 additions & 9 deletions pkg/kubelet/status/status_manager.go
@@ -153,13 +153,20 @@ func (m *manager) Start() {
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go wait.Forever(func() {
-		select {
-		case syncRequest := <-m.podStatusChannel:
-			klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
-				syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
-			m.syncPod(syncRequest.podUID, syncRequest.status)
-		case <-syncTicker:
-			m.syncBatch()
+		for {
+			select {
+			case syncRequest := <-m.podStatusChannel:
+				klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
+					syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
+				m.syncPod(syncRequest.podUID, syncRequest.status)
+			case <-syncTicker:
+				klog.V(5).Infof("Status Manager: syncing batch")
+				// remove any entries in the status channel since the batch will handle them
+				for i := len(m.podStatusChannel); i > 0; i-- {
+					<-m.podStatusChannel
+				}
+				m.syncBatch()
+			}
		}
}, 0)
}
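The ticker branch above drains queued per-pod requests before syncBatch, since the batch sync covers them anyway. The drain is a generic Go pattern; a self-contained sketch (not kubelet code):

package main

import "fmt"

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	// Snapshot len(ch) once and discard that many queued items; anything
	// that arrives concurrently afterwards is left for the next cycle.
	for i := len(ch); i > 0; i-- {
		<-ch
	}
	fmt.Println("queued after drain:", len(ch)) // queued after drain: 0
	// ...a batch operation covering all items would run here...
}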
@@ -277,7 +284,7 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
}
status := *oldStatus.DeepCopy()
for i := range status.ContainerStatuses {
-		if status.ContainerStatuses[i].State.Terminated != nil {
+		if status.ContainerStatuses[i].State.Terminated != nil || status.ContainerStatuses[i].State.Waiting != nil {
continue
}
status.ContainerStatuses[i].State = v1.ContainerState{
@@ -289,7 +296,7 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
}
}
for i := range status.InitContainerStatuses {
-		if status.InitContainerStatuses[i].State.Terminated != nil {
+		if status.InitContainerStatuses[i].State.Terminated != nil || status.InitContainerStatuses[i].State.Waiting != nil {
continue
}
status.InitContainerStatuses[i].State = v1.ContainerState{
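TerminatePod forces cached container states to terminated before the final status sync; the change above also skips containers that never started (Waiting), not just those already Terminated, so their last observed state survives. A hedged sketch of that rewrite rule; forceTerminal is illustrative and omits the versioning and init-container handling of the real method:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// forceTerminal rewrites only containers that may still be running:
// states already Terminated, or still Waiting, are left untouched.
func forceTerminal(statuses []v1.ContainerStatus) {
	for i := range statuses {
		if statuses[i].State.Terminated != nil || statuses[i].State.Waiting != nil {
			continue
		}
		statuses[i].State = v1.ContainerState{
			Terminated: &v1.ContainerStateTerminated{},
		}
	}
}

func main() {
	statuses := []v1.ContainerStatus{
		{Name: "running", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
		{Name: "waiting", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
	}
	forceTerminal(statuses)
	fmt.Println(statuses[0].State.Terminated != nil) // true: running container forced terminal
	fmt.Println(statuses[1].State.Waiting != nil)    // true: waiting state preserved
}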
1 change: 1 addition & 0 deletions test/e2e/node/BUILD
@@ -37,6 +37,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",