Skip to content

Commit

Permalink
nftables accelerated datapath (flowtables)
Browse files Browse the repository at this point in the history
The kernel implement a flowtables infrastructure that allow to
accelerate packet forwarding using nftables.

Since connections with a small number of packets will not benefit of the
offloading, add a new option to the nftables kube-proxy so users can
define the minimum number of packets required to offload a connection to
the datapath. It also allow users to completely disable the behavior by
setting this option to 0.

By default connections with more than 20 packets are considered large
connections and offloaded to the fastpath.
  • Loading branch information
aojea committed Dec 9, 2024
1 parent 8ad9607 commit a211e4b
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 37 deletions.
2 changes: 2 additions & 0 deletions cmd/kube-proxy/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")

fs.Int32Var(o.config.NFTables.FastpathPacketThreshold, "fastpath-packet-threshold", ptr.Deref(o.config.NFTables.FastpathPacketThreshold, 20),
"Number of packets required for the proxy to offload the connection to the fastpath. Use 0 to disable.")
logsapi.AddFlags(&o.config.Logging, fs)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-proxy/app/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.Recorder,
s.HealthzServer,
config.NodePortAddresses,
int(*config.NFTables.FastpathPacketThreshold),
initOnly,
)
} else {
Expand All @@ -323,6 +324,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.Recorder,
s.HealthzServer,
config.NodePortAddresses,
int(*config.NFTables.FastpathPacketThreshold),
initOnly,
)
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ logging:
metricsBindAddress: 127.0.0.1:10249
mode: ""
nftables:
fastpathPacketThreshold: 20
masqueradeAll: false
masqueradeBit: 14
minSyncPeriod: 0s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ logging:
metricsBindAddress: 127.0.0.1:10249
mode: ""
nftables:
fastpathPacketThreshold: 20
masqueradeAll: false
masqueradeBit: 14
minSyncPeriod: 0s
Expand Down
5 changes: 5 additions & 0 deletions pkg/proxy/apis/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ type KubeProxyNFTablesConfiguration struct {
// masqueradeBit is the bit of the iptables fwmark space to use for SNAT if using
// the nftables proxy mode. Values must be within the range [0, 31].
MasqueradeBit *int32

// FastpathPacketThreshold is the number of packets untile the proxy starts offloading
// the connection to the fast path and skipping the kernel stack.
// Set to 0 to disable it.
FastpathPacketThreshold *int32
}

// KubeProxyConntrackConfiguration contains conntrack settings for
Expand Down
8 changes: 8 additions & 0 deletions pkg/proxy/apis/config/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func SetDefaults_KubeProxyConfiguration(obj *kubeproxyconfigv1alpha1.KubeProxyCo
if obj.ClientConnection.Burst == 0 {
obj.ClientConnection.Burst = 10
}
if obj.NFTables.FastpathPacketThreshold == nil {
// Number of packets required to offload the connection to the fastpath.
// Short connections will not benefit much from the switch to fastpath,
// so we consider that a connection that has exchanged more than 20 packets
// in each direction should be offloaded by default.
temp := int32(20)
obj.NFTables.FastpathPacketThreshold = &temp
}
if obj.FeatureGates == nil {
obj.FeatureGates = make(map[string]bool)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/proxy/apis/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pkg/proxy/nftables/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/lithammer/dedent"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
netutils "k8s.io/utils/net"
"sigs.k8s.io/knftables"
Expand Down Expand Up @@ -286,6 +286,9 @@ var ignoredRegexp = regexp.MustCompile(strings.Join(
// The trace tests only check new connections, so for our purposes, this
// check always succeeds (and thus can be ignored).
`^ct state new`,

// The trace tests does not check flowtables offloading
`^.*flow offload.*$`,
},
"|",
))
Expand Down
181 changes: 146 additions & 35 deletions pkg/proxy/nftables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
"sync/atomic"
"time"

"golang.org/x/time/rate"

"github.com/vishvananda/netlink"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -101,6 +105,9 @@ const (
// masquerading
markMasqChain = "mark-for-masquerade"
masqueradingChain = "masquerading"

// flowtables
serviceFlowTable = "kube-proxy-flowtable"
)

// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
Expand All @@ -116,21 +123,22 @@ func NewDualStackProxier(
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
nodePortAddresses []string,
fastpathPacketThreshold int,
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(ctx, v1.IPv4Protocol,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
localDetectors[v1.IPv4Protocol], hostname, nodeIPs[v1.IPv4Protocol],
recorder, healthzServer, nodePortAddresses, initOnly)
recorder, healthzServer, nodePortAddresses, fastpathPacketThreshold, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}

ipv6Proxier, err := NewProxier(ctx, v1.IPv6Protocol,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
localDetectors[v1.IPv6Protocol], hostname, nodeIPs[v1.IPv6Protocol],
recorder, healthzServer, nodePortAddresses, initOnly)
recorder, healthzServer, nodePortAddresses, fastpathPacketThreshold, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
Expand Down Expand Up @@ -195,12 +203,13 @@ type Proxier struct {

logger klog.Logger

clusterIPs *nftElementStorage
serviceIPs *nftElementStorage
firewallIPs *nftElementStorage
noEndpointServices *nftElementStorage
noEndpointNodePorts *nftElementStorage
serviceNodePorts *nftElementStorage
fastpathPacketThreshold int
clusterIPs *nftElementStorage
serviceIPs *nftElementStorage
firewallIPs *nftElementStorage
noEndpointServices *nftElementStorage
noEndpointNodePorts *nftElementStorage
serviceNodePorts *nftElementStorage
}

// Proxier implements proxy.Provider
Expand All @@ -221,6 +230,7 @@ func NewProxier(ctx context.Context,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
nodePortAddressStrings []string,
fastpathPacketThreshold int,
initOnly bool,
) (*Proxier, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
Expand All @@ -245,33 +255,34 @@ func NewProxier(ctx context.Context,
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)

proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
needFullSync: true,
syncPeriod: syncPeriod,
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
conntrack: conntrack.New(),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
staleChains: make(map[string]time.Time),
logger: logger,
clusterIPs: newNFTElementStorage("set", clusterIPsSet),
serviceIPs: newNFTElementStorage("map", serviceIPsMap),
firewallIPs: newNFTElementStorage("map", firewallIPsMap),
noEndpointServices: newNFTElementStorage("map", noEndpointServicesMap),
noEndpointNodePorts: newNFTElementStorage("map", noEndpointNodePortsMap),
serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap),
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
needFullSync: true,
syncPeriod: syncPeriod,
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
conntrack: conntrack.New(),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
staleChains: make(map[string]time.Time),
logger: logger,
clusterIPs: newNFTElementStorage("set", clusterIPsSet),
serviceIPs: newNFTElementStorage("map", serviceIPsMap),
firewallIPs: newNFTElementStorage("map", firewallIPsMap),
noEndpointServices: newNFTElementStorage("map", noEndpointServicesMap),
noEndpointNodePorts: newNFTElementStorage("map", noEndpointNodePortsMap),
serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap),
fastpathPacketThreshold: fastpathPacketThreshold,
}

