Skip to content

Commit

Permalink
Merge pull request kubernetes#110268 from danwinship/minimize-iptable…
Browse files Browse the repository at this point in the history
…s-changes

minimize iptables-restore input
  • Loading branch information
k8s-ci-robot authored Nov 2, 2022
2 parents b7f5de1 + 818de5a commit 3edbebe
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 165 deletions.
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,13 @@ const (
// Enable MinDomains in Pod Topology Spread.
MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread"

// owner: @danwinship
// kep: http://kep.k8s.io/3453
// alpha: v1.26
//
// Enables new performance-improving code in kube-proxy iptables mode
MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore"

// owner: @janosi @bridgetkromhout
// kep: https://kep.k8s.io/1435
// alpha: v1.20
Expand Down Expand Up @@ -959,6 +966,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta},

MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha},

MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta},

MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
18 changes: 18 additions & 0 deletions pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
return changeNeeded
}

// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
if ect.endpointSliceCache != nil {
return ect.endpointSliceCache.pendingChanges()
}

ect.lock.Lock()
defer ect.lock.Unlock()

changes := sets.NewString()
for name := range ect.items {
changes.Insert(name.String())
}
return changes
}

// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
Expand Down
95 changes: 68 additions & 27 deletions pkg/proxy/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,13 +825,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints []ServiceEndpoint
expectedStaleServiceNames map[ServicePortName]bool
expectedHealthchecks map[types.NamespacedName]int
expectedChangedEndpoints sets.String
}{{
name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, unnamed port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, named port, local",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets, multiple ports, local",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple endpoints, subsets, IPs, and ports",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "add an Endpoints",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an Endpoints",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add an IP and port",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an IP and port",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add a subset",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove a subset",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "rename a port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "renumber a port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "complex add and remove",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
}, {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
},
}

Expand Down Expand Up @@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.updateEndpoints(prev, curr)
}
}

pendingChanges := fp.endpointsChanges.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List())
}

result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
Expand Down Expand Up @@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) {
fqdnSlice.AddressType = discovery.AddressTypeFQDN

testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
expectedChangedEndpoints sets.String
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
Expand All @@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// ensure that only valide address types are processed
"add an FQDN slice (invalid address type)": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// test additions to existing state
"add a slice that overlaps with existing state": {
Expand Down Expand Up @@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
Expand Down Expand Up @@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
Expand All @@ -1656,19 +1685,21 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
Expand All @@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
Expand All @@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
Expand Down Expand Up @@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to some terminating
"transition some endpoints to terminating state": {
Expand Down Expand Up @@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
}

Expand All @@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
if tc.endpointChangeTracker.items == nil {
t.Errorf("Expected ect.items to not be nil")
}

pendingChanges := tc.endpointChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List())
}

changes := tc.endpointChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {
Expand Down
15 changes: 15 additions & 0 deletions pkg/proxy/endpointslicecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return changed
}

// pendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time checkoutChanges was called
func (cache *EndpointSliceCache) pendingChanges() sets.String {
cache.lock.Lock()
defer cache.lock.Unlock()

changes := sets.NewString()
for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) > 0 {
changes.Insert(serviceNN.String())
}
}
return changes
}

// checkoutChanges returns a list of all endpointsChanges that are
// pending and then marks them as applied.
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
Expand Down
Loading

0 comments on commit 3edbebe

Please sign in to comment.