From b0228f876fd3f996a93d99b246fb2328e05804e1 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 13 Sep 2017 11:22:06 -0400 Subject: [PATCH 1/3] Install conntrack-tools in dind node image (kube-proxy requires it) --- images/dind/node/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/images/dind/node/Dockerfile b/images/dind/node/Dockerfile index 4848886b05e2..231b4685d5b5 100644 --- a/images/dind/node/Dockerfile +++ b/images/dind/node/Dockerfile @@ -21,6 +21,7 @@ RUN dnf -y update && dnf -y install\ bridge-utils\ ethtool\ iptables-services\ + conntrack-tools\ openvswitch\ python-netaddr\ python2-pyroute2\ From 0cc4ccc8eb47800c2f14f009caa74ad82cdc9de9 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 13 Sep 2017 18:24:09 -0400 Subject: [PATCH 2/3] Require conntrack-tools in node package --- origin.spec | 1 + 1 file changed, 1 insertion(+) diff --git a/origin.spec b/origin.spec index ad5c938c003f..0c52cf6577f5 100644 --- a/origin.spec +++ b/origin.spec @@ -121,6 +121,7 @@ Requires: socat Requires: nfs-utils Requires: ethtool Requires: device-mapper-persistent-data >= 0.6.2 +Requires: conntrack-tools Requires(post): systemd Requires(preun): systemd Requires(postun): systemd From 2722efafe83987f03189b931d3c2e73fd3a43c85 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 13 Sep 2017 11:29:15 -0400 Subject: [PATCH 3/3] UPSTREAM: 48524: fix udp service blackhole problem when number of backends changes from 0 to non-0 --- .../kubernetes/pkg/proxy/iptables/proxier.go | 67 ++++-- .../pkg/proxy/iptables/proxier_test.go | 223 +++++++++++------- 2 files changed, 188 insertions(+), 102 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go index dd581e42b7a8..113f299943bd 100644 --- a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go +++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier.go @@ -250,6 +250,17 @@ type serviceChangeMap struct { items map[types.NamespacedName]*serviceChange } +type updateEndpointMapResult struct { + hcEndpoints map[types.NamespacedName]int + staleEndpoints map[endpointServicePair]bool + staleServiceNames map[proxy.ServicePortName]bool +} + +type updateServiceMapResult struct { + hcServices map[types.NamespacedName]uint16 + staleServices sets.String +} + type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo @@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool // map is cleared after applying them. func updateServiceMap( serviceMap proxyServiceMap, - changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) { - staleServices = sets.NewString() + changes *serviceChangeMap) (result updateServiceMapResult) { + result.staleServices = sets.NewString() func() { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, staleServices) + serviceMap.unmerge(change.previous, existingPorts, result.staleServices) } changes.items = make(map[types.NamespacedName]*serviceChange) }() // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. 
-	hcServices = make(map[types.NamespacedName]uint16)
+	result.hcServices = make(map[types.NamespacedName]uint16)
 	for svcPortName, info := range serviceMap {
 		if info.healthCheckNodePort != 0 {
-			hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
+			result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
 		}
 	}
 
-	return hcServices, staleServices
+	return result
 }
 
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
@@ -755,8 +766,9 @@ func (proxier *Proxier) OnEndpointsSynced() {
 func updateEndpointsMap(
 	endpointsMap proxyEndpointsMap,
 	changes *endpointsChangeMap,
-	hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
-	staleSet = make(map[endpointServicePair]bool)
+	hostname string) (result updateEndpointMapResult) {
+	result.staleEndpoints = make(map[endpointServicePair]bool)
+	result.staleServiceNames = make(map[proxy.ServicePortName]bool)
 
 	func() {
 		changes.lock.Lock()
@@ -764,7 +776,7 @@ func updateEndpointsMap(
 		for _, change := range changes.items {
 			endpointsMap.unmerge(change.previous)
 			endpointsMap.merge(change.current)
-			detectStaleConnections(change.previous, change.current, staleSet)
+			detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
 		}
 		changes.items = make(map[types.NamespacedName]*endpointsChange)
 	}()
@@ -775,18 +787,17 @@ func updateEndpointsMap(
 
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to endpointsMap.
-	hcEndpoints = make(map[types.NamespacedName]int)
+	result.hcEndpoints = make(map[types.NamespacedName]int)
 	localIPs := getLocalIPs(endpointsMap)
 	for nsn, ips := range localIPs {
-		hcEndpoints[nsn] = len(ips)
+		result.hcEndpoints[nsn] = len(ips)
 	}
 
-	return hcEndpoints, staleSet
+	return result
 }
 
-// <staleEndpoints> are modified by this function with detected stale
-// connections.
-func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
+// <staleEndpoints> and <staleServiceNames> are modified by this function with detected stale connections.
+func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
 	for svcPortName, epList := range oldEndpointsMap {
 		for _, ep := range epList {
 			stale := true
@@ -802,6 +813,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
 			}
 		}
 	}
+
+	for svcPortName, epList := range newEndpointsMap {
+		// For UDP services, if the backend changes from 0 to non-0, there may exist a conntrack entry that could blackhole traffic to the service.
+		if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
+			staleServiceNames[svcPortName] = true
+		}
+	}
 }
 
 func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
@@ -983,11 +1001,20 @@ func (proxier *Proxier) syncProxyRules() {
 	// We assume that if this was called, we really want to sync them,
 	// even if nothing changed in the meantime. In other words, callers are
 	// responsible for detecting no-op changes and not calling this function.
- hcServices, staleServices := updateServiceMap( + serviceUpdateResult := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - hcEndpoints, staleEndpoints := updateEndpointsMap( + endpointUpdateResult := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + staleServices := serviceUpdateResult.staleServices + // merge stale services gathered from updateEndpointsMap + for svcPortName := range endpointUpdateResult.staleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) + staleServices.Insert(svcInfo.clusterIP.String()) + } + } + glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. @@ -1594,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() { // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(hcServices); err != nil { + if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil { + if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { glog.Errorf("Error syncing healthcheck endoints: %v", err) } // Finish housekeeping. // TODO: these and clearUDPConntrackForPort() could be made more consistent. utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List()) - proxier.deleteEndpointConnections(staleEndpoints) + proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) } // Clear UDP conntrack for port or all conntrack entries when port equal zero. 
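The core of the fix above is the new loop in detectStaleConnections: a service port whose endpoint list goes from empty to non-empty is flagged so that syncProxyRules can flush its conntrack entries. A minimal standalone sketch of that logic, using simplified stand-in types (servicePortName and plain address strings here are illustrative substitutes for the vendored proxy.ServicePortName and endpointsInfo types):

package main

import "fmt"

// servicePortName stands in for proxy.ServicePortName; endpoint lists are
// reduced to plain address strings for illustration.
type servicePortName string

// staleServiceNames mirrors the loop added to detectStaleConnections: any
// service port whose backend count went from zero to non-zero is flagged,
// because a conntrack entry created while the service had no endpoints can
// keep blackholing UDP traffic after endpoints appear.
func staleServiceNames(oldMap, newMap map[servicePortName][]string) map[servicePortName]bool {
	stale := map[servicePortName]bool{}
	for name, eps := range newMap {
		if len(eps) > 0 && len(oldMap[name]) == 0 {
			stale[name] = true
		}
	}
	return stale
}

func main() {
	oldMap := map[servicePortName][]string{"ns1/ep1": {}}
	newMap := map[servicePortName][]string{"ns1/ep1": {"1.1.1.1:11"}}
	fmt.Println(staleServiceNames(oldMap, newMap)) // map[ns1/ep1:true]
}

The caller in syncProxyRules then filters these flagged names down to UDP services before inserting their cluster IPs into staleServices, as the diff above shows.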
diff --git a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier_test.go b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier_test.go index 989c244776e2..619b543cbbf1 100644 --- a/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier_test.go +++ b/vendor/k8s.io/kubernetes/pkg/proxy/iptables/proxier_test.go @@ -1088,24 +1088,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", fp.serviceMap) } // The only-local-loadbalancer ones get added - if len(hcPorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", result.hcServices) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := hcPorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts) + if port, found := result.hcServices[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices) } } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } // Remove some stuff @@ -1121,24 +1121,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. 
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(staleUDPServices) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List()) + if len(result.staleServices) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.List()) } for _, ip := range expectedStaleUDPServices { - if !staleUDPServices.Has(ip) { + if !result.staleServices.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -1157,18 +1157,18 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) } // No proxied services, so no healthchecks - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts)) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices)) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } } @@ -1185,16 +1185,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) } // No proxied services, so no healthchecks - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices) } } @@ -1227,57 +1227,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 1 { - t.Errorf("expected healthcheck ports 
length 1, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) } // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } } @@ -1606,6 +1606,9 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv func Test_updateEndpointsMap(t *testing.T) { var nodeName = testHostname + emptyEndpoint := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{} + } unnamedPort := func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1910,18 +1913,20 @@ func Test_updateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. 
- previousEndpoints []*api.Endpoints - currentEndpoints []*api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair - expectedHealthchecks map[types.NamespacedName]int + previousEndpoints []*api.Endpoints + currentEndpoints []*api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStaleEndpoints []endpointServicePair + expectedStaleServiceNames map[proxy.ServicePortName]bool + expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port previousEndpoints: []*api.Endpoints{ @@ -1940,8 +1945,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local previousEndpoints: []*api.Endpoints{ @@ -1960,7 +1966,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -1988,8 +1995,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local previousEndpoints: []*api.Endpoints{ @@ -2020,7 +2028,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2086,7 +2095,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "2.2.2.2:22", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -2105,7 +2115,10 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + 
makeServicePortName("ns1", "ep1", ""): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2123,11 +2136,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", ""), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port previousEndpoints: []*api.Endpoints{ @@ -2151,7 +2165,10 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2178,7 +2195,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.2:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }, { @@ -2188,7 +2205,8 @@ func Test_updateEndpointsMap(t *testing.T) { endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset previousEndpoints: []*api.Endpoints{ @@ -2210,7 +2228,10 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2235,11 +2256,12 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port previousEndpoints: []*api.Endpoints{ @@ -2258,10 +2280,13 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p11-2"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port @@ -2281,11 +2306,12 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:22", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, - expectedHealthchecks: 
map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove previousEndpoints: []*api.Endpoints{ @@ -2337,7 +2363,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "4.4.4.4:44", isLocal: true}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "2.2.2.2:22", servicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { @@ -2353,10 +2379,35 @@ func Test_updateEndpointsMap(t *testing.T) { endpoint: "4.4.4.6:45", servicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + makeServicePortName("ns1", "ep1", "p122"): true, + makeServicePortName("ns3", "ep3", "p33"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, - }} + }, { + // Case[15]: change from 0 endpoint address to 1 unnamed port + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", emptyEndpoint), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPort), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {endpoint: "1.1.1.1:11", isLocal: false}, + }, + }, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", ""): true, + }, + expectedHealthchecks: map[types.NamespacedName]int{}, + }, + } for tci, tc := range testCases { ipt := iptablestest.NewFake() @@ -2390,19 +2441,27 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) - if len(stale) != len(tc.expectedStale) { - t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale) + if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints) + } + for _, x := range tc.expectedStaleEndpoints { + if result.staleEndpoints[x] != true { + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints) + } + } + if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames) } - for _, x := range tc.expectedStale { - if stale[x] != true { - t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) + for svcName := range tc.expectedStaleServiceNames { + if result.staleServiceNames[svcName] != true { + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames) } } - if !reflect.DeepEqual(hcEndpoints, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hcEndpoints) + if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, 
tc.expectedHealthchecks, result.hcEndpoints) } } }
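Why the first two patches accompany the cherry-pick: once syncProxyRules has collected the stale UDP cluster IPs, kube-proxy clears their conntrack entries by shelling out to the conntrack binary shipped in conntrack-tools, so the dind image and the node RPM both need it installed. A rough sketch of that per-IP cleanup, approximating what utilproxy.DeleteServiceConnections does rather than reproducing the vendored code (the function name and the 172.30.0.10 example IP are illustrative):

package main

import (
	"fmt"
	"os/exec"
)

// clearUDPConntrackForIP deletes conntrack entries whose original destination
// is the given service IP, approximating the cleanup kube-proxy runs for each
// stale UDP service. If the conntrack binary is missing, the exec fails and
// the stale entries keep blackholing traffic -- hence the new conntrack-tools
// dependency in patches 1 and 2.
func clearUDPConntrackForIP(ip string) error {
	out, err := exec.Command("conntrack", "-D", "--orig-dst", ip, "-p", "udp").CombinedOutput()
	if err != nil {
		// Note: the real kube-proxy code tolerates exit status 1, which
		// conntrack returns when no entries matched.
		return fmt.Errorf("conntrack -D failed for %s: %v (%s)", ip, err, out)
	}
	return nil
}

func main() {
	if err := clearUDPConntrackForIP("172.30.0.10"); err != nil {
		fmt.Println(err)
	}
}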