Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCE: Allow node count to exceed GCE TargetPool maximums #25178

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

Expand Down Expand Up @@ -68,6 +69,9 @@ const (
// are iterated through to prevent infinite loops if the API
// were to continuously return a nextPageToken.
maxPages = 25

// TargetPools can only support 1000 VMs.
maxInstancesPerTargetPool = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kubernetes/goog-cluster fyi

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal b/28566577, FYI

)

// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
Expand Down Expand Up @@ -95,6 +99,11 @@ type Config struct {
}
}

type instRefSlice []*compute.InstanceReference

func (p instRefSlice) Len() int { return len(p) }
func (p instRefSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

func init() {
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) })
}
Expand Down Expand Up @@ -853,11 +862,22 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
return nil
}

func restrictTargetPool(instances []string, max int) []string {
if len(instances) <= max {
return instances
}
rand.Shuffle(sort.StringSlice(instances))
return instances[:max]
}

func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
// Choose a random subset of nodes to send traffic to, if we
// exceed API maximums.
instances = restrictTargetPool(instances, maxInstancesPerTargetPool)
pool := &compute.TargetPool{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Expand Down Expand Up @@ -1075,26 +1095,19 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
return address.Address, existed, nil
}

// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}

loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
// computeUpdate takes the existing TargetPool and the set of running
// instances and returns (toAdd, toRemove), the set of instances to
// reprogram on the TargetPool this reconcile. max restricts the
// number of nodes allowed to be programmed on the TargetPool.
func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) {
existing := sets.NewString()
for _, instance := range pool.Instances {
for _, instance := range tp.Instances {
existing.Insert(hostURLToComparablePath(instance))
}

var toAdd []*compute.InstanceReference
var toRemove []*compute.InstanceReference
for _, host := range hosts {
for _, host := range instances {
link := host.makeComparableHostPath()
if !existing.Has(link) {
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
Expand All @@ -1105,9 +1118,36 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
}

if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if len(tp.Instances)+len(toAdd)-len(toRemove) > max {
// TODO(zmerlynn): In theory, there are faster ways to handle
// this if room is much smaller than len(toAdd). In practice,
// meh.
room := max - len(tp.Instances) + len(toRemove)
glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room)
rand.Shuffle(instRefSlice(toAdd))
toAdd = toAdd[:room]
}

return toAdd, toRemove
}

// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return err
}

loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}

toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool)
if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return err
}
Expand All @@ -1116,9 +1156,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
}
}

if len(toRemove) > 0 {
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return err
}
Expand All @@ -1134,10 +1174,14 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
if err != nil {
return err
}
if len(updatedPool.Instances) != len(hosts) {
wantInstances := len(hosts)
if wantInstances > maxInstancesPerTargetPool {
wantInstances = maxInstancesPerTargetPool
}
if len(updatedPool.Instances) != wantInstances {
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ","))
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances)
}
return nil
}
Expand Down
112 changes: 112 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package gce
import (
"reflect"
"testing"

compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/util/rand"
)

func TestGetRegion(t *testing.T) {
Expand Down Expand Up @@ -148,3 +151,112 @@ func TestScrubDNS(t *testing.T) {
}
}
}

func TestRestrictTargetPool(t *testing.T) {
const maxInstances = 5
tests := []struct {
instances []string
want []string
}{
{
instances: []string{"1", "2", "3", "4", "5"},
want: []string{"1", "2", "3", "4", "5"},
},
{
instances: []string{"1", "2", "3", "4", "5", "6"},
want: []string{"4", "3", "5", "2", "6"},
},
}
for _, tc := range tests {
rand.Seed(5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on this seed value and the "want" values above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I used prose to comment the test blocks, hopefully that was good.

got := restrictTargetPool(append([]string{}, tc.instances...), maxInstances)
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("restrictTargetPool(%v) => %v, want %v", tc.instances, got, tc.want)
}
}
}

func TestComputeUpdate(t *testing.T) {
const maxInstances = 5
const fakeZone = "us-moon1-f"
tests := []struct {
tp []string
instances []string
wantToAdd []string
wantToRemove []string
}{
{
// Test adding all instances.
tp: []string{},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"0", "1", "2"},
wantToRemove: []string{},
},
{
// Test node 1 coming back healthy.
tp: []string{"0", "2"},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"1"},
wantToRemove: []string{},
},
{
// Test node 1 going healthy while node 4 needs to be removed.
tp: []string{"0", "2", "4"},
instances: []string{"0", "1", "2"},
wantToAdd: []string{"1"},
wantToRemove: []string{"4"},
},
{
// Test exceeding the TargetPool max of 5 (for the test),
// which shuffles in 7, 5, 8 based on the deterministic
// seed below.
tp: []string{"0", "2", "4", "6"},
instances: []string{"0", "1", "2", "3", "5", "7", "8"},
wantToAdd: []string{"7", "5", "8"},
wantToRemove: []string{"4", "6"},
},
{
// Test all nodes getting removed.
tp: []string{"0", "1", "2", "3"},
instances: []string{},
wantToAdd: []string{},
wantToRemove: []string{"0", "1", "2", "3"},
},
}
for _, tc := range tests {
rand.Seed(5) // Arbitrary RNG seed for deterministic testing.

// Dummy up the gceInstance slice.
var instances []*gceInstance
for _, inst := range tc.instances {
instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone})
}
// Dummy up the TargetPool URL list.
var urls []string
for _, inst := range tc.tp {
inst := &gceInstance{Name: inst, Zone: fakeZone}
urls = append(urls, inst.makeComparableHostPath())
}
gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances)
var wantAdd []string
for _, inst := range tc.wantToAdd {
inst := &gceInstance{Name: inst, Zone: fakeZone}
wantAdd = append(wantAdd, inst.makeComparableHostPath())
}
var gotAdd []string
for _, inst := range gotAddInsts {
gotAdd = append(gotAdd, inst.Instance)
}
if !reflect.DeepEqual(wantAdd, gotAdd) {
t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd)
}
_ = gotRem
// var gotRem []string
// for _, inst := range gotRemInsts {
// gotRem = append(gotRem, inst.Instance)
// }
// if !reflect.DeepEqual(tc.wantToRemove, gotRem) {
// t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove)
// }
}
}
16 changes: 16 additions & 0 deletions pkg/util/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,19 @@ func String(length int) string {
}
return string(b)
}

// A type that satisfies the rand.Shufflable interface can be shuffled
// by Shuffle. Any sort.Interface will satisfy this interface.
type Shufflable interface {
Len() int
Swap(i, j int)
}

func Shuffle(data Shufflable) {
rng.Lock()
defer rng.Unlock()
for i := 0; i < data.Len(); i++ {
j := rng.rand.Intn(i + 1)
data.Swap(i, j)
}
}
13 changes: 13 additions & 0 deletions pkg/util/rand/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package rand

import (
"math/rand"
"reflect"
"sort"
"strings"
"testing"
)
Expand Down Expand Up @@ -71,3 +73,14 @@ func TestPerm(t *testing.T) {
}
}
}

func TestShuffle(t *testing.T) {
Seed(5) // Arbitrary RNG seed for deterministic testing.
have := []int{0, 1, 2, 3, 4}
want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5).
got := append([]int{}, have...)
Shuffle(sort.IntSlice(got))
if !reflect.DeepEqual(got, want) {
t.Errorf("Shuffle(%v) => %v, want %v", have, got, want)
}
}