Skip to content

Commit

Permalink
GCE: Allow nodes to exceed target pool maximums
Browse files Browse the repository at this point in the history
If we would exceed the TargetPool API maximums, just randomly
select some subset of the nodes to include in the TP
instead.
  • Loading branch information
zmerlynn committed May 10, 2016
1 parent e973b5d commit faf0c44
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 23 deletions.
90 changes: 67 additions & 23 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

Expand Down Expand Up @@ -68,6 +69,9 @@ const (
// are iterated through to prevent infinite loops if the API
// were to continuously return a nextPageToken.
maxPages = 25

// TargetPools can only support 1000 VMs.
maxInstancesPerTargetPool = 1000
)

// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
Expand Down Expand Up @@ -95,6 +99,11 @@ type Config struct {
}
}

// instRefSlice is a slice of instance references that provides the Len and
// Swap methods, which lets it be handed to rand.Shuffle.
type instRefSlice []*compute.InstanceReference

// Len reports how many instance references the slice holds.
func (refs instRefSlice) Len() int {
	return len(refs)
}

// Swap exchanges the references stored at positions i and j.
func (refs instRefSlice) Swap(i, j int) {
	refs[j], refs[i] = refs[i], refs[j]
}

// init registers the GCE cloud provider under ProviderName. The registered
// factory builds the provider from the supplied configuration reader by
// delegating to newGCECloud.
func init() {
	factory := func(config io.Reader) (cloudprovider.Interface, error) {
		return newGCECloud(config)
	}
	cloudprovider.RegisterCloudProvider(ProviderName, factory)
}
Expand Down Expand Up @@ -853,11 +862,22 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
return nil
}

// restrictTargetPool caps instances at max entries. When the slice already
// fits it is returned untouched; otherwise it is shuffled in place and the
// first max entries of the shuffled slice are returned, so the caller's
// slice is reordered as a side effect.
func restrictTargetPool(instances []string, max int) []string {
	if len(instances) > max {
		// Shuffle before truncating so the surviving subset is random
		// rather than always the leading entries.
		rand.Shuffle(sort.StringSlice(instances))
		instances = instances[:max]
	}
	return instances
}

func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
// Choose a random subset of nodes to send traffic to, if we
// exceed API maximums.
instances = restrictTargetPool(instances, maxInstancesPerTargetPool)
pool := &compute.TargetPool{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Expand Down Expand Up @@ -1075,26 +1095,19 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
return address.Address, existed, nil
}

// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}

loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
// computeUpdate takes the existing TargetPool and the set of running
// instances and returns (toAdd, toRemove), the set of instances to
// reprogram on the TargetPool this reconcile. max restricts the
// number of nodes allowed to be programmed on the TargetPool.
func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) {
existing := sets.NewString()
for _, instance := range pool.Instances {
for _, instance := range tp.Instances {
existing.Insert(hostURLToComparablePath(instance))
}

var toAdd []*compute.InstanceReference
var toRemove []*compute.InstanceReference
for _, host := range hosts {
for _, host := range instances {
link := host.makeComparableHostPath()
if !existing.Has(link) {
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
Expand All @@ -1105,9 +1118,36 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
}

if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if len(tp.Instances)+len(toAdd)-len(toRemove) > max {
// TODO(zmerlynn): In theory, there are faster ways to handle
// this if room is much smaller than len(toAdd). In practice,
// meh.
room := max - len(tp.Instances) + len(toRemove)
glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room)
rand.Shuffle(instRefSlice(toAdd))
toAdd = toAdd[:room]
}

return toAdd, toRemove
}

// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}

loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}

toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool)
if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return err
}
Expand All @@ -1116,9 +1156,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
}
}

if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return err
}
Expand All @@ -1134,10 +1174,14 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
if err != nil {
return err
}
if len(updatedPool.Instances) != len(hosts) {
wantInstances := len(hosts)
if wantInstances > maxInstancesPerTargetPool {
wantInstances = maxInstancesPerTargetPool
}
if len(updatedPool.Instances) != wantInstances {
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances)
}
return nil
}
Expand Down
112 changes: 112 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package gce
import (
"reflect"
"testing"

compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/util/rand"
)

func TestGetRegion(t *testing.T) {
Expand Down Expand Up @@ -148,3 +151,112 @@ func TestScrubDNS(t *testing.T) {
}
}
}

