Skip to content

Commit

Permalink
Merge pull request kubernetes#102015 from klueska/upstream-add-numa-t…
Browse files Browse the repository at this point in the history
…o-cpu-assignment-algo

Add support for consuming whole NUMA nodes in CPUManager CPU assignments
  • Loading branch information
k8s-ci-robot authored Oct 15, 2021
2 parents 655c04d + 4bae656 commit 55e1d2f
Show file tree
Hide file tree
Showing 5 changed files with 1,009 additions and 93 deletions.
195 changes: 168 additions & 27 deletions pkg/kubelet/cm/cpumanager/cpu_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,138 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

type numaOrSocketsFirstFuncs interface {
takeFullFirstLevel()
takeFullSecondLevel()
sortAvailableNUMANodes() []int
sortAvailableSockets() []int
sortAvailableCores() []int
}

type numaFirst struct{ acc *cpuAccumulator }
type socketsFirst struct{ acc *cpuAccumulator }

var _ numaOrSocketsFirstFuncs = (*numaFirst)(nil)
var _ numaOrSocketsFirstFuncs = (*socketsFirst)(nil)

// If NUMA nodes are higher in the memory hierarchy than sockets, then we take
// from the set of NUMA Nodes as the first level.
func (n *numaFirst) takeFullFirstLevel() {
n.acc.takeFullNUMANodes()
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then we take
// from the set of sockets as the second level.
func (n *numaFirst) takeFullSecondLevel() {
n.acc.takeFullSockets()
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then just
// sort the NUMA nodes directly, and return them.
func (n *numaFirst) sortAvailableNUMANodes() []int {
numas := n.acc.details.NUMANodes().ToSliceNoSort()
n.acc.sort(numas, n.acc.details.CPUsInNUMANodes)
return numas
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then we need
// to pull the set of sockets out of each sorted NUMA node, and accumulate the
// partial order across them.
func (n *numaFirst) sortAvailableSockets() []int {
var result []int
for _, numa := range n.sortAvailableNUMANodes() {
sockets := n.acc.details.SocketsInNUMANodes(numa).ToSliceNoSort()
n.acc.sort(sockets, n.acc.details.CPUsInSockets)
result = append(result, sockets...)
}
return result
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then
// cores sit directly below sockets in the memory hierarchy.
func (n *numaFirst) sortAvailableCores() []int {
var result []int
for _, socket := range n.acc.sortAvailableSockets() {
cores := n.acc.details.CoresInSockets(socket).ToSliceNoSort()
n.acc.sort(cores, n.acc.details.CPUsInCores)
result = append(result, cores...)
}
return result
}

// If sockets are higher in the memory hierarchy than NUMA nodes, then we take
// from the set of sockets as the first level.
func (s *socketsFirst) takeFullFirstLevel() {
s.acc.takeFullSockets()
}

// If sockets are higher in the memory hierarchy than NUMA nodes, then we take
// from the set of NUMA Nodes as the second level.
func (s *socketsFirst) takeFullSecondLevel() {
s.acc.takeFullNUMANodes()
}

// If sockets are higher in the memory hierarchy than NUMA nodes, then we need
// to pull the set of NUMA nodes out of each sorted Socket, and accumulate the
// partial order across them.
func (s *socketsFirst) sortAvailableNUMANodes() []int {
var result []int
for _, socket := range s.sortAvailableSockets() {
numas := s.acc.details.NUMANodesInSockets(socket).ToSliceNoSort()
s.acc.sort(numas, s.acc.details.CPUsInNUMANodes)
result = append(result, numas...)
}
return result
}

// If sockets are higher in the memory hierarchy than NUMA nodes, then just
// sort the sockets directly, and return them.
func (s *socketsFirst) sortAvailableSockets() []int {
sockets := s.acc.details.Sockets().ToSliceNoSort()
s.acc.sort(sockets, s.acc.details.CPUsInSockets)
return sockets
}

// If sockets are higher in the memory hierarchy than NUMA nodes, then cores
// sit directly below NUMA Nodes in the memory hierarchy.
func (s *socketsFirst) sortAvailableCores() []int {
var result []int
for _, numa := range s.acc.sortAvailableNUMANodes() {
cores := s.acc.details.CoresInNUMANodes(numa).ToSliceNoSort()
s.acc.sort(cores, s.acc.details.CPUsInCores)
result = append(result, cores...)
}
return result
}

type cpuAccumulator struct {
topo *topology.CPUTopology
details topology.CPUDetails
numCPUsNeeded int
result cpuset.CPUSet
topo *topology.CPUTopology
details topology.CPUDetails
numCPUsNeeded int
result cpuset.CPUSet
numaOrSocketsFirst numaOrSocketsFirstFuncs
}

func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator {
return &cpuAccumulator{
acc := &cpuAccumulator{
topo: topo,
details: topo.CPUDetails.KeepOnly(availableCPUs),
numCPUsNeeded: numCPUs,
result: cpuset.NewCPUSet(),
}

if topo.NumSockets >= topo.NumNUMANodes {
acc.numaOrSocketsFirst = &numaFirst{acc}
} else {
acc.numaOrSocketsFirst = &socketsFirst{acc}
}

return acc
}

// Returns true if the supplied NUMANode is fully available in `topoDetails`.
func (a *cpuAccumulator) isNUMANodeFree(numaID int) bool {
return a.details.CPUsInNUMANodes(numaID).Size() == a.topo.CPUDetails.CPUsInNUMANodes(numaID).Size()
}

// Returns true if the supplied socket is fully available in `topoDetails`.
Expand All @@ -52,6 +170,17 @@ func (a *cpuAccumulator) isCoreFree(coreID int) bool {
return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
}

// Returns free NUMA Node IDs as a slice sorted by sortAvailableNUMANodes().
func (a *cpuAccumulator) freeNUMANodes() []int {
free := []int{}
for _, numa := range a.sortAvailableNUMANodes() {
if a.isNUMANodeFree(numa) {
free = append(free, numa)
}
}
return free
}

// Returns free socket IDs as a slice sorted by sortAvailableSockets().
func (a *cpuAccumulator) freeSockets() []int {
free := []int{}
Expand Down Expand Up @@ -79,12 +208,12 @@ func (a *cpuAccumulator) freeCPUs() []int {
return a.sortAvailableCPUs()
}

// Sorts the provided list of sockets/cores/cpus referenced in 'ids' by the
// number of available CPUs contained within them (smallest to largest). The
// 'getCPU()' paramater defines the function that should be called to retrieve
// the list of available CPUs for the type of socket/core/cpu being referenced.
// If two sockets/cores/cpus have the same number of available CPUs, they are
// sorted in ascending order by their id.
// Sorts the provided list of NUMA nodes/sockets/cores/cpus referenced in 'ids'
// by the number of available CPUs contained within them (smallest to largest).
// The 'getCPU()' paramater defines the function that should be called to
// retrieve the list of available CPUs for the type being referenced. If two
// NUMA nodes/sockets/cores/cpus have the same number of available CPUs, they
// are sorted in ascending order by their id.
func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) cpuset.CPUSet) {
sort.Slice(ids,
func(i, j int) bool {
Expand All @@ -100,24 +229,19 @@ func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) cpuset.CPUSet)
})
}

// Sort all sockets with free CPUs using the sort() algorithm defined above.
// Sort all NUMA nodes with free CPUs.
func (a *cpuAccumulator) sortAvailableNUMANodes() []int {
return a.numaOrSocketsFirst.sortAvailableNUMANodes()
}

// Sort all sockets with free CPUs.
func (a *cpuAccumulator) sortAvailableSockets() []int {
sockets := a.details.Sockets().ToSliceNoSort()
a.sort(sockets, a.details.CPUsInSockets)
return sockets
return a.numaOrSocketsFirst.sortAvailableSockets()
}

// Sort all cores with free CPUs:
// - First by socket using sortAvailableSockets().
// - Then within each socket, using the sort() algorithm defined above.
func (a *cpuAccumulator) sortAvailableCores() []int {
var result []int
for _, socket := range a.sortAvailableSockets() {
cores := a.details.CoresInSockets(socket).ToSliceNoSort()
a.sort(cores, a.details.CPUsInCores)
result = append(result, cores...)
}
return result
return a.numaOrSocketsFirst.sortAvailableCores()
}

// Sort all available CPUs:
Expand All @@ -139,6 +263,17 @@ func (a *cpuAccumulator) take(cpus cpuset.CPUSet) {
a.numCPUsNeeded -= cpus.Size()
}

func (a *cpuAccumulator) takeFullNUMANodes() {
for _, numa := range a.freeNUMANodes() {
cpusInNUMANode := a.topo.CPUDetails.CPUsInNUMANodes(numa)
if !a.needs(cpusInNUMANode.Size()) {
continue
}
klog.V(4).InfoS("takeFullNUMANodes: claiming NUMA node", "numa", numa)
a.take(cpusInNUMANode)
}
}

func (a *cpuAccumulator) takeFullSockets() {
for _, socket := range a.freeSockets() {
cpusInSocket := a.topo.CPUDetails.CPUsInSockets(socket)
Expand Down Expand Up @@ -193,9 +328,15 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num
}

// Algorithm: topology-aware best-fit
// 1. Acquire whole sockets, if available and the container requires at
// least a socket's-worth of CPUs.
acc.takeFullSockets()
// 1. Acquire whole NUMA nodes and sockets, if available and the container
// requires at least a NUMA node or socket's-worth of CPUs. If NUMA
// Nodes map to 1 or more sockets, pull from NUMA nodes first.
// Otherwise pull from sockets first.
acc.numaOrSocketsFirst.takeFullFirstLevel()
if acc.isSatisfied() {
return acc.result, nil
}
acc.numaOrSocketsFirst.takeFullSecondLevel()
if acc.isSatisfied() {
return acc.result, nil
}
Expand Down
Loading

0 comments on commit 55e1d2f

Please sign in to comment.