burstSyncs := 2
Expand Down Expand Up @@ -707,6 +718,21 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
})
}

// Offload the connection after the defined number of packets
if proxier.fastpathPacketThreshold > 0 {
tx.Add(&knftables.Flowtable{
Name: serviceFlowTable,
})
tx.Add(&knftables.Rule{
Chain: filterForwardChain,
Rule: knftables.Concat(
"ct original", ipX, "daddr", "@", clusterIPsSet,
"ct packets >", proxier.fastpathPacketThreshold,
"flow offload", "@", serviceFlowTable,
),
})
}

// flush containers
proxier.clusterIPs.reset(tx)
proxier.serviceIPs.reset(tx)
Expand Down Expand Up @@ -747,13 +773,98 @@ func (proxier *Proxier) Sync() {
proxier.syncRunner.Run()
}

func (proxier *Proxier) syncNodeInterfaces() {
minInterval := 5 * time.Second
maxInterval := 1 * time.Minute
rateLimiter := rate.NewLimiter(rate.Every(minInterval), 1)
// Resources are published periodically or if there is a netlink notification
// indicating a new interfaces was added or changed
nlChannel := make(chan netlink.LinkUpdate)
doneCh := make(chan struct{})
defer close(doneCh)
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
proxier.logger.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
}

for {
err := rateLimiter.Wait(context.Background())
if err != nil {
proxier.logger.Error(err, "unexpected rate limited error trying to get system interfaces")
}
ifnames, err := proxier.getNodeInterfaces()
if err != nil {
proxier.logger.Error(err, "failed to list system network interfaces")
}
if len(ifnames) > 0 {
tx := proxier.nftables.NewTransaction()
tx.Add(&knftables.Flowtable{
Name: serviceFlowTable,
Devices: ifnames,
})
err := proxier.nftables.Run(context.Background(), tx)
if err != nil {
proxier.logger.Error(err, "failed to add network interfaces to the flowtable")
}
}

select {
// trigger a reconcile
case <-nlChannel:
// drain the channel so we only sync once
for len(nlChannel) > 0 {
<-nlChannel
}
case <-time.After(maxInterval):
}
}

}

func (proxier *Proxier) getNodeInterfaces() ([]string, error) {
ifNames := []string{}

ifaces, err := net.Interfaces()
if err != nil {
return ifNames, err
}

for _, iface := range ifaces {
klog.V(7).InfoS("Checking network interface", "name", iface.Name)
// skip down interfaces
if iface.Flags&net.FlagUp == 0 {
continue
}
// skip loopback interfaces
if iface.Flags&net.FlagLoopback != 0 {
continue
}

var ifaceType string
link, err := netlink.LinkByName(iface.Name)
// allow to match only by name if it is not possible to get the type
if err != nil {
klog.ErrorS(err, "error getting link by name")
} else {
ifaceType = link.Type()
}

klog.V(7).InfoS("Checking network interface", "name", iface.Name, "type", ifaceType)
}

return ifNames, nil
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.Updated(proxier.ipFamily)
}

if proxier.fastpathPacketThreshold > 0 {
go proxier.syncNodeInterfaces()
}

// synthesize "last change queued" time as the informers are syncing.
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
proxier.syncRunner.Loop(wait.NeverStop)
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/kube-proxy/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type KubeProxyNFTablesConfiguration struct {
// '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will
// result in an immediate iptables resync.
MinSyncPeriod metav1.Duration `json:"minSyncPeriod"`
// FastpathPacketThreshold is the number of packets untile the proxy starts offloading
// the connection to the fast path and skipping the kernel stack.
// Set to 0 to disable it.
FastpathPacketThreshold *int32 `json:"fastpathPacketThreshold"`
}

// KubeProxyConntrackConfiguration contains conntrack settings for
Expand Down
Loading

0 comments on commit a211e4b

Please sign in to comment.