Skip to content

Commit

Permalink
fix updatePod of replication controller manager and replica set contr…
Browse files Browse the repository at this point in the history
…oller to

handle pod label updates that match no rc or rs
  • Loading branch information
Chao Xu committed Jun 15, 2016
1 parent 77419c4 commit 63fb075
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 25 deletions.
18 changes: 10 additions & 8 deletions pkg/controller/replicaset/replica_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,24 +326,26 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
}
curPod := cur.(*api.Pod)
oldPod := old.(*api.Pod)
glog.V(4).Infof("Pod %s updated %+v -> %+v.", curPod.Name, oldPod, curPod)
rs := rsc.getPodReplicaSet(curPod)
if rs == nil {
return
}

glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rs never initiates a phase change, and so is never asleep waiting for the same.
rsc.deletePod(curPod)
if labelChanged {
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
rsc.deletePod(oldPod)
}
return
}

rsc.enqueueReplicaSet(rs)
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
if rs := rsc.getPodReplicaSet(curPod); rs != nil {
rsc.enqueueReplicaSet(rs)
}
if labelChanged {
// If the old and new ReplicaSet are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldRS := rsc.getPodReplicaSet(oldPod); oldRS != nil {
Expand Down
26 changes: 21 additions & 5 deletions pkg/controller/replicaset/replica_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,15 +547,13 @@ func TestUpdatePods(t *testing.T) {
testRSSpec2.Name = "barfoo"
manager.rsStore.Store.Add(&testRSSpec2)

// Put one pod in the podStore
// case 1: We put in the podStore a pod with labels matching testRSSpec1,
// then update its labels to match testRSSpec2. We expect to receive a sync
// request for both replica sets.
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod2 := pod1
pod2.Labels = labelMap2

// Send an update of the same pod with modified labels, and confirm we get a sync request for
// both controllers
manager.updatePod(&pod1, &pod2)

expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
Expand All @@ -568,6 +566,24 @@ func TestUpdatePods(t *testing.T) {
t.Errorf("Expected update notifications for replica sets within 100ms each")
}
}

// case 2: pod1 in the podStore has labels matching testRSSpec1. We update
// its labels to match no replica set. We expect to receive a sync request
// for testRSSpec1.
pod2.Labels = make(map[string]string)
manager.updatePod(&pod1, &pod2)
expected = sets.NewString(testRSSpec1.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
case got := <-received:
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets within 100ms each")
}
}
}

func TestControllerUpdateRequeue(t *testing.T) {
Expand Down
18 changes: 11 additions & 7 deletions pkg/controller/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,24 +353,28 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
return
}
curPod := cur.(*api.Pod)
rc := rm.getPodController(curPod)
if rc == nil {
return
}
oldPod := old.(*api.Pod)

glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
// an rc never initiates a phase change, and so is never asleep waiting for the same.
rm.deletePod(curPod)
if labelChanged {
// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
rm.deletePod(oldPod)
}
return
}
rm.enqueueController(rc)

if rc := rm.getPodController(curPod); rc != nil {
rm.enqueueController(rc)
}
// Only need to get the old controller if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
if labelChanged {
// If the old and new rc are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldRC := rm.getPodController(oldPod); oldRC != nil {
Expand Down
26 changes: 21 additions & 5 deletions pkg/controller/replication/replication_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,13 @@ func TestUpdatePods(t *testing.T) {
testControllerSpec2.Name = "barfoo"
manager.rcStore.Indexer.Add(&testControllerSpec2)

// Put one pod in the podStore
// case 1: We put in the podStore a pod with labels matching
// testControllerSpec1, then update its labels to match testControllerSpec2.
// We expect to receive a sync request for both controllers.
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
pod2 := pod1
pod2.Labels = testControllerSpec2.Spec.Selector

// Send an update of the same pod with modified labels, and confirm we get a sync request for
// both controllers
manager.updatePod(&pod1, &pod2)

expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
Expand All @@ -552,6 +550,24 @@ func TestUpdatePods(t *testing.T) {
t.Errorf("Expected update notifications for controllers within 100ms each")
}
}

// case 2: pod1 in the podStore has labels matching testControllerSpec1.
// We update its labels to match no replication controller. We expect to
// receive a sync request for testControllerSpec1.
pod2.Labels = make(map[string]string)
manager.updatePod(&pod1, &pod2)
expected = sets.NewString(testControllerSpec1.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
case got := <-received:
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for controllers within 100ms each")
}
}
}

func TestControllerUpdateRequeue(t *testing.T) {
Expand Down

0 comments on commit 63fb075

Please sign in to comment.