diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3091817a4ddb0..caa2b878a155a 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -155,6 +155,8 @@ func NewMainKubelet(
 		return nil, err
 	}
 
+	klet.podStatuses = make(map[string]api.PodStatus)
+
 	return klet, nil
 }
 
@@ -235,6 +237,10 @@ type Kubelet struct {
 
 	// the EventRecorder to use
 	recorder record.EventRecorder
+
+	// A pod status cache currently used to store rejected pods and their statuses.
+	podStatusesLock sync.RWMutex
+	podStatuses     map[string]api.PodStatus
 }
 
 // getRootDir returns the full path to the directory under which kubelet can
@@ -456,6 +462,30 @@ func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
 	return kl.cadvisorClient
 }
 
+func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
+	kl.podStatusesLock.RLock()
+	defer kl.podStatusesLock.RUnlock()
+	status, ok := kl.podStatuses[podFullName]
+	return status, ok
+}
+
+func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
+	kl.podStatusesLock.Lock()
+	defer kl.podStatusesLock.Unlock()
+	kl.podStatuses[podFullName] = status
+}
+
+func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
+	kl.podStatusesLock.Lock()
+	defer kl.podStatusesLock.Unlock()
+	for key := range kl.podStatuses {
+		if _, ok := podFullNames[key]; !ok {
+			glog.V(5).Infof("Removing %q from status map.", key)
+			delete(kl.podStatuses, key)
+		}
+	}
+}
+
 // Run starts the kubelet reacting to config updates
 func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	if kl.logServer == nil {
@@ -1277,10 +1307,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
 }
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
-func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
+func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
 	defer func() {
 		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
+
+	// Remove obsolete entries in podStatuses for pods no longer considered bound to this node.
+	podFullNames := make(map[string]bool)
+	for _, pod := range allPods {
+		podFullNames[GetPodFullName(&pod)] = true
+	}
+	kl.removeOrphanedStatuses(podFullNames)
+
+	// Filter out rejected pods; they don't have running containers.
+	var pods []api.BoundPod
+	for _, pod := range allPods {
+		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
+		if ok && status.Phase == api.PodFailed {
+			continue
+		}
+		pods = append(pods, pod)
+	}
+
 	glog.V(4).Infof("Desired: %#v", pods)
 	var err error
 	desiredContainers := make(map[podContainer]empty)
@@ -1404,9 +1452,9 @@ func (s podsByCreationTime) Less(i, j int) bool {
 	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
 }
 
-// filterHostPortConflicts removes pods that conflict on Port.HostPort values
-func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
-	filtered := []api.BoundPod{}
+// getHostPortConflicts detects pods with conflicting host ports and returns them.
+func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
+	conflicts := []api.BoundPod{}
 	ports := map[int]bool{}
 	extract := func(p *api.ContainerPort) int { return p.HostPort }
@@ -1416,15 +1464,24 @@ func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
 	for i := range pods {
 		pod := &pods[i]
 		if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
-			glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
-			kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
-			// TODO: Set the pod status to fail.
+			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
+			conflicts = append(conflicts, *pod)
 			continue
 		}
-		filtered = append(filtered, *pod)
 	}
-	return filtered
+	return conflicts
+}
+
+// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
+func (kl *Kubelet) handleHostPortConflicts(pods []api.BoundPod) {
+	conflicts := getHostPortConflicts(pods)
+	for _, pod := range conflicts {
+		kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
+		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+			Phase:   api.PodFailed,
+			Message: "Pod cannot be started due to host port conflict"})
+	}
 }
 
 func (kl *Kubelet) handleUpdate(u PodUpdate) {
@@ -1434,12 +1491,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
 	case SET:
 		glog.V(3).Infof("SET: Containers changed")
 		kl.pods = u.Pods
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	case UPDATE:
 		glog.V(3).Infof("Update: Containers changed")
 		kl.pods = updateBoundPods(u.Pods, kl.pods)
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
-
+		kl.handleHostPortConflicts(kl.pods)
 	default:
 		panic("syncLoop does not support incremental changes")
 	}
@@ -1505,7 +1561,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
 		}
 
 		kl.pods = u.Pods
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	case UPDATE:
 		glog.V(3).Infof("Update: Containers changed")
 
@@ -1514,9 +1570,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
 		for i := range u.Pods {
 			podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
 		}
-
 		kl.pods = updateBoundPods(u.Pods, kl.pods)
-		kl.pods = kl.filterHostPortConflicts(kl.pods)
+		kl.handleHostPortConflicts(kl.pods)
 	default:
 		panic("syncLoop does not support incremental changes")
 	}
@@ -1714,6 +1769,12 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
 		return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
 	}
 
+	// Check to see if the pod has been rejected.
+	mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
+	if ok {
+		return mappedPodStatus, nil
+	}
+
 	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
 	if err != nil {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 64d66276a4776..1098b40569bd5 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -83,6 +83,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
 	kubelet.serviceLister = testServiceLister{}
 	kubelet.readiness = newReadinessStates()
 	kubelet.recorder = recorder
+	kubelet.podStatuses = map[string]api.PodStatus{}
 	if err := kubelet.setupDataDirs(); err != nil {
 		t.Fatalf("can't initialize kubelet data dirs: %v", err)
 	}
@@ -1204,35 +1205,6 @@ func TestMakePortsAndBindings(t *testing.T) {
 	}
 }
 
-func TestCheckHostPortConflicts(t *testing.T) {
-	kubelet, _, _ := newTestKubelet(t)
-
-	successCaseAll := []api.BoundPod{
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
-	}
-	successCaseNew := api.BoundPod{
-		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
-	}
-	expected := append(successCaseAll, successCaseNew)
-	if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
-		t.Errorf("Expected %#v, Got %#v", expected, actual)
-	}
-
-	failureCaseAll := []api.BoundPod{
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
-		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
-	}
-	failureCaseNew := api.BoundPod{
-		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
-	}
-	if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
-		t.Errorf("Expected %#v, Got %#v", expected, actual)
-	}
-}
-
 func TestFieldPath(t *testing.T) {
 	pod := &api.BoundPod{Spec: api.PodSpec{Containers: []api.Container{
 		{Name: "foo"},
@@ -3088,13 +3060,35 @@ func TestPortForward(t *testing.T) {
 	}
 }
 
-// Tests that upon host port conflict, the newer pod is removed.
-func TestFilterHostPortConflicts(t *testing.T) {
-	kubelet, _, _ := newTestKubelet(t)
+// Tests that host port conflicts are detected correctly.
+func TestGetHostPortConflicts(t *testing.T) {
+	pods := []api.BoundPod{
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
+	}
+	// These pods should not cause any conflicts.
+	conflicts := getHostPortConflicts(pods)
+	if len(conflicts) != 0 {
+		t.Errorf("expected no conflicts, Got %#v", conflicts)
+	}
 
-	// Reuse the pod spec with the same port to create a conflict.
+	// The new pod should cause a conflict and be reported.
+	expected := api.BoundPod{
+		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
+	}
+	pods = append(pods, expected)
+	if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.BoundPod{expected}) {
+		t.Errorf("expected %#v, Got %#v", expected, actual)
+	}
+}
+
+// Tests that we handle port conflicts correctly by setting the failed status in the status map.
+func TestHandlePortConflicts(t *testing.T) {
+	kl, _, _ := newTestKubelet(t)
 	spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
-	var pods = []api.BoundPod{
+	pods := []api.BoundPod{
 		{
 			ObjectMeta: api.ObjectMeta{
 				UID: "123456789",
@@ -3115,12 +3109,48 @@ func TestFilterHostPortConflicts(t *testing.T) {
 	// Make sure the BoundPods are in the reverse order of creation time.
 	pods[1].CreationTimestamp = util.NewTime(time.Now())
 	pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
-	filteredPods := kubelet.filterHostPortConflicts(pods)
-	if len(filteredPods) != 1 {
-		t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
+	// The newer pod should be rejected.
+	conflictedPodName := GetPodFullName(&pods[0])
+
+	kl.handleHostPortConflicts(pods)
+	if len(kl.podStatuses) != 1 {
+		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
+	}
+	// Check the pod status stored in the status map.
+	status, ok := kl.podStatuses[conflictedPodName]
+	if !ok {
+		t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+
+	// Check that we can retrieve the pod status from GetPodStatus().
+	kl.pods = pods
+	status, err := kl.GetPodStatus(conflictedPodName, "")
+	if err != nil {
+		t.Fatalf("unable to retrieve pod status for pod %q: %v.", conflictedPodName, err)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+}
+
+func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
+	kl, _, _ := newTestKubelet(t)
+	pods := []api.BoundPod{
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
+	}
+	// Run once to populate the status map.
+	kl.handleHostPortConflicts(pods)
+	if len(kl.podStatuses) != 1 {
+		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
 	}
-	if filteredPods[0].Name != "oldpod" {
-		t.Fatalf("Expected pod %#v. Got pod %#v", pods[1], filteredPods[0])
+	// Sync with empty pods so that the entry in the status map will be removed.
+	kl.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
+	if len(kl.podStatuses) != 0 {
+		t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
 	}
 }
diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go
index d897819cebc86..6389b36f4a2f5 100644
--- a/pkg/kubelet/runonce.go
+++ b/pkg/kubelet/runonce.go
@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
 	if kl.dockerPuller == nil {
 		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
 	}
-	pods = kl.filterHostPortConflicts(pods)
+	kl.handleHostPortConflicts(pods)
 
 	ch := make(chan RunPodResult)
 	for i := range pods {
diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index ab95f3a2d048f..b835bb1209957 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -213,16 +213,8 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 		newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
 		newStatus.Info = result.Status.Info
 		newStatus.PodIP = result.Status.PodIP
-		if newStatus.Info == nil {
-			// There is a small race window that kubelet couldn't
-			// propulated the status yet. This should go away once
-			// we removed boundPods
-			newStatus.Phase = api.PodPending
-			newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
-		} else {
-			newStatus.Phase = result.Status.Phase
-			newStatus.Conditions = result.Status.Conditions
-		}
+		newStatus.Phase = result.Status.Phase
+		newStatus.Conditions = result.Status.Conditions
 	}
 	return newStatus, err
 }
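Note for readers skimming the diff: below is a minimal, self-contained Go sketch of the status-cache pattern this change introduces, an RWMutex-guarded map that records rejected pods so that later status queries report a Failed phase instead of the pod silently disappearing (the old filterHostPortConflicts behavior). The names here (statusCache, PodStatus, removeOrphaned, "default/newpod") are illustrative stand-ins, not the kubelet's actual types or API.

package main

import (
	"fmt"
	"sync"
)

// PodPhase and PodStatus stand in for the api types used in the diff.
type PodPhase string

const PodFailed PodPhase = "Failed"

type PodStatus struct {
	Phase   PodPhase
	Message string
}

// statusCache mirrors the podStatuses map guarded by podStatusesLock.
type statusCache struct {
	mu       sync.RWMutex
	statuses map[string]PodStatus
}

func newStatusCache() *statusCache {
	return &statusCache{statuses: make(map[string]PodStatus)}
}

// get corresponds to getPodStatusFromCache: a read-locked lookup.
func (c *statusCache) get(podFullName string) (PodStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	status, ok := c.statuses[podFullName]
	return status, ok
}

// set corresponds to setPodStatusInCache: a write-locked insert.
func (c *statusCache) set(podFullName string, status PodStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[podFullName] = status
}

// removeOrphaned corresponds to removeOrphanedStatuses: it drops entries for
// pods that are no longer in the bound set, as SyncPods does on every sync.
func (c *statusCache) removeOrphaned(bound map[string]bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for key := range c.statuses {
		if !bound[key] {
			delete(c.statuses, key)
		}
	}
}

func main() {
	cache := newStatusCache()

	// On a host port conflict, record the rejection instead of dropping the pod.
	cache.set("default/newpod", PodStatus{
		Phase:   PodFailed,
		Message: "Pod cannot be started due to host port conflict",
	})

	// A status query consults the cache first, so callers see Failed rather
	// than a racy pending or missing status.
	if status, ok := cache.get("default/newpod"); ok {
		fmt.Println(status.Phase, "-", status.Message)
	}

	// A later sync with an empty bound set purges the stale entry.
	cache.removeOrphaned(map[string]bool{})
	if _, ok := cache.get("default/newpod"); !ok {
		fmt.Println("entry purged")
	}
}

The RWMutex matches the access pattern in the diff: status reads (GetPodStatus) can be frequent and concurrent, while writes happen only when a pod is rejected or the bound set changes.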