Skip to content

Commit

Permalink
Merge pull request kubernetes#21857 from nikhiljindal/stopDeployment
Browse files Browse the repository at this point in the history
fix deployment e2e flake: Update DeploymentReaper.Stop to use ObservedGeneration
  • Loading branch information
bgrant0607 committed Feb 24, 2016
2 parents cf98bcb + 2a538e3 commit 0b5edab
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 55 deletions.
19 changes: 17 additions & 2 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,12 @@ func (dc *DeploymentController) syncDeployment(key string) error {
d := *obj.(*extensions.Deployment)

if d.Spec.Paused {
// TODO: Implement scaling for paused deployments.
// Dont take any action for paused deployment.
// But keep the status up-to-date.
// Ignore paused deployments
glog.V(4).Infof("Ignoring paused deployment %s/%s", d.Namespace, d.Name)
return nil
glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name)
return dc.syncPausedDeploymentStatus(&d)
}
if d.Spec.RollbackTo != nil {
revision := d.Spec.RollbackTo.Revision
Expand All @@ -440,6 +443,18 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

// Updates the status of a paused deployment
func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSets(*deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)

// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, *deployment)
}

// Rolling back to a revision; no-op if the toRevision is deployment's current revision
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true)
Expand Down
45 changes: 10 additions & 35 deletions pkg/kubectl/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/wait"
)
Expand Down Expand Up @@ -367,52 +368,25 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
// TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
d.Spec.RevisionHistoryLimit = util.IntPtr(0)
d.Spec.Replicas = 0
// TODO: un-pausing should not be necessary, remove when this is fixed:
// https://github.com/kubernetes/kubernetes/issues/20966
// Instead deployment should be Paused at this point and not at next TODO.
d.Spec.Paused = false
d.Spec.Paused = true
})
if err != nil {
return err
}

// wait for total no of pods drop to 0
if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
curr, err := deployments.Get(name)
// if deployment was not found it must have been deleted, error out
if err != nil && errors.IsNotFound(err) {
return false, err
}
// if other errors happen, retry
if err != nil {
return false, nil
}
// check if deployment wasn't recreated with the same name
// TODO use generations when deployment will have them
if curr.UID != deployment.UID {
return false, errors.NewNotFound(extensions.Resource("Deployment"), name)
}
return curr.Status.Replicas == 0, nil
}); err != nil {
return err
}

// TODO: When deployments will allow running cleanup policy while being
// paused, move pausing to above update operation. Without it, we need to
// pause deployment before stopping RSs, to prevent creating new RSs.
// See https://github.com/kubernetes/kubernetes/issues/20966
deployment, err = reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
d.Spec.Paused = true
})
if err != nil {
// Use observedGeneration to determine if the deployment controller noticed the pause.
if err := deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) {
return deployments.Get(name)
}, deployment.Generation, 10*time.Millisecond, 1*time.Minute); err != nil {
return err
}

// remove remaining RSs
// Stop all replica sets.
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return err
}

options := api.ListOptions{LabelSelector: selector}
rsList, err := replicaSets.List(options)
if err != nil {
Expand All @@ -430,7 +404,8 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
return utilerrors.NewAggregate(errList)
}

// and finally deployment
// Delete deployment at the end.
// Note: We delete deployment at the end so that if removing RSs fails, we atleast have the deployment to retry.
return deployments.Delete(name, gracePeriod)
}

Expand Down
10 changes: 4 additions & 6 deletions pkg/kubectl/stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,7 @@ func TestDeploymentStop(t *testing.T) {
},
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "get:deployments", "update:deployments",
"list:replicasets", "delete:deployments"},
"get:deployments", "list:replicasets", "delete:deployments"},
},
{
Name: "Deployment with single replicaset",
Expand All @@ -561,10 +560,9 @@ func TestDeploymentStop(t *testing.T) {
},
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "get:deployments", "update:deployments",
"list:replicasets", "get:replicasets", "get:replicasets",
"update:replicasets", "get:replicasets", "get:replicasets",
"delete:replicasets", "delete:deployments"},
"get:deployments", "list:replicasets", "get:replicasets",
"get:replicasets", "update:replicasets", "get:replicasets",
"get:replicasets", "delete:replicasets", "delete:deployments"},
},
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/util/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,16 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
}
}

// Polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
// Returns error if polling timesout.
func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
// TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
return wait.Poll(interval, timeout, func() (bool, error) {
deployment, err := getDeploymentFunc()
if err != nil {
return false, err
}
return deployment.Status.ObservedGeneration >= desiredGeneration, nil
})
}
4 changes: 2 additions & 2 deletions test/e2e/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func testPausedDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())

// Use observedGeneration to determine if the controller noticed the resume.
err = waitForObservedDeployment(c, ns, deploymentName)
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())

selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
Expand All @@ -570,7 +570,7 @@ func testPausedDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())

// Use observedGeneration to determine if the controller noticed the pause.
err = waitForObservedDeployment(c, ns, deploymentName)
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())

newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
Expand Down
14 changes: 4 additions & 10 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2224,16 +2224,6 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin
})
}

func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string) error {
return wait.Poll(poll, 1*time.Minute, func() (bool, error) {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
if err != nil {
return false, err
}
return deployment.Generation == deployment.Status.ObservedGeneration, nil
})
}

func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
Logf("Deployment = %+v", deployment)
for i := range allOldRSs {
Expand All @@ -2242,6 +2232,10 @@ func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*
Logf("New ReplicaSet of deployment %s: %+v", deployment.Name, newRS)
}

func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string, desiredGeneration int64) error {
return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, poll, 1*time.Minute)
}

func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int) {
allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss)
if err == nil {
Expand Down

0 comments on commit 0b5edab

Please sign in to comment.