// TestRestrictTargetPool checks that restrictTargetPool leaves a list that
// fits within the limit unchanged and trims a longer list down to the limit,
// using a fixed RNG seed so the shuffled result is reproducible.
func TestRestrictTargetPool(t *testing.T) {
	const maxInstances = 5
	cases := []struct {
		instances []string
		want      []string
	}{
		{
			// Exactly at the limit: returned as-is.
			instances: []string{"1", "2", "3", "4", "5"},
			want:      []string{"1", "2", "3", "4", "5"},
		},
		{
			// One over the limit: shuffled, then truncated.
			instances: []string{"1", "2", "3", "4", "5", "6"},
			want:      []string{"4", "3", "5", "2", "6"},
		},
	}
	for _, c := range cases {
		rand.Seed(5)
		// Work on a copy, since restrictTargetPool may reorder its input.
		input := make([]string, len(c.instances))
		copy(input, c.instances)
		got := restrictTargetPool(input, maxInstances)
		if reflect.DeepEqual(got, c.want) {
			continue
		}
		t.Errorf("restrictTargetPool(%v) => %v, want %v", c.instances, got, c.want)
	}
}

func TestComputeUpdate(t *testing.T) {
const maxInstances = 5
const fakeZone = "us-moon1-f"
tests := []struct {
tp []string
instances []string
wantToAdd []string
wantToRemove []string
}{
{
// Test adding all instances.
tp: []string{},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"0", "1", "2"},
wantToRemove: []string{},
},
{
// Test node 1 coming back healthy.
tp: []string{"0", "2"},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"1"},
wantToRemove: []string{},
},
{
// Test node 1 going healthy while node 4 needs to be removed.
tp: []string{"0", "2", "4"},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"1"},
wantToRemove: []string{"4"},
},
{
// Test exceeding the TargetPool max of 5 (for the test),
// which shuffles in 7, 5, 8 based on the deterministic
// seed below.
tp: []string{"0", "2", "4", "6"},
instances: []string{"0", "1", "2", "3", "5", "7", "8"},
wantToAdd: []string{"7", "5", "8"},
wantToRemove: []string{"4", "6"},
},
{
// Test all nodes getting removed.
tp: []string{"0", "1", "2", "3"},
instances: []string{},
wantToAdd: []string{},
wantToRemove: []string{"0", "1", "2", "3"},
},
}
for _, tc := range tests {
rand.Seed(5) // Arbitrary RNG seed for deterministic testing.

// Dummy up the gceInstance slice.
var instances []*gceInstance
for _, inst := range tc.instances {
instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone})
}
// Dummy up the TargetPool URL list.
var urls []string
for _, inst := range tc.tp {
inst := &gceInstance{Name: inst, Zone: fakeZone}
urls = append(urls, inst.makeComparableHostPath())
}
gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances)
var wantAdd []string
for _, inst := range tc.wantToAdd {
inst := &gceInstance{Name: inst, Zone: fakeZone}
wantAdd = append(wantAdd, inst.makeComparableHostPath())
}
var gotAdd []string
for _, inst := range gotAddInsts {
gotAdd = append(gotAdd, inst.Instance)
}
if !reflect.DeepEqual(wantAdd, gotAdd) {
t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd)
}
_ = gotRem
// var gotRem []string
// for _, inst := range gotRemInsts {
// gotRem = append(gotRem, inst.Instance)
// }
// if !reflect.DeepEqual(tc.wantToRemove, gotRem) {
// t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove)
// }
}
}
16 changes: 16 additions & 0 deletions pkg/util/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,19 @@ func String(length int) string {
}
return string(b)
}

// Shufflable is the set of methods Shuffle needs in order to permute a
// collection in place. It is a subset of sort.Interface, so any
// sort.Interface implementation also satisfies it.
type Shufflable interface {
	// Len returns the number of elements in the collection.
	Len() int
	// Swap exchanges the elements at indexes i and j.
	Swap(i, j int)
}

// Shuffle randomly permutes data in place. It holds the package RNG lock for
// the whole operation and, for each index i in increasing order, swaps
// element i with an element at an index drawn from [0, i].
func Shuffle(data Shufflable) {
	rng.Lock()
	defer rng.Unlock()
	for i := 0; i < data.Len(); i++ {
		data.Swap(i, rng.rand.Intn(i+1))
	}
}
13 changes: 13 additions & 0 deletions pkg/util/rand/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package rand

import (
"math/rand"
"reflect"
"sort"
"strings"
"testing"
)
Expand Down Expand Up @@ -71,3 +73,14 @@ func TestPerm(t *testing.T) {
}
}
}

// TestShuffle seeds the package RNG with a fixed value and checks that
// Shuffle produces the permutation expected for that seed.
func TestShuffle(t *testing.T) {
	Seed(5) // Arbitrary RNG seed for deterministic testing.
	have := []int{0, 1, 2, 3, 4}
	want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5).

	// Shuffle a copy so "have" stays intact for the failure message.
	got := make([]int, len(have))
	copy(got, have)
	Shuffle(sort.IntSlice(got))

	if reflect.DeepEqual(got, want) {
		return
	}
	t.Errorf("Shuffle(%v) => %v, want %v", have, got, want)
}

0 comments on commit faf0c44

Please sign in to comment.