Skip to content

Commit

Permalink
Merge pull request #51357 from ConnorDoyle/cpu-manager-wiring-and-non…
Browse files Browse the repository at this point in the history
…epolicy

Automatic merge from submit-queue (batch tested with PRs 49971, 51357, 51616, 51649, 51372)

CPU manager wiring and `none` policy

Blocker for CPU manager #49186 (4 of 6)

* Previous PR in this series: #51140
* Next PR in this series: #51180

cc @balajismaniam @derekwaynecarr @sjenning 

**Release note**:

```release-note
NONE
```

TODO:
- [X] In-memory CPU manager state
- [x] Kubelet config value
- [x] Feature gate
- [X] None policy
- [X] Unit tests
- [X] CPU manager instantiation
- [x] Calls into CPU manager from Kubelet container runtime
  • Loading branch information
Kubernetes Submit Queue authored Sep 1, 2017
2 parents d56f6ef + 50674ec commit 08ad012
Show file tree
Hide file tree
Showing 30 changed files with 1,181 additions and 30 deletions.
2 changes: 2 additions & 0 deletions cmd/kubelet/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func AddKubeletConfigFlags(fs *pflag.FlagSet, c *kubeletconfig.KubeletConfigurat
fs.BoolVar(&c.CgroupsPerQOS, "cgroups-per-qos", c.CgroupsPerQOS, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.")
fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'")
fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, "<Warning: Alpha feature> CPU Manager policy to use. Possible values: 'none'. Default: 'none'")
fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, "<Warning: Alpha feature> CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to `NodeStatusUpdateFrequency`")
fs.StringVar(&c.ContainerRuntime, "container-runtime", c.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'.")
fs.DurationVar(&c.RuntimeRequestTimeout.Duration, "runtime-request-timeout", c.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. When timeout exceeded, kubelet will cancel the request, throw out an error and retry later.")
fs.StringVar(&c.LockFilePath, "lock-file", c.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
Expand Down
4 changes: 3 additions & 1 deletion cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
SystemReserved: systemReserved,
HardEvictionThresholds: hardEvictionThresholds,
},
ExperimentalQOSReserved: *experimentalQOSReserved,
ExperimentalQOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
},
s.FailSwapOn,
kubeDeps.Recorder)
Expand Down
7 changes: 7 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ const (
//
// Implement IPVS-based in-cluster service load balancing
SupportIPVSProxyMode utilfeature.Feature = "SupportIPVSProxyMode"

// owner: @ConnorDoyle
// alpha: v1.8
//
// Alternative container-level CPU affinity policies.
CPUManager utilfeature.Feature = "CPUManager"
)

func init() {
Expand All @@ -159,6 +165,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
PodPriority: {Default: false, PreRelease: utilfeature.Alpha},
EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha},
TaintNodesByCondition: {Default: false, PreRelease: utilfeature.Alpha},
CPUManager: {Default: false, PreRelease: utilfeature.Alpha},

// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/apis/kubeletconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ type KubeletConfiguration struct {
CgroupRoot string
// containerRuntime is the container runtime to use.
ContainerRuntime string
// CPUManagerPolicy is the name of the policy to use.
CPUManagerPolicy string
// CPU Manager reconciliation period.
CPUManagerReconcilePeriod metav1.Duration
// remoteRuntimeEndpoint is the endpoint of remote runtime service
RemoteRuntimeEndpoint string
// remoteImageEndpoint is the endpoint of remote image service
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/apis/kubeletconfig/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
if obj.CPUManagerPolicy == "" {
obj.CPUManagerPolicy = "none"
}
if obj.CPUManagerReconcilePeriod == zeroDuration {
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency
}
if obj.OOMScoreAdj == nil {
temp := int32(qos.KubeletOOMScoreAdj)
obj.OOMScoreAdj = &temp
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/apis/kubeletconfig/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ type KubeletConfiguration struct {
CgroupDriver string `json:"cgroupDriver,omitempty"`
// containerRuntime is the container runtime to use.
ContainerRuntime string `json:"containerRuntime"`
// CPUManagerPolicy is the name of the policy to use.
CPUManagerPolicy string `json:"cpuManagerPolicy"`
// CPU Manager reconciliation period.
CPUManagerReconcilePeriod metav1.Duration `json:"cpuManagerReconcilePeriod"`
// remoteRuntimeEndpoint is the endpoint of remote runtime service
RemoteRuntimeEndpoint string `json:"remoteRuntimeEndpoint"`
// remoteImageEndpoint is the endpoint of remote image service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ func autoConvert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfigura
}
out.CgroupDriver = in.CgroupDriver
out.ContainerRuntime = in.ContainerRuntime
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint
out.RemoteImageEndpoint = in.RemoteImageEndpoint
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
Expand Down Expand Up @@ -388,6 +390,8 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1alpha1_KubeletConfigura
out.SystemCgroups = in.SystemCgroups
out.CgroupRoot = in.CgroupRoot
out.ContainerRuntime = in.ContainerRuntime
out.CPUManagerPolicy = in.CPUManagerPolicy
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint
out.RemoteImageEndpoint = in.RemoteImageEndpoint
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
**out = **in
}
}
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
if in.LockFilePath != nil {
in, out := &in.LockFilePath, &out.LockFilePath
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/apis/kubeletconfig/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
if in.RegisterWithTaints != nil {
in, out := &in.RegisterWithTaints, &out.RegisterWithTaints
Expand Down
7 changes: 7 additions & 0 deletions pkg/kubelet/cm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ go_library(
"container_manager.go",
"container_manager_stub.go",
"container_manager_unsupported.go",
"fake_internal_container_lifecycle.go",
"helpers_unsupported.go",
"internal_container_lifecycle.go",
"pod_container_manager_stub.go",
"pod_container_manager_unsupported.go",
"types.go",
Expand All @@ -27,9 +29,11 @@ go_library(
}),
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
Expand All @@ -41,6 +45,8 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/api/v1/resource:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
Expand All @@ -57,6 +63,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
"//conditions:default": [],
}),
Expand Down
12 changes: 10 additions & 2 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ limitations under the License.
package cm

