Commit: Retry Pod/RC updates in kubectl rolling-update

janetkuo committed Jun 21, 2016
1 parent 62ce669 commit ee81e5e
Showing 5 changed files with 168 additions and 113 deletions.
56 changes: 0 additions & 56 deletions pkg/client/unversioned/conditions.go
@@ -18,7 +18,6 @@ package unversioned

import (
"fmt"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@@ -29,61 +28,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
)

// DefaultRetry is the recommended retry for a conflict where multiple clients
// are making changes to the same resource.
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}

// DefaultBackoff is the recommended backoff for a conflict where a client
// may be attempting to make an unrelated modification to a resource under
// active management by one or more controllers.
var DefaultBackoff = wait.Backoff{
Steps: 4,
Duration: 10 * time.Millisecond,
Factor: 5.0,
Jitter: 0.1,
}

// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
// exponential backoff.
//
// var pod *api.Pod
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
// return
// })
// if err != nil {
// // may be a conflict if max retries were hit
// return err
// }
// ...
//
// TODO: Make Backoff an interface?
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
var lastConflictErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := fn()
switch {
case err == nil:
return true, nil
case errors.IsConflict(err):
lastConflictErr = err
return false, nil
default:
return false, err
}
})
if err == wait.ErrWaitTimeout {
err = lastConflictErr
}
return err
}

// ControllerHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
79 changes: 79 additions & 0 deletions pkg/client/unversioned/util.go
@@ -0,0 +1,79 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package unversioned

import (
"time"

"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/util/wait"
)

// DefaultRetry is the recommended retry for a conflict where multiple clients
// are making changes to the same resource.
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}

// DefaultBackoff is the recommended backoff for a conflict where a client
// may be attempting to make an unrelated modification to a resource under
// active management by one or more controllers.
var DefaultBackoff = wait.Backoff{
Steps: 4,
Duration: 10 * time.Millisecond,
Factor: 5.0,
Jitter: 0.1,
}
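
A side note on these defaults, not part of the diff: wait.ExponentialBackoff starts at Duration, scales each later delay by Factor, and adds up to Jitter times the delay as random slack. A minimal standalone sketch of the nominal DefaultBackoff schedule (10ms, 50ms, 250ms, 1250ms), ignoring jitter and the exact placement of the sleeps:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Nominal per-step delays for DefaultBackoff: Steps=4, Duration=10ms, Factor=5.0.
	d := 10 * time.Millisecond
	for i := 0; i < 4; i++ {
		fmt.Printf("step %d: %v\n", i, d)
		d = time.Duration(float64(d) * 5.0)
	}
	// DefaultRetry (Factor=1.0) simply repeats 10ms for each of its 5 steps.
}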

// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
// exponential backoff.
//
// var pod *api.Pod
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
// return
// })
// if err != nil {
// // may be a conflict if max retries were hit
// return err
// }
// ...
//
// TODO: Make Backoff an interface?
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
var lastConflictErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := fn()
switch {
case err == nil:
return true, nil
case errors.IsConflict(err):
lastConflictErr = err
return false, nil
default:
return false, err
}
})
if err == wait.ErrWaitTimeout {
err = lastConflictErr
}
return err
}
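
For orientation, a minimal sketch of the pattern the kubectl helpers below build on top of RetryOnConflict: re-read the object before every write, so a conflicting write simply means "apply the change again to the latest version". The helper name, namespace, and annotation key are illustrative only, and the sketch assumes an extra import of "k8s.io/kubernetes/pkg/api":

// exampleSetAnnotation is an editor's sketch, not part of this commit.
func exampleSetAnnotation(c Interface, namespace, name, key, value string) (*api.ReplicationController, error) {
	var rc *api.ReplicationController
	err := RetryOnConflict(DefaultBackoff, func() error {
		// Re-read the controller on every attempt so the update targets the latest resourceVersion.
		var getErr error
		if rc, getErr = c.ReplicationControllers(namespace).Get(name); getErr != nil {
			return getErr
		}
		if rc.Annotations == nil {
			rc.Annotations = map[string]string{}
		}
		rc.Annotations[key] = value
		var updateErr error
		rc, updateErr = c.ReplicationControllers(namespace).Update(rc)
		return updateErr
	})
	return rc, err
}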
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
132 changes: 82 additions & 50 deletions pkg/kubectl/rolling_updater.go
@@ -187,15 +187,16 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
if err != nil {
return err
}
if existing.Annotations == nil {
existing.Annotations = map[string]string{}
originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
applyUpdate := func(rc *api.ReplicationController) {
if rc.Annotations == nil {
rc.Annotations = map[string]string{}
}
rc.Annotations[originalReplicasAnnotation] = originReplicas
}
existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(int(existing.Spec.Replicas))
updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
if err != nil {
if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
return err
}
oldRc = updated
}
// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
// that can be unavailable during a rollout.
@@ -482,13 +483,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
if err != nil {
return err
}
delete(newRc.Annotations, sourceIdAnnotation)
delete(newRc.Annotations, desiredReplicasAnnotation)

newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
if err != nil {
applyUpdate := func(rc *api.ReplicationController) {
delete(rc.Annotations, sourceIdAnnotation)
delete(rc.Annotations, desiredReplicasAnnotation)
}
if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
return err
}

if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
return err
}
@@ -643,27 +645,29 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
}

func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
SetNextControllerAnnotation(oldRc, newName)
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
SetNextControllerAnnotation(oldRc, newName)
return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
} else {
// If we didn't need to update the controller for the deployment key, we still need to write
// the "next" controller.
return c.ReplicationControllers(namespace).Update(oldRc)
applyUpdate := func(rc *api.ReplicationController) {
SetNextControllerAnnotation(rc, newName)
}
return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
}
}

const MaxRetries = 3

func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
var err error
// First, update the template label. This ensures that any newly created pods will have the new label
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
applyUpdate := func(rc *api.ReplicationController) {
if rc.Spec.Template.Labels == nil {
rc.Spec.Template.Labels = map[string]string{}
}
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
}); err != nil {
}
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
return nil, err
}

@@ -677,26 +681,16 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
}
for ix := range podList.Items {
pod := &podList.Items[ix]
if pod.Labels == nil {
pod.Labels = map[string]string{
deploymentKey: deploymentValue,
}
} else {
pod.Labels[deploymentKey] = deploymentValue
}
err = nil
delay := 3
for i := 0; i < MaxRetries; i++ {
_, err = client.Pods(namespace).Update(pod)
if err != nil {
fmt.Fprintf(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
time.Sleep(time.Second * time.Duration(delay))
delay *= delay
applyUpdate := func(p *api.Pod) {
if p.Labels == nil {
p.Labels = map[string]string{
deploymentKey: deploymentValue,
}
} else {
break
p.Labels[deploymentKey] = deploymentValue
}
}
if err != nil {
if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
return nil, err
}
}
@@ -709,12 +703,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
for k, v := range oldRc.Spec.Selector {
selectorCopy[k] = v
}
oldRc.Spec.Selector[deploymentKey] = deploymentValue

// Update the selector of the rc so it manages all the pods we updated above
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
applyUpdate = func(rc *api.ReplicationController) {
rc.Spec.Selector[deploymentKey] = deploymentValue
}); err != nil {
}
// Update the selector of the rc so it manages all the pods we updated above
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
return nil, err
}

@@ -736,33 +729,72 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationControl
return oldRc, nil
}

type updateFunc func(controller *api.ReplicationController)
type updateRcFunc func(controller *api.ReplicationController)

// updateWithRetries applies the given rc as an update.
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
var err error
oldRc := rc
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
// Deep copy the rc in case we failed on Get during retry loop
obj, err := api.Scheme.Copy(rc)
if err != nil {
return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
}
oldRc := obj.(*api.ReplicationController)
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(rc)
if rc, err = rcClient.Update(rc); err == nil {
if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
// rc contains the latest controller post update
return true, nil
return
}
updateErr := e
// Update the controller with the latest resource version, if the update failed we
// can't trust rc so use oldRc.Name.
if rc, err = rcClient.Get(oldRc.Name); err != nil {
if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
// The Get failed: Value in rc cannot be trusted.
rc = oldRc
}
// The Get passed: rc contains the latest controller, expect a poll for the update.
return false, nil
// Only return the error from update
return updateErr
})
// If the error is non-nil the returned controller cannot be trusted; if it is nil, the returned
// controller contains the applied update.
return rc, err
}
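
A sketch of what a call site looks like after this change, mirroring the hunks above. Because a conflict causes the controller to be re-fetched and the closure applied again, the closure should be idempotent. Names and the annotation key are illustrative, not from this commit:

// exampleSetRcAnnotation is an editor's sketch, not part of this commit.
func exampleSetRcAnnotation(c client.Interface, namespace string, oldRc *api.ReplicationController) (*api.ReplicationController, error) {
	applyUpdate := func(rc *api.ReplicationController) {
		if rc.Annotations == nil {
			rc.Annotations = map[string]string{}
		}
		rc.Annotations["example/annotation"] = "value" // placeholder key and value
	}
	// On error the returned controller should not be trusted; on success it carries the applied update.
	return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
}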

type updatePodFunc func(pod *api.Pod)

// updatePodWithRetries retries updating the given pod on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
// Deep copy the pod in case we failed on Get during retry loop
obj, err := api.Scheme.Copy(pod)
if err != nil {
return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
}
oldPod := obj.(*api.Pod)
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(pod)
if pod, e = c.Pods(namespace).Update(pod); e == nil {
return
}
updateErr := e
if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
pod = oldPod
}
// Only return the error from update
return updateErr
})
// If the error is non-nil the returned pod cannot be trusted; if it is nil, the returned
// pod contains the applied update.
return pod, err
}
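
And the pod-side counterpart, mirroring the relabel loop in AddDeploymentKeyToReplicationController above; the label key and value are placeholders:

// exampleRelabelPods is an editor's sketch, not part of this commit.
func exampleRelabelPods(c client.Interface, namespace string, pods []api.Pod) error {
	for i := range pods {
		pod := &pods[i]
		applyUpdate := func(p *api.Pod) {
			if p.Labels == nil {
				p.Labels = map[string]string{}
			}
			p.Labels["example-key"] = "example-value" // placeholder label
		}
		// Each pod update retries on conflict independently.
		if _, err := updatePodWithRetries(c, namespace, pod, applyUpdate); err != nil {
			return err
		}
	}
	return nil
}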

func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
if err != nil {