Merge pull request kubernetes#110328 from danwinship/iptables-counters
Stop trying to "preserve" iptables counters that are always 0
k8s-ci-robot authored Jun 29, 2022
2 parents dafa55b + 7c27cf0 commit 0d9ed2c
Showing 4 changed files with 110 additions and 302 deletions.
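For context: before this change, both proxiers copied each existing chain declaration verbatim from iptables-save output into the iptables-restore input, with the intent of preserving the chain's `[packets:bytes]` counters. Since those counters are always zero for these chains, every call site now simply writes a fresh declaration via `utiliptables.MakeChainLine`. The following minimal Go sketch contrasts the two approaches; `Chain` and `MakeChainLine` mirror the code in the diff below, while everything else is illustrative:

```go
package main

import "fmt"

// Chain mirrors utiliptables.Chain.
type Chain string

// MakeChainLine mirrors the helper in pkg/util/iptables/save_restore.go below.
func MakeChainLine(chain Chain) string {
	return fmt.Sprintf(":%s - [0:0]", chain)
}

func main() {
	// A kube-proxy chain declaration as it appears in iptables-save output.
	savedLine := ":KUBE-SERVICES - [0:0]"

	// Old approach: reuse the saved line, hoping to carry its counters over.
	fmt.Println(savedLine)

	// New approach: always regenerate the line; the counters were zero anyway.
	fmt.Println(MakeChainLine("KUBE-SERVICES"))

	// Both print ":KUBE-SERVICES - [0:0]", which is why the extra bookkeeping
	// could be removed.
}
```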
84 changes: 23 additions & 61 deletions pkg/proxy/iptables/proxier.go
@@ -423,24 +423,24 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
encounteredError = true
} else {
existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes())
existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
natChains := &utilproxy.LineBuffer{}
natRules := &utilproxy.LineBuffer{}
natChains.Write("*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
natChains.WriteBytes(existingNATChains[chain]) // flush
natRules.Write("-X", chainString) // delete
natChains.Write(utiliptables.MakeChainLine(chain)) // flush
natRules.Write("-X", chainString) // delete
}
}
// Hunt for service and endpoint chains.
for chain := range existingNATChains {
chainString := string(chain)
if isServiceChainName(chainString) {
natChains.WriteBytes(existingNATChains[chain]) // flush
natRules.Write("-X", chainString) // delete
natChains.Write(utiliptables.MakeChainLine(chain)) // flush
natRules.Write("-X", chainString) // delete
}
}
natRules.Write("COMMIT")
@@ -460,14 +460,14 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
encounteredError = true
} else {
existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes())
existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
filterChains := &utilproxy.LineBuffer{}
filterRules := &utilproxy.LineBuffer{}
filterChains.Write("*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if _, found := existingFilterChains[chain]; found {
chainString := string(chain)
filterChains.WriteBytes(existingFilterChains[chain])
filterChains.Write(utiliptables.MakeChainLine(chain))
filterRules.Write("-X", chainString)
}
}
@@ -890,23 +890,13 @@ func (proxier *Proxier) syncProxyRules() {

// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain][]byte)
proxier.existingFilterChainsData.Reset()
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
if err != nil { // if we failed to get any rules
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
}

// IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
existingNATChains := make(map[utiliptables.Chain][]byte)
existingNATChains := make(map[utiliptables.Chain]struct{})
proxier.iptablesData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil { // if we failed to get any rules
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil {
klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
} else {
existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
}

// Reset all buffers used later.
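With this hunk, `existingNATChains` becomes a pure name set (`map[utiliptables.Chain]struct{}`) instead of a map from chain name to the raw chain line, and the separate iptables-save of the filter table is dropped entirely. One side effect worth noting: converting a `[]byte` slice to the string-based `Chain` type copies the bytes, so the result no longer aliases `proxier.iptablesData`, which is presumably why the old "may share memory" warning no longer applies. A small sketch of that property (illustrative, not taken from the PR):

```go
package main

import (
	"bytes"
	"fmt"
)

type Chain string

func main() {
	// Pretend this is proxier.iptablesData after SaveInto.
	buf := bytes.NewBufferString(":KUBE-SERVICES - [0:0]\n")

	// Converting a []byte slice to a string-based type copies the data...
	name := Chain(buf.Bytes()[1:14]) // "KUBE-SERVICES"
	existing := map[Chain]struct{}{name: {}}

	// ...so resetting (or reusing) the buffer later cannot corrupt the set,
	// unlike the old map[Chain][]byte whose values pointed into the buffer.
	buf.Reset()
	_, ok := existing["KUBE-SERVICES"]
	fmt.Println(ok) // true
}
```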
@@ -919,18 +909,10 @@ func (proxier *Proxier) syncProxyRules() {
// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if chain, ok := existingFilterChains[chainName]; ok {
proxier.filterChains.WriteBytes(chain)
} else {
proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
}
proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
}
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
if chain, ok := existingNATChains[chainName]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
}
proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
}

// Install the kubernetes-specific postrouting rules. We use a whole chain for
@@ -1028,12 +1010,8 @@ func (proxier *Proxier) syncProxyRules() {

endpointChain := epInfo.ChainName

// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain))
}
// Create the endpoint chain
proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain))
activeNATChains[endpointChain] = true

args = append(args[:0], "-A", string(endpointChain))
@@ -1077,22 +1055,14 @@ func (proxier *Proxier) syncProxyRules() {

// Declare the clusterPolicyChain if needed.
if hasEndpoints && svcInfo.UsesClusterEndpoints() {
// Create the Cluster traffic policy chain, retaining counters if possible.
if chain, ok := existingNATChains[clusterPolicyChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
}
// Create the Cluster traffic policy chain
proxier.natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
activeNATChains[clusterPolicyChain] = true
}

// Declare the localPolicyChain if needed.
if hasEndpoints && svcInfo.UsesLocalEndpoints() {
if chain, ok := existingNATChains[localPolicyChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
}
proxier.natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
activeNATChains[localPolicyChain] = true
}

@@ -1101,11 +1071,7 @@ func (proxier *Proxier) syncProxyRules() {
// jump to externalTrafficChain, which will handle some special-cases
// and then jump to externalPolicyChain.
if hasEndpoints && svcInfo.ExternallyAccessible() {
if chain, ok := existingNATChains[externalTrafficChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
}
proxier.natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
activeNATChains[externalTrafficChain] = true

if !svcInfo.ExternalPolicyLocal() {
@@ -1233,11 +1199,7 @@ func (proxier *Proxier) syncProxyRules() {
fwChain := svcInfo.firewallChainName

// Declare the service firewall chain.
if chain, ok := existingNATChains[fwChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(fwChain))
}
proxier.natChains.Write(utiliptables.MakeChainLine(fwChain))
activeNATChains[fwChain] = true

// The firewall chain will jump to the "external destination"
@@ -1384,7 +1346,7 @@ func (proxier *Proxier) syncProxyRules() {
// We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the
// chain.
proxier.natChains.WriteBytes(existingNATChains[chain])
proxier.natChains.Write(utiliptables.MakeChainLine(chain))
proxier.natRules.Write("-X", chainString)
}
}
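The last hunk above is where the name set pays off: after writing all the chains for the current sync, syncProxyRules walks `existingNATChains` and emits a flush-plus-delete for every service or endpoint chain that is no longer active. A simplified, self-contained sketch of that garbage-collection step; `isServiceChainName`, the sample chain names, and the buffers are stand-ins for the real proxier state:

```go
package main

import (
	"fmt"
	"strings"
)

type Chain string

// isServiceChainName stands in for the proxier's real helper, which matches
// the per-service/per-endpoint chain name prefixes kube-proxy generates.
func isServiceChainName(name string) bool {
	return strings.HasPrefix(name, "KUBE-SVC-") || strings.HasPrefix(name, "KUBE-SEP-")
}

func main() {
	// Chains found in the kernel by GetChainsFromTable.
	existingNATChains := map[Chain]struct{}{
		"KUBE-SVC-AAAAAAAAAAAAAAAA": {},
		"KUBE-SEP-BBBBBBBBBBBBBBBB": {},
		"KUBE-POSTROUTING":          {},
	}
	// Chains that were (re)written during this sync.
	activeNATChains := map[Chain]bool{
		"KUBE-SVC-AAAAAAAAAAAAAAAA": true,
		"KUBE-POSTROUTING":          true,
	}

	var natChains, natRules strings.Builder
	for chain := range existingNATChains {
		if !activeNATChains[chain] && isServiceChainName(string(chain)) {
			// Re-declaring the chain flushes it; "-X" then deletes it.
			natChains.WriteString(fmt.Sprintf(":%s - [0:0]\n", chain))
			natRules.WriteString(fmt.Sprintf("-X %s\n", chain))
		}
	}
	// Only the stale KUBE-SEP- chain is flushed and deleted.
	fmt.Print(natChains.String(), natRules.String())
}
```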
30 changes: 2 additions & 28 deletions pkg/proxy/ipvs/proxier.go
@@ -1839,27 +1839,15 @@ func (proxier *Proxier) acceptIPVSTraffic() {

// createAndLinkKubeChain create all kube chains that ipvs proxier need and write basic link.
func (proxier *Proxier) createAndLinkKubeChain() {
existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)

// Make sure we keep stats for the top-level chains
for _, ch := range iptablesChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
if ch.table == utiliptables.TableNAT {
if chain, ok := existingNATChains[ch.chain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(ch.chain))
}
proxier.natChains.Write(utiliptables.MakeChainLine(ch.chain))
} else {
if chain, ok := existingFilterChains[ch.chain]; ok {
proxier.filterChains.WriteBytes(chain)
} else {
proxier.filterChains.Write(utiliptables.MakeChainLine(ch.chain))
}
proxier.filterChains.Write(utiliptables.MakeChainLine(ch.chain))
}
}

@@ -1872,20 +1860,6 @@ func (proxier *Proxier) createAndLinkKubeChain() {

}

// getExistingChains get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
// Result may SHARE memory with contents of buffer.
func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
buffer.Reset()
err := proxier.iptables.SaveInto(table, buffer)
if err != nil { // if we failed to get any rules
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
return utiliptables.GetChainLines(table, buffer.Bytes())
}
return nil
}

// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
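For the IPVS proxier the change is the same in spirit but larger in effect: createAndLinkKubeChain no longer shells out to iptables-save at all (the `getExistingChains` helper above is deleted); it just ensures each chain exists and writes a fresh zero-counter declaration into the appropriate restore buffer. A hedged sketch of the simplified flow, with illustrative stand-ins for the proxier's chain list and buffers:

```go
package main

import (
	"fmt"
	"strings"
)

type Table string
type Chain string

const (
	TableNAT    Table = "nat"
	TableFilter Table = "filter"
)

func MakeChainLine(chain Chain) string {
	return fmt.Sprintf(":%s - [0:0]", chain)
}

func main() {
	// A few of the chains the IPVS proxier manages (illustrative subset).
	iptablesChains := []struct {
		table Table
		chain Chain
	}{
		{TableNAT, "KUBE-SERVICES"},
		{TableNAT, "KUBE-POSTROUTING"},
		{TableFilter, "KUBE-FORWARD"},
	}

	var natChains, filterChains strings.Builder
	for _, ch := range iptablesChains {
		// The real code first calls proxier.iptables.EnsureChain(ch.table, ch.chain)
		// and bails out on error; no iptables-save pass is needed anymore.
		if ch.table == TableNAT {
			natChains.WriteString(MakeChainLine(ch.chain) + "\n")
		} else {
			filterChains.WriteString(MakeChainLine(ch.chain) + "\n")
		}
	}
	fmt.Print(natChains.String(), filterChains.String())
}
```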
104 changes: 17 additions & 87 deletions pkg/util/iptables/save_restore.go
@@ -21,102 +21,32 @@ import (
"fmt"
)

var (
commitBytes = []byte("COMMIT")
spaceBytes = []byte(" ")
)

// MakeChainLine return an iptables-save/restore formatted chain line given a Chain
func MakeChainLine(chain Chain) string {
return fmt.Sprintf(":%s - [0:0]", chain)
}

// GetChainLines parses a table's iptables-save data to find chains in the table.
// It returns a map of iptables.Chain to []byte where the []byte is the chain line
// from save (with counters etc.).
// Note that to avoid allocations memory is SHARED with save.
func GetChainLines(table Table, save []byte) map[Chain][]byte {
chainsMap := make(map[Chain][]byte)
tablePrefix := []byte("*" + string(table))
readIndex := 0
// find beginning of table
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if bytes.HasPrefix(line, tablePrefix) {
// GetChainsFromTable parses iptables-save data to find the chains that are defined. It
// assumes that save contains a single table's data, and returns a map with keys for every
// chain defined in that table.
func GetChainsFromTable(save []byte) map[Chain]struct{} {
chainsMap := make(map[Chain]struct{})

for {
i := bytes.Index(save, []byte("\n:"))
if i == -1 {
break
}
}
// parse table lines
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if len(line) == 0 {
continue
}
if bytes.HasPrefix(line, commitBytes) || line[0] == '*' {
start := i + 2
save = save[start:]
end := bytes.Index(save, []byte(" "))
if end == -1 {
// shouldn't happen, but...
break
} else if line[0] == '#' {
continue
} else if line[0] == ':' && len(line) > 1 {
// We assume that the <line> contains space - chain lines have 3 fields,
// space delimited. If there is no space, this line will panic.
spaceIndex := bytes.Index(line, spaceBytes)
if spaceIndex == -1 {
panic(fmt.Sprintf("Unexpected chain line in iptables-save output: %v", string(line)))
}
chain := Chain(line[1:spaceIndex])
chainsMap[chain] = line
}
chain := Chain(save[:end])
chainsMap[chain] = struct{}{}
save = save[end:]
}
return chainsMap
}

func readLine(readIndex int, byteArray []byte) ([]byte, int) {
currentReadIndex := readIndex

// consume left spaces
for currentReadIndex < len(byteArray) {
if byteArray[currentReadIndex] == ' ' {
currentReadIndex++
} else {
break
}
}

// leftTrimIndex stores the left index of the line after the line is left-trimmed
leftTrimIndex := currentReadIndex

// rightTrimIndex stores the right index of the line after the line is right-trimmed
// it is set to -1 since the correct value has not yet been determined.
rightTrimIndex := -1

for ; currentReadIndex < len(byteArray); currentReadIndex++ {
if byteArray[currentReadIndex] == ' ' {
// set rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
}
} else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) {
// end of line or byte buffer is reached
if currentReadIndex <= leftTrimIndex {
return nil, currentReadIndex + 1
}
// set the rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') {
// ensure that the last character is part of the returned string,
// unless the last character is '\n'
rightTrimIndex = currentReadIndex + 1
}
}
// Avoid unnecessary allocation.
return byteArray[leftTrimIndex:rightTrimIndex], currentReadIndex + 1
} else {
// unset rightTrimIndex
rightTrimIndex = -1
}
}
return nil, currentReadIndex
}
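The replacement parser is much smaller than the `GetChainLines`/`readLine` pair it removes: since each caller already saves a single table, `GetChainsFromTable` just scans for lines beginning with `:` and records the chain name up to the first space. Below is a self-contained demo of the new function on a sample dump; the function body follows the diff above, while the sample input is invented:

```go
package main

import (
	"bytes"
	"fmt"
)

type Chain string

func GetChainsFromTable(save []byte) map[Chain]struct{} {
	chainsMap := make(map[Chain]struct{})
	for {
		// Each chain declaration starts a new line with ':'.
		i := bytes.Index(save, []byte("\n:"))
		if i == -1 {
			break
		}
		start := i + 2
		save = save[start:]
		// The chain name runs up to the first space.
		end := bytes.Index(save, []byte(" "))
		if end == -1 {
			// shouldn't happen, but...
			break
		}
		chain := Chain(save[:end])
		chainsMap[chain] = struct{}{}
		save = save[end:]
	}
	return chainsMap
}

func main() {
	save := []byte("*nat\n:PREROUTING ACCEPT [0:0]\n:KUBE-SERVICES - [0:0]\n:KUBE-SVC-XYZ - [0:0]\n-A PREROUTING -j KUBE-SERVICES\nCOMMIT\n")
	for chain := range GetChainsFromTable(save) {
		fmt.Println(chain)
	}
	// Prints PREROUTING, KUBE-SERVICES and KUBE-SVC-XYZ (in map iteration order).
}
```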
