Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding scale up down code for DeploymentController #14209

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 213 additions & 26 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package deployment
import (
"fmt"
"hash/adler32"
"math"
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/experimental"
client "k8s.io/kubernetes/pkg/client/unversioned"
Expand All @@ -45,7 +45,6 @@ func New(client client.Interface) *DeploymentController {
func (d *DeploymentController) Run(syncPeriod time.Duration) {
go util.Until(func() {
if err := d.reconcileDeployments(); err != nil {
glog.Errorf("Couldnt reconcile deployments: %v", err)
}
}, syncPeriod, util.NeverStop)
}
Expand All @@ -64,24 +63,59 @@ func (d *DeploymentController) reconcileDeployments() error {
}

func (d *DeploymentController) reconcileDeployment(deployment *experimental.Deployment) error {
targetedRCs, err := d.getTargetedRCs(deployment)
switch deployment.Spec.Strategy.Type {
case experimental.RecreateDeploymentStrategyType:
return d.reconcileRecreateDeployment(*deployment)
case experimental.RollingUpdateDeploymentStrategyType:
return d.reconcileRollingUpdateDeployment(*deployment)
}
return fmt.Errorf("Unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type)
}

func (d *DeploymentController) reconcileRecreateDeployment(deployment experimental.Deployment) error {
// TODO: implement me.
return nil
}

func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment experimental.Deployment) error {
newRC, err := d.getNewRC(deployment)
if err != nil {
return err
}

oldRCs, err := d.getOldRCs(deployment)
if err != nil {
return err
}

allRCs := []*api.ReplicationController{}
allRCs = append(allRCs, oldRCs...)
allRCs = append(allRCs, newRC)

// Scale up, if we can.
scaledUp, err := d.scaleUp(allRCs, newRC, deployment)
if err != nil {
return err
}
desiredRC, err := d.getDesiredRC(deployment)
if scaledUp {
// Update DeploymentStatus
return d.updateDeploymentStatus(allRCs, newRC, deployment)
}

// Scale down, if we can.
scaledDown, err := d.scaleDown(allRCs, oldRCs, newRC, deployment)
if err != nil {
return err
}
// TODO: Scale up and down the targeted and desired RCs.
// For now, just print their names, until we start doing something useful.
for _, targetedRC := range targetedRCs {
glog.Infof("TargetedRC: %s", targetedRC.ObjectMeta.Name)
if scaledDown {
// Update DeploymentStatus
return d.updateDeploymentStatus(allRCs, newRC, deployment)
}
glog.Infof("DesiredRC: %s", desiredRC.ObjectMeta.Name)
// TODO: raise an event, neither scaled up nor down.
return nil
}

func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deployment) ([]api.ReplicationController, error) {
func (d *DeploymentController) getOldRCs(deployment experimental.Deployment) ([]*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
// 1. Find all pods whose labels match deployment.Spec.Selector
podList, err := d.client.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector), fields.Everything())
Expand All @@ -90,7 +124,7 @@ func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deploymen
}
// 2. Find the corresponding RCs for pods in podList.
// TODO: Right now we list all RCs and then filter. We should add an API for this.
targetedRCs := map[string]api.ReplicationController{}
oldRCs := map[string]api.ReplicationController{}
rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
Expand All @@ -100,51 +134,204 @@ func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deploymen
for _, rc := range rcList.Items {
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
if rcLabelsSelector.Matches(podLabelsSelector) {
targetedRCs[rc.ObjectMeta.Name] = rc
continue
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
if api.Semantic.DeepEqual(rc.Spec.Template, getNewRCTemplate(deployment)) {
continue
}
oldRCs[rc.ObjectMeta.Name] = rc
}
}
}
requiredRCs := []api.ReplicationController{}
for _, value := range targetedRCs {
requiredRCs = append(requiredRCs, value)
requiredRCs := []*api.ReplicationController{}
for _, value := range oldRCs {
requiredRCs = append(requiredRCs, &value)
}
return requiredRCs, nil
}

// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
func (d *DeploymentController) getDesiredRC(deployment *experimental.Deployment) (*api.ReplicationController, error) {
func (d *DeploymentController) getNewRC(deployment experimental.Deployment) (*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
// Find if the required RC exists already.
rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}
newRCTemplate := getNewRCTemplate(deployment)

for _, rc := range rcList.Items {
if api.Semantic.DeepEqual(rc.Spec.Template, deployment.Spec.Template) {
// This is the desired RC.
if api.Semantic.DeepEqual(rc.Spec.Template, newRCTemplate) {
// This is the new RC.
return &rc, nil
}
}
// desired RC does not exist, create a new one.
podTemplateSpecHasher := adler32.New()
util.DeepHashObject(podTemplateSpecHasher, deployment.Spec.Template)
podTemplateSpecHash := podTemplateSpecHasher.Sum32()
// new RC does not exist, create one.
podTemplateSpecHash := getPodTemplateSpecHash(deployment.Spec.Template)
rcName := fmt.Sprintf("deploymentrc-%d", podTemplateSpecHash)
desiredRC := api.ReplicationController{
newRC := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: rcName,
Namespace: namespace,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Template: deployment.Spec.Template,
Selector: newRCTemplate.ObjectMeta.Labels,
Template: newRCTemplate,
},
}
createdRC, err := d.client.ReplicationControllers(namespace).Create(&desiredRC)
createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC)
if err != nil {
return nil, fmt.Errorf("error creating replication controller: %v", err)
}
return createdRC, nil
}

