Skip to content

Commit

Permalink
Add a rollingupdate lib and command to kubectl
Browse files Browse the repository at this point in the history
Also decouple conditions from client for testability.
  • Loading branch information
j3ffml committed Jan 7, 2015
1 parent ded3ef2 commit 0ab39df
Show file tree
Hide file tree
Showing 8 changed files with 576 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func runReplicationControllerTest(c *client.Client) {
glog.Infof("Done creating replication controllers")

// Give the controllers some time to actually create the pods
if err := wait.Poll(time.Second, time.Second*30, c.ControllerHasDesiredReplicas(controller)); err != nil {
if err := wait.Poll(time.Second, time.Second*30, client.ControllerHasDesiredReplicas(c, &controller)); err != nil {
glog.Fatalf("FAILED: pods never created %v", err)
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/client/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ package client

import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

// ControllerHasDesiredReplicas returns a condition that will be true iff the desired replica count
// for a controller's ReplicaSelector equals the Replicas count.
func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc {
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector())
ctrl, err := c.ReplicationControllers(controller.Namespace).Get(controller.Name)
if err != nil {
return false, err
}
return len(pods.Items) == controller.Spec.Replicas, nil
return ctrl.Status.Replicas == ctrl.Spec.Replicas, nil
}
}
1 change: 1 addition & 0 deletions pkg/kubectl/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Find more information at https://github.com/GoogleCloudPlatform/kubernetes.`,

cmds.AddCommand(NewCmdNamespace(out))
cmds.AddCommand(f.NewCmdLog(out))
cmds.AddCommand(f.NewCmdRollingUpdate(out))

if err := cmds.Execute(); err != nil {
os.Exit(1)
Expand Down
11 changes: 11 additions & 0 deletions pkg/kubectl/cmd/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -82,6 +83,16 @@ func GetFlagInt(cmd *cobra.Command, flag string) int {
return v
}

func GetFlagDuration(cmd *cobra.Command, flag string) time.Duration {
f := cmd.Flags().Lookup(flag)
if f == nil {
glog.Fatalf("Flag accessed but not defined for command %s: %s", cmd.Name(), flag)
}
v, err := time.ParseDuration(f.Value.String())
checkErr(err)
return v
}

// Returns the first non-empty string out of the ones provided. If all
// strings are empty, returns an empty string.
func FirstNonEmptyString(args ...string) string {
Expand Down
109 changes: 109 additions & 0 deletions pkg/kubectl/cmd/rollingupdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cmd

import (
"fmt"
"io"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/spf13/cobra"
)

const (
updatePeriod = "1m0s"
timeout = "5m0s"
pollInterval = "3s"
)

func (f *Factory) NewCmdRollingUpdate(out io.Writer) *cobra.Command {
cmd := &cobra.Command{
Use: "rollingupdate <old-controller-name> -f <new-controller.json>",
Short: "Perform a rolling update of the given replicationController",
Long: `Perform a rolling update of the given replicationController.",
Replaces named controller with new controller, updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing controller and overwrite at least one (common) label in its replicaSelector.
Examples:
$ kubectl rollingupdate frontend-v1 -f frontend-v2.json
<update pods of frontend-v1 using new controller data in frontend-v2.json>
$ cat frontend-v2.json | kubectl rollingupdate frontend-v1 -f -
<update pods of frontend-v1 using json data passed into stdin>`,
Run: func(cmd *cobra.Command, args []string) {
filename := GetFlagString(cmd, "filename")
if len(filename) == 0 {
usageError(cmd, "Must specify filename for new controller")
}
period := GetFlagDuration(cmd, "update-period")
interval := GetFlagDuration(cmd, "poll-interval")
timeout := GetFlagDuration(cmd, "timeout")
if len(args) != 1 {
usageError(cmd, "Must specify the controller to update")
}
oldName := args[0]
schema, err := f.Validator(cmd)
checkErr(err)
mapping, namespace, newName, data := ResourceFromFile(cmd, filename, f.Typer, f.Mapper, schema)
if mapping.Kind != "ReplicationController" {
usageError(cmd, "%s does not specify a valid ReplicationController", filename)
}
err = CompareNamespaceFromFile(cmd, namespace)
checkErr(err)

client, err := f.ClientBuilder.Client()
checkErr(err)
obj, err := mapping.Codec.Decode(data)
checkErr(err)
newRc := obj.(*api.ReplicationController)

updater := kubectl.NewRollingUpdater(namespace, client)

// fetch rc
oldRc, err := client.ReplicationControllers(namespace).Get(oldName)
checkErr(err)

var hasLabel bool
for key, oldValue := range oldRc.Spec.Selector {
if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {
hasLabel = true
break
}
}
if !hasLabel {
usageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s",
filename, oldName)
}
// TODO: handle resizes during rolling update
if newRc.Spec.Replicas == 0 {
newRc.Spec.Replicas = oldRc.Spec.Replicas
}
err = updater.Update(out, oldRc, newRc, period, interval, timeout)
checkErr(err)

fmt.Fprintf(out, "%s\n", newName)
},
}
cmd.Flags().String("update-period", updatePeriod, `Time to wait between updating pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().String("poll-interval", pollInterval, `Time delay between polling controller status after update. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().String("timeout", timeout, `Max time to wait for a controller to update before giving up. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().StringP("filename", "f", "", "Filename or URL to file to use to create the new controller")
return cmd
}
2 changes: 2 additions & 0 deletions pkg/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (

var apiVersionToUse = "v1beta1"

const kubectlAnnotationPrefix = "kubectl.kubernetes.io/"

func GetKubeClient(config *client.Config, matchVersion bool) (*client.Client, error) {
// TODO: get the namespace context when kubectl ns is completed
c, err := client.New(config)
Expand Down
173 changes: 173 additions & 0 deletions pkg/kubectl/rolling_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubectl

import (
"fmt"
"io"
"strconv"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
// Client interface for creating and updating controllers
c client.Interface
// Namespace for resources
ns string
}

// NewRollingUpdater creates a RollingUpdater from a client
func NewRollingUpdater(namespace string, c client.Interface) *RollingUpdater {
return &RollingUpdater{
c,
namespace,
}
}

const (
sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id"
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
)

// Update all pods for a ReplicationController (oldRc) by creating a new controller (newRc)
// with 0 replicas, and synchronously resizing oldRc,newRc by 1 until oldRc has 0 replicas
// and newRc has the original # of desired replicas. oldRc is then deleted.
// If an update from newRc to oldRc is already in progress, we attempt to drive it to completion.
// If an error occurs at any step of the update, the error will be returned.
// 'out' writer for progress output
// 'oldRc' existing controller to be replaced
// 'newRc' controller that will take ownership of updated pods (will be created if needed)
// 'updatePeriod' time to wait between individual pod updates
// 'interval' time to wait between polling controller status after update
// 'timeout' time to wait for controller updates before giving up
//
// TODO: make this handle performing a rollback of a partially completed rollout.
func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
oldName := oldRc.ObjectMeta.Name
newName := newRc.ObjectMeta.Name

if newRc.Spec.Replicas <= 0 {
return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)
}
desired := newRc.Spec.Replicas
sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)

// look for existing newRc, incase this update was previously started but interrupted
rc, existing, err := r.getExistingNewRc(sourceId, newName)
if existing {
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)
if err != nil {
return err
}
replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
desired, err = strconv.Atoi(replicas)
if err != nil {
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
newName, desiredReplicasAnnotation, replicas)
}
newRc = rc
} else {
fmt.Fprintf(out, "Creating %s\n", newName)
if newRc.ObjectMeta.Annotations == nil {
newRc.ObjectMeta.Annotations = map[string]string{}
}
newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)
newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
newRc.Spec.Replicas = 0
newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)
if err != nil {
return err
}
}

// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
newRc.Spec.Replicas += 1
oldRc.Spec.Replicas -= 1
fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)

newRc, err = r.updateAndWait(newRc, interval, timeout)
if err != nil {
return err
}
time.Sleep(updatePeriod)
oldRc, err = r.updateAndWait(oldRc, interval, timeout)
if err != nil {
return err
}
}
// delete remaining replicas on oldRc
if oldRc.Spec.Replicas != 0 {
fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
oldName, oldRc.Spec.Replicas, 0)
oldRc.Spec.Replicas = 0
oldRc, err = r.updateAndWait(oldRc, interval, timeout)
if err != nil {
return err
}
}
// add remaining replicas on newRc, cleanup annotations
if newRc.Spec.Replicas != desired {
fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",
newName, newRc.Spec.Replicas, desired)
newRc.Spec.Replicas = desired
}
delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
newRc, err = r.updateAndWait(newRc, interval, timeout)
if err != nil {
return err
}
// delete old rc
fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
return r.c.ReplicationControllers(r.ns).Delete(oldName)
}

func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) {
if rc, err = r.c.ReplicationControllers(r.ns).Get(name); err == nil {
existing = true
annotations := rc.ObjectMeta.Annotations
source := annotations[sourceIdAnnotation]
_, ok := annotations[desiredReplicasAnnotation]
if source != sourceId || !ok {
err = fmt.Errorf("Missing/unexpected annotations for controller %s: %s", name, annotations)
}
return
}
err = nil
return
}

func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) {
rc, err := r.c.ReplicationControllers(r.ns).Update(rc)
if err != nil {
return nil, err
}
if err := wait.Poll(interval, timeout,
client.ControllerHasDesiredReplicas(r.c, rc)); err != nil {
return nil, err
}
return r.c.ReplicationControllers(r.ns).Get(rc.ObjectMeta.Name)
}
Loading

0 comments on commit 0ab39df

Please sign in to comment.