From 7e50fa6df00fd9d2d5b850b3f8bf514f51436428 Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Tue, 23 Feb 2016 20:27:24 -0800 Subject: [PATCH 1/3] Update DeploymentReaper.Stop to use ObservedGeneration to remove race condition --- pkg/kubectl/stop.go | 45 ++++++++----------------------- pkg/kubectl/stop_test.go | 10 +++---- pkg/util/deployment/deployment.go | 13 +++++++++ test/e2e/deployment.go | 4 +-- test/e2e/util.go | 14 +++------- 5 files changed, 34 insertions(+), 52 deletions(-) diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index 0cf025b0dea6c..a6fd7c7ae7880 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -30,6 +30,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" + deploymentutil "k8s.io/kubernetes/pkg/util/deployment" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/wait" ) @@ -368,52 +369,30 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati zero := 0 d.Spec.RevisionHistoryLimit = &zero d.Spec.Replicas = 0 - // TODO: un-pausing should not be necessary, remove when this is fixed: - // https://github.com/kubernetes/kubernetes/issues/20966 - // Instead deployment should be Paused at this point and not at next TODO. 
- d.Spec.Paused = false + d.Spec.Paused = true }) if err != nil { return err } - // wait for total no of pods drop to 0 - if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) { - curr, err := deployments.Get(name) - // if deployment was not found it must have been deleted, error out - if err != nil && errors.IsNotFound(err) { - return false, err - } - // if other errors happen, retry - if err != nil { - return false, nil - } - // check if deployment wasn't recreated with the same name - // TODO use generations when deployment will have them - if curr.UID != deployment.UID { - return false, errors.NewNotFound(extensions.Resource("Deployment"), name) - } - return curr.Status.Replicas == 0, nil - }); err != nil { + // Use observedGeneration to determine if the deployment controller noticed the pause. + if err := deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { + return deployments.Get(name) + }, deployment.Generation, 10*time.Millisecond, 1*time.Minute); err != nil { return err } - // TODO: When deployments will allow running cleanup policy while being - // paused, move pausing to above update operation. Without it, we need to - // pause deployment before stopping RSs, to prevent creating new RSs. - // See https://github.com/kubernetes/kubernetes/issues/20966 - deployment, err = reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) { - d.Spec.Paused = true - }) - if err != nil { + // Delete deployment. + if err := deployments.Delete(name, gracePeriod); err != nil { return err } - // remove remaining RSs + // Stop all replica sets. 
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { return err } + options := api.ListOptions{LabelSelector: selector} rsList, err := replicaSets.List(options) if err != nil { @@ -430,9 +409,7 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati if len(errList) > 0 { return utilerrors.NewAggregate(errList) } - - // and finally deployment - return deployments.Delete(name, gracePeriod) + return nil } type updateDeploymentFunc func(d *extensions.Deployment) diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index 32a5f52d0104a..ceee15fed6210 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -538,8 +538,7 @@ func TestDeploymentStop(t *testing.T) { }, StopError: nil, ExpectedActions: []string{"get:deployments", "update:deployments", - "get:deployments", "get:deployments", "update:deployments", - "list:replicasets", "delete:deployments"}, + "get:deployments", "delete:deployments", "list:replicasets"}, }, { Name: "Deployment with single replicaset", @@ -561,10 +560,9 @@ func TestDeploymentStop(t *testing.T) { }, StopError: nil, ExpectedActions: []string{"get:deployments", "update:deployments", - "get:deployments", "get:deployments", "update:deployments", - "list:replicasets", "get:replicasets", "get:replicasets", - "update:replicasets", "get:replicasets", "get:replicasets", - "delete:replicasets", "delete:deployments"}, + "get:deployments", "delete:deployments", "list:replicasets", + "get:replicasets", "get:replicasets", "update:replicasets", + "get:replicasets", "get:replicasets", "delete:replicasets"}, }, } diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 0ee24eed7d563..0392cff279ff5 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -427,3 +427,16 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re return 0, fmt.Errorf("deployment type %v isn't 
supported", deployment.Spec.Strategy.Type) } } + +// WaitForObservedDeployment polls for the deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration. +// Returns an error if polling times out. +func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error { + // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface. + return wait.Poll(interval, timeout, func() (bool, error) { + deployment, err := getDeploymentFunc() + if err != nil { + return false, err + } + return deployment.Status.ObservedGeneration >= desiredGeneration, nil + }) +} diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 4669c31bb1780..bdddff3f667fc 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -543,7 +543,7 @@ func testPausedDeployment(f *Framework) { Expect(err).NotTo(HaveOccurred()) // Use observedGeneration to determine if the controller noticed the resume. - err = waitForObservedDeployment(c, ns, deploymentName) + err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation) Expect(err).NotTo(HaveOccurred()) selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) @@ -570,7 +570,7 @@ func testPausedDeployment(f *Framework) { Expect(err).NotTo(HaveOccurred()) // Use observedGeneration to determine if the controller noticed the pause. 
- err = waitForObservedDeployment(c, ns, deploymentName) + err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation) Expect(err).NotTo(HaveOccurred()) newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c) diff --git a/test/e2e/util.go b/test/e2e/util.go index f25877e7d0c6f..146166a3b684f 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -2224,16 +2224,6 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin }) } -func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string) error { - return wait.Poll(poll, 1*time.Minute, func() (bool, error) { - deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) - if err != nil { - return false, err - } - return deployment.Generation == deployment.Status.ObservedGeneration, nil - }) -} - func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) { Logf("Deployment = %+v", deployment) for i := range allOldRSs { @@ -2242,6 +2232,10 @@ func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []* Logf("New ReplicaSet of deployment %s: %+v", deployment.Name, newRS) } +func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string, desiredGeneration int64) error { + return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, poll, 1*time.Minute) +} + func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int) { allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss) if err == nil { From 9d7259950b69942ff2ee07ecf00224b8fd87abac Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Wed, 24 Feb 2016 00:07:46 -0800 Subject: [PATCH 2/3] Update status for paused deployments --- .../deployment/deployment_controller.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 
deletions(-) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 8b6a42058077c..0b7f6e7e0aa4f 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -420,9 +420,12 @@ func (dc *DeploymentController) syncDeployment(key string) error { d := *obj.(*extensions.Deployment) if d.Spec.Paused { + // TODO: Implement scaling for paused deployments. + // Don't take any action for a paused deployment. + // But keep the status up-to-date. // Ignore paused deployments - glog.V(4).Infof("Ignoring paused deployment %s/%s", d.Namespace, d.Name) - return nil + glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name) + return dc.syncPausedDeploymentStatus(&d) } if d.Spec.RollbackTo != nil { revision := d.Spec.RollbackTo.Revision @@ -440,6 +443,18 @@ func (dc *DeploymentController) syncDeployment(key string) error { return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } +// syncPausedDeploymentStatus updates the status of a paused deployment. +func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error { + newRS, oldRSs, err := dc.getAllReplicaSets(*deployment, false) + if err != nil { + return err + } + allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS) + + // Sync deployment status + return dc.syncDeploymentStatus(allRSs, newRS, *deployment) +} + // Rolling back to a revision; no-op if the toRevision is deployment's current revision func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true) From 2a538e317fdcefed3e0866a29056bae915319b9e Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Wed, 24 Feb 2016 13:00:45 -0800 Subject: [PATCH 3/3] Moving deployment deletion at the end --- pkg/kubectl/stop.go | 10 ++++------ 
pkg/kubectl/stop_test.go | 8 ++++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index a6fd7c7ae7880..ff97e4ff376e5 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -382,11 +382,6 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati return err } - // Delete deployment. - if err := deployments.Delete(name, gracePeriod); err != nil { - return err - } - // Stop all replica sets. selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { @@ -409,7 +404,10 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati if len(errList) > 0 { return utilerrors.NewAggregate(errList) } - return nil + + // Delete deployment at the end. + // Note: We delete the deployment at the end so that if removing RSs fails, we at least have the deployment to retry. + return deployments.Delete(name, gracePeriod) } type updateDeploymentFunc func(d *extensions.Deployment) diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index ceee15fed6210..13fdc8bc19b73 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -538,7 +538,7 @@ func TestDeploymentStop(t *testing.T) { }, StopError: nil, ExpectedActions: []string{"get:deployments", "update:deployments", - "get:deployments", "delete:deployments", "list:replicasets"}, + "get:deployments", "list:replicasets", "delete:deployments"}, }, { Name: "Deployment with single replicaset", @@ -560,9 +560,9 @@ func TestDeploymentStop(t *testing.T) { }, StopError: nil, ExpectedActions: []string{"get:deployments", "update:deployments", - "get:deployments", "delete:deployments", "list:replicasets", - "get:replicasets", "get:replicasets", "update:replicasets", - "get:replicasets", "get:replicasets", "delete:replicasets"}, + "get:deployments", "list:replicasets", "get:replicasets", + "get:replicasets", "update:replicasets", "get:replicasets", + "get:replicasets", 
"delete:replicasets", "delete:deployments"}, }, }