import (
"time"

"k8s.io/apimachinery/pkg/util/sets"
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/status"

"fmt"
"strconv"
Expand All @@ -35,7 +39,7 @@ type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc) error
Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error

// Returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
Expand Down Expand Up @@ -66,6 +70,8 @@ type ContainerManager interface {
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error

InternalContainerLifecycle() InternalContainerLifecycle
}

type NodeConfig struct {
Expand All @@ -78,7 +84,9 @@ type NodeConfig struct {
CgroupDriver string
ProtectKernelDefaults bool
NodeAllocatableConfig
ExperimentalQOSReserved map[v1.ResourceName]int64
ExperimentalQOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
ExperimentalCPUManagerReconcilePeriod time.Duration
}

type NodeAllocatableConfig struct {
Expand Down
75 changes: 69 additions & 6 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,21 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
kubefeatures "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/status"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
Expand Down Expand Up @@ -117,6 +123,8 @@ type containerManagerImpl struct {
recorder record.EventRecorder
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
}

type features struct {
Expand Down Expand Up @@ -216,11 +224,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
if info, err := cadvisorInterface.MachineInfo(); err == nil {
capacity = cadvisor.CapacityFromMachineInfo(info)
} else {
machineInfo, err := cadvisorInterface.MachineInfo()
if err != nil {
return nil, err
}
capacity = cadvisor.CapacityFromMachineInfo(machineInfo)

cgroupRoot := nodeConfig.CgroupRoot
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
Expand Down Expand Up @@ -250,7 +258,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}

return &containerManagerImpl{
cm := &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
NodeConfig: nodeConfig,
Expand All @@ -260,7 +268,23 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cgroupRoot: cgroupRoot,
recorder: recorder,
qosContainerManager: qosContainerManager,
}, nil
}

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
cm.GetNodeAllocatableReservation(),
)
if err != nil {
glog.Errorf("failed to initialize cpu manager: %v", err)
return nil, err
}
}

return cm, nil
}

// NewPodContainerManager is a factory method returns a PodContainerManager object
Expand All @@ -279,6 +303,36 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
}
}

func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager}
}

// Implements InternalContainerLifecycle interface.
type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager
}

func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.AddContainer(pod, container, containerID)
}
return nil
}

func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID)
}
return nil
}

func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID)
}
return nil
}

// Create a cgroup container manager.
func createManager(containerName string) *fs.Manager {
allowAllDevices := true
Expand Down Expand Up @@ -485,7 +539,16 @@ func (cm *containerManagerImpl) Status() Status {
return cm.status
}

func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc) error {
func (cm *containerManagerImpl) Start(node *v1.Node,
activePods ActivePodsFunc,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService) error {

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), podStatusProvider, runtimeService)
}

// cache the node Info including resource capacity and
// allocatable of the node
cm.nodeInfo = node
Expand Down
9 changes: 8 additions & 1 deletion pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package cm
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"

internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/status"
)

type containerManagerStub struct{}

var _ ContainerManager = &containerManagerStub{}

func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc) error {
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
Expand Down Expand Up @@ -66,6 +69,10 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}

func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
return nil
}

func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}
8 changes: 7 additions & 1 deletion pkg/kubelet/cm/container_manager_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (

"k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/util/mount"
)

Expand All @@ -32,7 +34,7 @@ type unsupportedContainerManager struct {

var _ ContainerManager = &unsupportedContainerManager{}

func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc) error {
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}

Expand Down Expand Up @@ -72,6 +74,10 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana
return &unsupportedPodContainerManager{}
}

func (cm *unsupportedContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
return nil
}

func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}
Loading

0 comments on commit 08ad012

Please sign in to comment.