func getNewRCTemplate(deployment experimental.Deployment) *api.PodTemplateSpec {
// newRC will have the same template as in deployment spec, plus a unique label in some cases.
newRCTemplate := &api.PodTemplateSpec{
ObjectMeta: deployment.Spec.Template.ObjectMeta,
Spec: deployment.Spec.Template.Spec,
}
podTemplateSpecHash := getPodTemplateSpecHash(newRCTemplate)
if deployment.Spec.UniqueLabelKey != "" {
newLabels := map[string]string{}
for key, value := range deployment.Spec.Template.ObjectMeta.Labels {
newLabels[key] = value
}
newLabels[deployment.Spec.UniqueLabelKey] = fmt.Sprintf("%d", podTemplateSpecHash)
newRCTemplate.ObjectMeta.Labels = newLabels
}
return newRCTemplate
}

func getPodTemplateSpecHash(template *api.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
util.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}

func (d *DeploymentController) getPodsForRCs(replicationControllers []*api.ReplicationController) ([]api.Pod, error) {
allPods := []api.Pod{}
for _, rc := range replicationControllers {
podList, err := d.client.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
if err != nil {
return allPods, fmt.Errorf("error listing pods: %v", err)
}
allPods = append(allPods, podList.Items...)
}
return allPods, nil
}

func (d *DeploymentController) getReplicaCountForRCs(replicationControllers []*api.ReplicationController) int {
totalReplicaCount := 0
for _, rc := range replicationControllers {
totalReplicaCount += rc.Spec.Replicas
}
return totalReplicaCount
}

func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) {
if newRC.Spec.Replicas == deployment.Spec.Replicas {
// Scaling up not required.
return false, nil
}
maxSurge, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxSurge)
if err != nil {
return false, fmt.Errorf("Invalid value for MaxSurge: %v", err)
}
if isPercent {
maxSurge = util.GetValueFromPercent(maxSurge, deployment.Spec.Replicas)
}
// Find the total number of pods
allPods, err := d.getPodsForRCs(allRCs)
if err != nil {
return false, err
}
currentPodCount := len(allPods)
// Check if we can scale up.
maxTotalPods := deployment.Spec.Replicas + maxSurge
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return false, nil
}
// Scale up.
scaleUpCount := maxTotalPods - currentPodCount
scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas)))
_, err = d.scaleRC(newRC, newRC.Spec.Replicas+scaleUpCount)
return true, err
}

func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) {
oldPodsCount := d.getReplicaCountForRCs(oldRCs)
if oldPodsCount == 0 {
// Cant scale down further
return false, nil
}
maxUnavailable, isPercent, err := util.GetIntOrPercentValue(&deployment.Spec.Strategy.RollingUpdate.MaxUnavailable)
if err != nil {
return false, fmt.Errorf("Invalid value for MaxUnavailable: %v", err)
}
if isPercent {
maxUnavailable = util.GetValueFromPercent(maxUnavailable, deployment.Spec.Replicas)
}
// Check if we can scale down.
minAvailable := deployment.Spec.Replicas - maxUnavailable
// Find the number of ready pods.
// TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged.
readyPodCount := 0
allPods, err := d.getPodsForRCs(allRCs)
for _, pod := range allPods {
if api.IsPodReady(&pod) {
readyPodCount++
}
}

if readyPodCount <= minAvailable {
// Cannot scale down.
return false, nil
}
totalScaleDownCount := readyPodCount - minAvailable
for _, targetRC := range oldRCs {
if totalScaleDownCount == 0 {
// No further scaling required.
break
}
if targetRC.Spec.Replicas == 0 {
// cannot scale down this RC.
continue
}
// Scale down.
scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount)))
_, err = d.scaleRC(targetRC, targetRC.Spec.Replicas-scaleDownCount)
if err != nil {
return false, err
}
totalScaleDownCount -= scaleDownCount
}
return true, err
}

func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) error {
totalReplicas := d.getReplicaCountForRCs(allRCs)
updatedReplicas := d.getReplicaCountForRCs([]*api.ReplicationController{newRC})
newDeployment := deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = experimental.DeploymentStatus{
Replicas: totalReplicas,
UpdatedReplicas: updatedReplicas,
}
_, err := d.updateDeployment(&newDeployment)
return err
}

func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
// TODO: Using client for now, update to use store when it is ready.
rc.Spec.Replicas = newScale
return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
}

func (d *DeploymentController) updateDeployment(deployment *experimental.Deployment) (*experimental.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
return d.client.Experimental().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
20 changes: 20 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -191,6 +192,25 @@ func (intstr *IntOrString) Fuzz(c fuzz.Continue) {
}
}

func GetIntOrPercentValue(intStr *IntOrString) (int, bool, error) {
switch intStr.Kind {
case IntstrInt:
return intStr.IntVal, false, nil
case IntstrString:
s := strings.Replace(intStr.StrVal, "%", "", -1)
v, err := strconv.Atoi(s)
if err != nil {
return 0, false, fmt.Errorf("invalid value %q: %v", intStr.StrVal, err)
}
return v, true, nil
}
return 0, false, fmt.Errorf("invalid value: neither int nor percentage")
}

func GetValueFromPercent(percent int, value int) int {
return int(math.Ceil(float64(percent) * (float64(value)) / 100))
}

// Takes a list of strings and compiles them into a list of regular expressions
func CompileRegexps(regexpStrings []string) ([]*regexp.Regexp, error) {
regexps := []*regexp.Regexp{}
Expand Down