Merge pull request #25178 from zmerlynn/random_max_target_pools
Automatic merge from submit-queue

GCE: Allow node count to exceed GCE TargetPool maximums

```release-note
If the cluster node count exceeds the GCE TargetPool maximum (currently 1000),
randomly select which nodes are members of Kubernetes External Load Balancers.
```



If we would exceed the TargetPool API maximums, randomly select a subset of the nodes to include in the TargetPool instead.
k8s-merge-robot committed May 10, 2016
2 parents f9b8fd0 + faf0c44 commit a57876b
Showing 4 changed files with 208 additions and 23 deletions.
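
The heart of the change is easy to state: if the cluster has more nodes than a TargetPool can hold, shuffle the candidate list and keep only a prefix of it. Below is a minimal standalone sketch of that idea (not the commit's code: it uses the modern math/rand.Shuffle from Go 1.10+, whereas this 2016 commit adds its own Shuffle helper to pkg/util/rand, as the diffs below show):

```go
package main

import (
	"fmt"
	"math/rand"
)

// restrict returns at most max instances, chosen uniformly at random
// by shuffling the slice and truncating it. This mirrors the commit's
// restrictTargetPool.
func restrict(instances []string, max int) []string {
	if len(instances) <= max {
		return instances
	}
	rand.Shuffle(len(instances), func(i, j int) {
		instances[i], instances[j] = instances[j], instances[i]
	})
	return instances[:max]
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4"}
	fmt.Println(restrict(nodes, 2)) // e.g. [node-3 node-1]
}
```
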
90 changes: 67 additions & 23 deletions pkg/cloudprovider/providers/gce/gce.go
@@ -36,6 +36,7 @@ import (
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	netsets "k8s.io/kubernetes/pkg/util/net/sets"
+	"k8s.io/kubernetes/pkg/util/rand"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 
@@ -68,6 +69,9 @@ const (
 	// are iterated through to prevent infinite loops if the API
 	// were to continuously return a nextPageToken.
 	maxPages = 25
+
+	// TargetPools can only support 1000 VMs.
+	maxInstancesPerTargetPool = 1000
 )
 
@@ -95,6 +99,11 @@ type Config struct {
 	}
 }
 
+type instRefSlice []*compute.InstanceReference
+
+func (p instRefSlice) Len() int      { return len(p) }
+func (p instRefSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
 func init() {
 	cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) })
 }
@@ -853,11 +862,22 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
 	return nil
 }
 
+func restrictTargetPool(instances []string, max int) []string {
+	if len(instances) <= max {
+		return instances
+	}
+	rand.Shuffle(sort.StringSlice(instances))
+	return instances[:max]
+}
+
 func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
 	var instances []string
 	for _, host := range hosts {
 		instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
 	}
+	// Choose a random subset of nodes to send traffic to, if we
+	// exceed API maximums.
+	instances = restrictTargetPool(instances, maxInstancesPerTargetPool)
 	pool := &compute.TargetPool{
 		Name:        name,
 		Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
@@ -1075,26 +1095,19 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
 	return address.Address, existed, nil
 }
 
-// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
-func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
-	hosts, err := gce.getInstancesByNames(hostNames)
-	if err != nil {
-		return err
-	}
-
-	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
-	pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
-	if err != nil {
-		return err
-	}
+// computeUpdate takes the existing TargetPool and the set of running
+// instances and returns (toAdd, toRemove), the set of instances to
+// reprogram on the TargetPool this reconcile. max restricts the
+// number of nodes allowed to be programmed on the TargetPool.
+func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) {
 	existing := sets.NewString()
-	for _, instance := range pool.Instances {
+	for _, instance := range tp.Instances {
 		existing.Insert(hostURLToComparablePath(instance))
 	}
 
 	var toAdd []*compute.InstanceReference
 	var toRemove []*compute.InstanceReference
-	for _, host := range hosts {
+	for _, host := range instances {
 		link := host.makeComparableHostPath()
 		if !existing.Has(link) {
 			toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
@@ -1105,9 +1118,36 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 		toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
 	}
 
-	if len(toAdd) > 0 {
-		add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
-		op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
+	if len(tp.Instances)+len(toAdd)-len(toRemove) > max {
+		// TODO(zmerlynn): In theory, there are faster ways to handle
+		// this if room is much smaller than len(toAdd). In practice,
+		// meh.
+		room := max - len(tp.Instances) + len(toRemove)
+		glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room)
+		rand.Shuffle(instRefSlice(toAdd))
+		toAdd = toAdd[:room]
+	}
+
+	return toAdd, toRemove
+}
+
+// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
+func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
+	hosts, err := gce.getInstancesByNames(hostNames)
+	if err != nil {
+		return err
+	}
+
+	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
+	pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
+	if err != nil {
+		return err
+	}
+
+	toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool)
+	if len(toRemove) > 0 {
+		rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
+		op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
 		if err != nil {
 			return err
 		}
@@ -1116,9 +1156,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 		}
 	}
 
-	if len(toRemove) > 0 {
-		rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
-		op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
+	if len(toAdd) > 0 {
+		add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
+		op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
 		if err != nil {
 			return err
 		}
@@ -1134,10 +1174,14 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 	if err != nil {
 		return err
 	}
-	if len(updatedPool.Instances) != len(hosts) {
+	wantInstances := len(hosts)
+	if wantInstances > maxInstancesPerTargetPool {
+		wantInstances = maxInstancesPerTargetPool
+	}
+	if len(updatedPool.Instances) != wantInstances {
 		glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
-			len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
-		return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
+			len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ","))
+		return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances)
 	}
 	return nil
 }
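
The capacity arithmetic in computeUpdate above is worth spelling out: the pool may hold at most max instances, so the number of additions that fit ("room") is max minus the pool's current size plus the pending removals. A small sketch isolating that math (room is a hypothetical standalone helper, not part of the commit):

```go
package main

import "fmt"

// room reports how many of the toAdd candidates fit in the pool
// without exceeding max, given the pool's current size and the
// pending removals. Mirrors the check inside computeUpdate.
func room(current, toAdd, toRemove, max int) int {
	if current+toAdd-toRemove <= max {
		return toAdd // everything fits; no shuffling needed
	}
	return max - current + toRemove
}

func main() {
	// A pool of 998 instances with 5 candidates to add and 1 to
	// remove, against the 1000-instance TargetPool limit: only 3
	// of the 5 shuffled candidates get programmed.
	fmt.Println(room(998, 5, 1, 1000)) // 3
}
```
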
112 changes: 112 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_test.go
@@ -19,6 +19,9 @@ package gce
 import (
 	"reflect"
 	"testing"
+
+	compute "google.golang.org/api/compute/v1"
+	"k8s.io/kubernetes/pkg/util/rand"
 )
 
 func TestGetRegion(t *testing.T) {
@@ -148,3 +151,112 @@ func TestScrubDNS(t *testing.T) {
 		}
 	}
 }
+
+func TestRestrictTargetPool(t *testing.T) {
+	const maxInstances = 5
+	tests := []struct {
+		instances []string
+		want      []string
+	}{
+		{
+			instances: []string{"1", "2", "3", "4", "5"},
+			want:      []string{"1", "2", "3", "4", "5"},
+		},
+		{
+			instances: []string{"1", "2", "3", "4", "5", "6"},
+			want:      []string{"4", "3", "5", "2", "6"},
+		},
+	}
+	for _, tc := range tests {
+		rand.Seed(5)
+		got := restrictTargetPool(append([]string{}, tc.instances...), maxInstances)
+		if !reflect.DeepEqual(got, tc.want) {
+			t.Errorf("restrictTargetPool(%v) => %v, want %v", tc.instances, got, tc.want)
+		}
+	}
+}
+
+func TestComputeUpdate(t *testing.T) {
+	const maxInstances = 5
+	const fakeZone = "us-moon1-f"
+	tests := []struct {
+		tp           []string
+		instances    []string
+		wantToAdd    []string
+		wantToRemove []string
+	}{
+		{
+			// Test adding all instances.
+			tp:           []string{},
+			instances:    []string{"0", "1", "2"},
+			wantToAdd:    []string{"0", "1", "2"},
+			wantToRemove: []string{},
+		},
+		{
+			// Test node 1 coming back healthy.
+			tp:           []string{"0", "2"},
+			instances:    []string{"0", "1", "2"},
+			wantToAdd:    []string{"1"},
+			wantToRemove: []string{},
+		},
+		{
+			// Test node 1 going healthy while node 4 needs to be removed.
+			tp:           []string{"0", "2", "4"},
+			instances:    []string{"0", "1", "2"},
+			wantToAdd:    []string{"1"},
+			wantToRemove: []string{"4"},
+		},
+		{
+			// Test exceeding the TargetPool max of 5 (for the test),
+			// which shuffles in 7, 5, 8 based on the deterministic
+			// seed below.
+			tp:           []string{"0", "2", "4", "6"},
+			instances:    []string{"0", "1", "2", "3", "5", "7", "8"},
+			wantToAdd:    []string{"7", "5", "8"},
+			wantToRemove: []string{"4", "6"},
+		},
+		{
+			// Test all nodes getting removed.
+			tp:           []string{"0", "1", "2", "3"},
+			instances:    []string{},
+			wantToAdd:    []string{},
+			wantToRemove: []string{"0", "1", "2", "3"},
+		},
+	}
+	for _, tc := range tests {
+		rand.Seed(5) // Arbitrary RNG seed for deterministic testing.
+
+		// Dummy up the gceInstance slice.
+		var instances []*gceInstance
+		for _, inst := range tc.instances {
+			instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone})
+		}
+		// Dummy up the TargetPool URL list.
+		var urls []string
+		for _, inst := range tc.tp {
+			inst := &gceInstance{Name: inst, Zone: fakeZone}
+			urls = append(urls, inst.makeComparableHostPath())
+		}
+		gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances)
+		var wantAdd []string
+		for _, inst := range tc.wantToAdd {
+			inst := &gceInstance{Name: inst, Zone: fakeZone}
+			wantAdd = append(wantAdd, inst.makeComparableHostPath())
+		}
+		var gotAdd []string
+		for _, inst := range gotAddInsts {
+			gotAdd = append(gotAdd, inst.Instance)
+		}
+		if !reflect.DeepEqual(wantAdd, gotAdd) {
+			t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd)
+		}
+		_ = gotRem
+		// var gotRem []string
+		// for _, inst := range gotRemInsts {
+		// 	gotRem = append(gotRem, inst.Instance)
+		// }
+		// if !reflect.DeepEqual(tc.wantToRemove, gotRem) {
+		// 	t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove)
+		// }
	}
+}
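
The removal assertions above are left commented out, presumably because toRemove is built by iterating a set, so its order is not deterministic. An order-insensitive comparison would allow re-enabling them; a sketch under that assumption (sortedCopy is a hypothetical helper, not part of the commit):

```go
package gce

import "sort"

// sortedCopy returns a sorted copy of s so two slices can be compared
// regardless of the order computeUpdate happened to produce them in.
func sortedCopy(s []string) []string {
	c := append([]string{}, s...)
	sort.Strings(c)
	return c
}

// The disabled check could then become:
//
//	if !reflect.DeepEqual(sortedCopy(wantRem), sortedCopy(gotRem)) {
//		t.Errorf("computeUpdate(%v, %v) => removed %v, wanted %v",
//			tc.tp, tc.instances, gotRem, wantRem)
//	}
```
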
16 changes: 16 additions & 0 deletions pkg/util/rand/rand.go
@@ -65,3 +65,19 @@ func String(length int) string {
 	}
 	return string(b)
 }
+
+// A type that satisfies the rand.Shufflable interface can be shuffled
+// by Shuffle. Any sort.Interface will satisfy this interface.
+type Shufflable interface {
+	Len() int
+	Swap(i, j int)
+}
+
+func Shuffle(data Shufflable) {
+	rng.Lock()
+	defer rng.Unlock()
+	for i := 0; i < data.Len(); i++ {
+		j := rng.rand.Intn(i + 1)
+		data.Swap(i, j)
+	}
+}
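
The loop in Shuffle is the forward variant of the Fisher-Yates shuffle, and because Shufflable needs only Len and Swap, any sort.Interface (or a tiny purpose-built type like instRefSlice in gce.go) can be shuffled. A usage sketch against this package (runeSlice is illustrative, not part of the commit):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/rand"
)

// runeSlice satisfies rand.Shufflable with just Len and Swap, the
// same pattern instRefSlice uses in gce.go.
type runeSlice []rune

func (s runeSlice) Len() int      { return len(s) }
func (s runeSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func main() {
	s := runeSlice("abcdef")
	rand.Shuffle(s)
	fmt.Println(string(s)) // e.g. "cfadbe"
}
```
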
13 changes: 13 additions & 0 deletions pkg/util/rand/rand_test.go
@@ -18,6 +18,8 @@ package rand
 
 import (
 	"math/rand"
+	"reflect"
+	"sort"
 	"strings"
 	"testing"
 )
@@ -71,3 +73,14 @@ func TestPerm(t *testing.T) {
 		}
 	}
 }
+
+func TestShuffle(t *testing.T) {
+	Seed(5) // Arbitrary RNG seed for deterministic testing.
+	have := []int{0, 1, 2, 3, 4}
+	want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5).
+	got := append([]int{}, have...)
+	Shuffle(sort.IntSlice(got))
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("Shuffle(%v) => %v, want %v", have, got, want)
+	}
+}
