diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 1db9e84670ff8..5ba83617940ab 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -48,7 +48,7 @@ type TCPLoadBalancer interface { // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service TCPLoadBalancerExists(name, region string) (bool, error) // CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address of the balancer - CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string) (net.IP, error) + CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string, affinityType api.AffinityType) (net.IP, error) // UpdateTCPLoadBalancer updates hosts under the specified load balancer. UpdateTCPLoadBalancer(name, region string, hosts []string) error // DeleteTCPLoadBalancer deletes a specified load balancer. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index 1233c8075577e..4efc6d82d7800 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -84,7 +84,7 @@ func (f *FakeCloud) TCPLoadBalancerExists(name, region string) (bool, error) { // CreateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.CreateTCPLoadBalancer. // It adds an entry "create" into the internal method call record. 
-func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string) (net.IP, error) { +func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string, affinityType api.AffinityType) (net.IP, error) { f.addCall("create") return f.ExternalIP, f.Err } diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 469a9878c34c7..991b49186bde6 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -155,14 +155,27 @@ func makeHostLink(projectID, zone, host string) string { projectID, zone, host) } -func (gce *GCECloud) makeTargetPool(name, region string, hosts []string) (string, error) { +// GCEAffinityType is the GCE-specific session affinity type string. +type GCEAffinityType string + +const ( + // GCEAffinityTypeNone - no session affinity. + GCEAffinityTypeNone GCEAffinityType = "None" + // GCEAffinityTypeClientIP - affinity based on Client IP. + GCEAffinityTypeClientIP GCEAffinityType = "CLIENT_IP" + // GCEAffinityTypeClientIPProto - affinity based on Client IP and protocol. + GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO" +) + +func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) (string, error) { var instances []string for _, host := range hosts { instances = append(instances, makeHostLink(gce.projectID, gce.zone, host)) } pool := &compute.TargetPool{ - Name: name, - Instances: instances, + Name: name, + Instances: instances, + SessionAffinity: string(affinityType), } _, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() if err != nil { @@ -191,9 +204,22 @@ func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) { return false, err } +// translateAffinityType translates from what K8s supports to what the cloud provider supports for session affinity. 
+func translateAffinityType(affinityType api.AffinityType) GCEAffinityType { + switch affinityType { + case api.AffinityTypeClientIP: + return GCEAffinityTypeClientIP + case api.AffinityTypeNone: + return GCEAffinityTypeNone + default: + glog.Errorf("unexpected affinity type: %v", affinityType) + return GCEAffinityTypeNone + } +} + // CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. -func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string) (net.IP, error) { - pool, err := gce.makeTargetPool(name, region, hosts) +func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string, affinityType api.AffinityType) (net.IP, error) { + pool, err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) if err != nil { return nil, err } diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index 6faca93c05003..3ab3c1379f291 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -17,6 +17,7 @@ limitations under the License. package proxy import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "net" ) @@ -25,4 +26,6 @@ type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given // service and source address. 
NextEndpoint(service string, srcAddr net.Addr) (string, error) + NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service string) } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 80d293522d960..bfc723d6501ec 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -39,7 +39,9 @@ type serviceInfo struct { socket proxySocket timeout time.Duration // TODO: make this an net.IP address - publicIP []string + publicIP []string + sessionAffinityType api.AffinityType + stickyMaxAgeMinutes int } // How long we wait for a connection to a backend in seconds @@ -341,6 +343,7 @@ func (proxier *Proxier) SyncLoop() { glog.Errorf("Failed to ensure iptables: %v", err) } proxier.ensurePortals() + proxier.cleanupStaleStickySessions() } } } @@ -358,6 +361,15 @@ func (proxier *Proxier) ensurePortals() { } } +// clean up any stale sticky session records in the hash map. +func (proxier *Proxier) cleanupStaleStickySessions() { + for name, info := range proxier.serviceMap { + if info.sessionAffinityType != api.AffinityTypeNone { + proxier.loadBalancer.CleanupStaleStickySessions(name) + } + } +} + // This assumes proxier.mu is not locked. 
func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error { proxier.mu.Lock() @@ -403,10 +415,12 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, return nil, err } si := &serviceInfo{ - proxyPort: portNum, - protocol: protocol, - socket: sock, - timeout: timeout, + proxyPort: portNum, + protocol: protocol, + socket: sock, + timeout: timeout, + sessionAffinityType: api.AffinityTypeNone, + stickyMaxAgeMinutes: 180, } proxier.setServiceInfo(service, si) @@ -456,10 +470,19 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { info.portalIP = serviceIP info.portalPort = service.Spec.Port info.publicIP = service.Spec.PublicIPs + + if service.Spec.SessionAffinity != nil { + info.sessionAffinityType = *service.Spec.SessionAffinity + // TODO: parameterize this in the types api file as an attribute of sticky session. For now it's hardcoded to 3 hours. + info.stickyMaxAgeMinutes = 180 + } + glog.V(4).Infof("info: %+v", info) + err = proxier.openPortal(service.Name, info) if err != nil { glog.Errorf("Failed to open portal for %q: %v", service.Name, err) } + proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) } proxier.mu.Lock() defer proxier.mu.Unlock() diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index a2db561627555..0e50b0879b9d9 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -22,6 +22,7 @@ import ( "reflect" "strconv" "sync" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/golang/glog" @@ -32,27 +33,79 @@ var ( ErrMissingEndpoints = errors.New("missing endpoints") ) +type sessionAffinityDetail struct { + clientIPAddress string + //clientProtocol api.Protocol //not yet used + //sessionCookie string //not yet used + endpoint string + lastUsedDTTM time.Time +} + +type serviceDetail struct { + name string + sessionAffinityType api.AffinityType + sessionAffinityMap map[string]*sessionAffinityDetail + 
stickyMaxAgeMinutes int +} + // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { - lock sync.RWMutex - endpointsMap map[string][]string - rrIndex map[string]int + lock sync.RWMutex + endpointsMap map[string][]string + rrIndex map[string]int + serviceDtlMap map[string]serviceDetail +} + +func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { + return &serviceDetail{ + name: service, + sessionAffinityType: sessionAffinityType, + sessionAffinityMap: make(map[string]*sessionAffinityDetail), + stickyMaxAgeMinutes: stickyMaxAgeMinutes, + } } // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - endpointsMap: make(map[string][]string), - rrIndex: make(map[string]int), + endpointsMap: make(map[string][]string), + rrIndex: make(map[string]int), + serviceDtlMap: make(map[string]serviceDetail), + } +} + +func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { + if stickyMaxAgeMinutes == 0 { + stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimited instead? } + if _, exists := lb.serviceDtlMap[service]; !exists { + lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) + glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service]) + } + return nil +} + +// isSessionAffinity returns true if this service detail is using some form of session affinity. +func isSessionAffinity(serviceDtl serviceDetail) bool { + //Should never be empty string, but checking for it to be safe. + if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { + return false + } + return true } // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. 
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { + var ipaddr string + glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) + lb.lock.RLock() - endpoints, exists := lb.endpointsMap[service] + serviceDtls, exists := lb.serviceDtlMap[service] + endpoints, _ := lb.endpointsMap[service] index := lb.rrIndex[service] + sessionAffinityEnabled := isSessionAffinity(serviceDtls) + lb.lock.RUnlock() if !exists { return "", ErrMissingServiceEntry @@ -60,9 +113,37 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string if len(endpoints) == 0 { return "", ErrMissingEndpoints } + if sessionAffinityEnabled { + if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { + ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) + } + sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] + glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) + if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { + endpoint := sessionAffinity.endpoint + sessionAffinity.lastUsedDTTM = time.Now() + glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) + return endpoint, nil + } + } endpoint := endpoints[index] lb.lock.Lock() lb.rrIndex[service] = (index + 1) % len(endpoints) + + if sessionAffinityEnabled { + var affinity *sessionAffinityDetail + affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] + if affinity == nil { + affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} + lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity + } + affinity.lastUsedDTTM = time.Now() + affinity.endpoint = endpoint + affinity.clientIPAddress = ipaddr + + glog.V(4).Infof("NextEndpoint. 
New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]) + } + lb.lock.Unlock() return endpoint, nil } @@ -89,6 +170,35 @@ func filterValidEndpoints(endpoints []string) []string { return result } +//remove any session affinity records associated to a particular endpoint (for example when a pod goes down). +func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) { + for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { + if affinityDetail.endpoint == endpoint { + glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) + delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) + } + } +} + +//Loop through the valid endpoints and then the endpoints associated with the Load Balancer. +// Then remove any session affinity records that are not in both lists. +func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) { + allEndpoints := map[string]int{} + for _, validEndpoint := range validEndpoints { + allEndpoints[validEndpoint] = 1 + } + for _, existingEndpoint := range lb.endpointsMap[service] { + allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 + } + for mKey, mVal := range allEndpoints { + if mVal == 1 { + glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) + removeSessionAffinityByEndpoint(lb, service, mKey) + delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey) + } + } +} + // OnUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. 
@@ -102,7 +212,13 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { validEndpoints := filterValidEndpoints(endpoint.Endpoints) if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) { glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) + updateServiceDetailMap(lb, endpoint.Name, validEndpoints) + // On update can be called without NewService being called externally. + // to be safe we will call it here. A new service will only be created + // if one does not already exist. + lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) lb.endpointsMap[endpoint.Name] = validEndpoints + // Reset the round-robin index. lb.rrIndex[endpoint.Name] = 0 } @@ -113,6 +229,17 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { if _, exists := registeredEndpoints[k]; !exists { glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v) delete(lb.endpointsMap, k) + delete(lb.serviceDtlMap, k) + } + } +} + +func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { + stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes + for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { + if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { + glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) + delete(lb.serviceDtlMap[service].sessionAffinityMap, key) } } } diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index 4fa68f980d837..bb9f54b030fcb 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package proxy import ( + "net" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -68,8 +69,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } } -func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string) { - endpoint, err := loadBalancer.NextEndpoint(service, nil) +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, netaddr) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) } @@ -90,10 +91,10 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { Endpoints: []string{"endpoint1:40"}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) } func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { @@ -108,10 +109,10 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") - expectEndpoint(t, loadBalancer, "foo", "endpoint:2") - expectEndpoint(t, loadBalancer, "foo", "endpoint:3") - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) } func 
TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { @@ -126,21 +127,21 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") - expectEndpoint(t, loadBalancer, "foo", "endpoint:2") - expectEndpoint(t, loadBalancer, "foo", "endpoint:3") - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") - expectEndpoint(t, loadBalancer, "foo", "endpoint:2") + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", nil) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"endpoint:8", "endpoint:9"}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:8") - expectEndpoint(t, loadBalancer, "foo", "endpoint:9") - expectEndpoint(t, loadBalancer, "foo", "endpoint:8") - expectEndpoint(t, loadBalancer, "foo", "endpoint:9") + expectEndpoint(t, loadBalancer, "foo", "endpoint:8", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:9", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:8", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:9", nil) // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}} loadBalancer.OnUpdate(endpoints) @@ -167,17 +168,17 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { Endpoints: []string{"endpoint:4", "endpoint:5"}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") - expectEndpoint(t, loadBalancer, "foo", "endpoint:2") - expectEndpoint(t, 
loadBalancer, "foo", "endpoint:3") - expectEndpoint(t, loadBalancer, "foo", "endpoint:1") - expectEndpoint(t, loadBalancer, "foo", "endpoint:2") + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", nil) - expectEndpoint(t, loadBalancer, "bar", "endpoint:4") - expectEndpoint(t, loadBalancer, "bar", "endpoint:5") - expectEndpoint(t, loadBalancer, "bar", "endpoint:4") - expectEndpoint(t, loadBalancer, "bar", "endpoint:5") - expectEndpoint(t, loadBalancer, "bar", "endpoint:4") + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", nil) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) @@ -187,8 +188,228 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { } // but bar is still there, and we continue RR from where we left off. 
- expectEndpoint(t, loadBalancer, "bar", "endpoint:5") - expectEndpoint(t, loadBalancer, "bar", "endpoint:4") - expectEndpoint(t, loadBalancer, "bar", "endpoint:5") - expectEndpoint(t, loadBalancer, "bar", "endpoint:4") + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", nil) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", nil) +} + +func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) +} + +func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + 
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) +} + +func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", api.AffinityTypeNone, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client1) +} + +func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 
0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0} + client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} + client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client3) + + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:4"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client4) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client5) + 
expectEndpoint(t, loadBalancer, "foo", "endpoint:4", client6) +} + +func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + // Then update the configuration with one fewer endpoints, make sure + // we start in the beginning again + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:4", "endpoint:5"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:5", client2) + + // Clear endpoints + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}} + 
loadBalancer.OnUpdate(endpoints) + + endpoint, err = loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } +} + +func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + endpoints := make([]api.Endpoints, 2) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) + endpoints[1] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + Endpoints: []string{"endpoint:4", "endpoint:5"}, + } + loadBalancer.OnUpdate(endpoints) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:3", client3) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:2", client2) + + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + + // Then update the configuration by removing foo + loadBalancer.OnUpdate(endpoints[1:]) + endpoint, err = 
loadBalancer.NextEndpoint("foo", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // but bar is still there, and we continue RR from where we left off. + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:5", client2) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) + expectEndpoint(t, loadBalancer, "bar", "endpoint:4", client1) } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 6d7570b78d5ef..a541adbbd1d66 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -134,16 +134,20 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE if err != nil { return nil, err } + var affinityType api.AffinityType = api.AffinityTypeNone + if service.Spec.SessionAffinity != nil { + affinityType = *service.Spec.SessionAffinity + } if len(service.Spec.PublicIPs) > 0 { for _, publicIP := range service.Spec.PublicIPs { - _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts)) + _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType) if err != nil { // TODO: have to roll-back any successful calls. return nil, err } } } else { - ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts)) + ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType) if err != nil { return nil, err }