kubelet: reject pods on host port conflict
When a host port conflict is detected, the kubelet should set the pod status to
failed. The failed status will then be polled by other components at a later time,
which allows the replication controller to create a new pod if necessary.

To achieve this, this change stores the pod status information in a status map
upon detection of a port conflict. GetPodStatus() consults this status map
before attempting to query Docker. Entries in the status map are removed
when the pod is no longer bound to the node.
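
For context, the caching pattern introduced here can be summarized in a minimal standalone sketch: on a host port conflict, record a failed status in a lock-protected map keyed by pod full name, serve that status from the map before querying the container runtime, and purge entries for pods no longer bound to the node. The simplified PodStatus type and the statusCache type below are illustrative stand-ins, not the kubelet's actual types or code from this commit.

package main

import (
    "fmt"
    "sync"
)

type PodPhase string

const PodFailed PodPhase = "Failed"

type PodStatus struct {
    Phase   PodPhase
    Message string
}

// statusCache mirrors the role of the kubelet's podStatuses map and its RWMutex.
type statusCache struct {
    mu       sync.RWMutex
    statuses map[string]PodStatus
}

func newStatusCache() *statusCache {
    return &statusCache{statuses: make(map[string]PodStatus)}
}

// set records a pod's status, e.g. when a host port conflict is detected.
func (c *statusCache) set(podFullName string, status PodStatus) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.statuses[podFullName] = status
}

// get is consulted before falling back to the container runtime.
func (c *statusCache) get(podFullName string) (PodStatus, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    s, ok := c.statuses[podFullName]
    return s, ok
}

// removeOrphaned drops entries for pods that are no longer bound to this node.
func (c *statusCache) removeOrphaned(bound map[string]bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    for name := range c.statuses {
        if !bound[name] {
            delete(c.statuses, name)
        }
    }
}

func main() {
    cache := newStatusCache()

    // A host port conflict is detected for "default/web": record a failed status.
    cache.set("default/web", PodStatus{Phase: PodFailed, Message: "host port conflict"})

    // A GetPodStatus-style lookup finds the cached failure without asking Docker.
    if s, ok := cache.get("default/web"); ok {
        fmt.Println(s.Phase, "-", s.Message)
    }

    // After the pod is unbound from the node, the entry is purged on the next sync.
    cache.removeOrphaned(map[string]bool{})
    _, ok := cache.get("default/web")
    fmt.Println("still cached:", ok)
}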
yujuhong committed Mar 6, 2015
1 parent ca265c5 commit 3ccdb8d
Showing 4 changed files with 148 additions and 65 deletions.
91 changes: 76 additions & 15 deletions pkg/kubelet/kubelet.go
@@ -155,6 +155,8 @@ func NewMainKubelet(
return nil, err
}

klet.podStatuses = make(map[string]api.PodStatus)

return klet, nil
}

@@ -235,6 +237,10 @@ type Kubelet struct {

// the EventRecorder to use
recorder record.EventRecorder

// A pod status cache, currently used to store the statuses of rejected pods.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
}

// getRootDir returns the full path to the directory under which kubelet can
@@ -456,6 +462,30 @@ func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
return kl.cadvisorClient
}

func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock()
status, ok := kl.podStatuses[podFullName]
return status, ok
}

func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
kl.podStatuses[podFullName] = status
}

func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
kl.podStatusesLock.Lock()
defer kl.podStatusesLock.Unlock()
for key := range kl.podStatuses {
if _, ok := podFullNames[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(kl.podStatuses, key)
}
}
}

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
@@ -1277,10 +1307,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()

// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
podFullNames := make(map[string]bool)
for _, pod := range allPods {
podFullNames[GetPodFullName(&pod)] = true
}
kl.removeOrphanedStatuses(podFullNames)

// Filter out the rejected pods; they don't have running containers.
var pods []api.BoundPod
for _, pod := range allPods {
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
if ok && status.Phase == api.PodFailed {
continue
}
pods = append(pods, pod)
}

glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@@ -1404,9 +1452,9 @@ func (s podsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}

// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
// getHostPortConflicts detects pods with conflicting host ports and returns them.
func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
conflicts := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.ContainerPort) int { return p.HostPort }

@@ -1416,15 +1464,24 @@ func (kl *Kubelet) filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
kl.recorder.Eventf(pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
// TODO: Set the pod status to fail.
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
conflicts = append(conflicts, *pod)
continue
}
filtered = append(filtered, *pod)
}

return filtered
return conflicts
}

// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
func (kl *Kubelet) handleHostPortConflicts(pods []api.BoundPod) {
conflicts := getHostPortConflicts(pods)
for _, pod := range conflicts {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
}
}

func (kl *Kubelet) handleUpdate(u PodUpdate) {
@@ -1434,12 +1491,11 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)

kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1505,7 +1561,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
}

kl.pods = u.Pods
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")

@@ -1514,9 +1570,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}

kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = kl.filterHostPortConflicts(kl.pods)
kl.handleHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
@@ -1714,6 +1769,12 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}

// Check to see if the pod has been rejected.
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
if ok {
return mappedPodStatus, nil
}

info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)

if err != nil {
108 changes: 69 additions & 39 deletions pkg/kubelet/kubelet_test.go
@@ -83,6 +83,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
kubelet.recorder = recorder
kubelet.podStatuses = map[string]api.PodStatus{}
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
@@ -1204,35 +1205,6 @@ func TestMakePortsAndBindings(t *testing.T) {
}
}

func TestCheckHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)

successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
successCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := kubelet.filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}

failureCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
}
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
if actual := kubelet.filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}

func TestFieldPath(t *testing.T) {
pod := &api.BoundPod{Spec: api.PodSpec{Containers: []api.Container{
{Name: "foo"},
@@ -3088,13 +3060,35 @@ func TestPortForward(t *testing.T) {
}
}

// Tests that upon host port conflict, the newer pod is removed.
func TestFilterHostPortConflicts(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
// Tests that host port conflicts are detected correctly.
func TestGetHostPortConflicts(t *testing.T) {
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
}
// Pods should not cause any conflict.
conflicts := getHostPortConflicts(pods)
if len(conflicts) != 0 {
t.Errorf("expected no conflicts, Got %#v", conflicts)
}

// Reuse the pod spec with the same port to create a conflict.
// The new pod should cause a conflict and be reported.
expected := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
pods = append(pods, expected)
if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.BoundPod{expected}) {
t.Errorf("expected %#v, Got %#v", expected, actual)
}
}

// Tests that we handle port conflicts correctly by setting the failed status in the status map.
func TestHandlePortConflicts(t *testing.T) {
kl, _, _ := newTestKubelet(t)
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
var pods = []api.BoundPod{
pods := []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
@@ -3115,12 +3109,48 @@
// Make sure the BoundPods are in the reverse order of creation time.
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
filteredPods := kubelet.filterHostPortConflicts(pods)
if len(filteredPods) != 1 {
t.Fatalf("Expected one pod. Got pods %#v", filteredPods)
// The newer pod should be rejected.
conflictedPodName := GetPodFullName(&pods[0])

kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Check pod status stored in the status map.
status, ok := kl.podStatuses[conflictedPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", conflictedPodName)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}

// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
status, err := kl.GetPodStatus(conflictedPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
}

func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
kl, _, _ := newTestKubelet(t)
pods := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
// Run once to populate the status map.
kl.handleHostPortConflicts(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
if filteredPods[0].Name != "oldpod" {
t.Fatalf("Expected pod %#v. Got pod %#v", pods[1], filteredPods[0])
// Sync with empty pods so that the entry in the status map will be removed.
kl.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
}

2 changes: 1 addition & 1 deletion pkg/kubelet/runonce.go
@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err err
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
pods = kl.filterHostPortConflicts(pods)
kl.handleHostPortConflicts(pods)

ch := make(chan RunPodResult)
for i := range pods {
12 changes: 2 additions & 10 deletions pkg/master/pod_cache.go
@@ -213,16 +213,8 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
newStatus.HostIP = p.getHostAddress(nodeStatus.Addresses)
newStatus.Info = result.Status.Info
newStatus.PodIP = result.Status.PodIP
if newStatus.Info == nil {
// There is a small race window in which the kubelet hasn't
// populated the status yet. This should go away once
// we remove boundPods
newStatus.Phase = api.PodPending
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
} else {
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
newStatus.Phase = result.Status.Phase
newStatus.Conditions = result.Status.Conditions
}
return newStatus, err
}
