Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deviceplugin jiayingz #51209

Merged
merged 5 commits into from
Sep 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
if err != nil {
return err
}

devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
Expand All @@ -450,6 +453,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
ExperimentalQOSReserved: *experimentalQOSReserved,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ const (
// Works only with Docker Container Runtime.
Accelerators utilfeature.Feature = "Accelerators"

// owner: @jiayingz
// alpha: v1.8
//
// Enables support for Device Plugins
// Only Nvidia GPUs are tested as of v1.8.
DevicePlugins utilfeature.Feature = "DevicePlugins"

// owner: @gmarek
// alpha: v1.6
//
Expand Down Expand Up @@ -155,6 +162,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
ExperimentalCriticalPodAnnotation: {Default: false, PreRelease: utilfeature.Alpha},
Accelerators: {Default: false, PreRelease: utilfeature.Alpha},
DevicePlugins: {Default: false, PreRelease: utilfeature.Alpha},
TaintBasedEvictions: {Default: false, PreRelease: utilfeature.Alpha},
RotateKubeletServerCertificate: {Default: false, PreRelease: utilfeature.Alpha},
RotateKubeletClientCertificate: {Default: true, PreRelease: utilfeature.Beta},
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ filegroup(
"//pkg/kubelet/configmap:all-srcs",
"//pkg/kubelet/container:all-srcs",
"//pkg/kubelet/custommetrics:all-srcs",
"//pkg/kubelet/deviceplugin:all-srcs",
"//pkg/kubelet/dockershim:all-srcs",
"//pkg/kubelet/envvars:all-srcs",
"//pkg/kubelet/events:all-srcs",
Expand Down
18 changes: 14 additions & 4 deletions pkg/kubelet/cm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ go_library(
"container_manager.go",
"container_manager_stub.go",
"container_manager_unsupported.go",
"device_plugin_handler.go",
"device_plugin_handler_stub.go",
"helpers_unsupported.go",
"pod_container_manager_stub.go",
"pod_container_manager_unsupported.go",
Expand All @@ -27,12 +29,16 @@ go_library(
}),
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/deviceplugin:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
Expand All @@ -54,7 +60,6 @@ go_library(
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs:go_default_library",
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups/systemd:go_default_library",
"//vendor/github.com/opencontainers/runc/libcontainer/configs:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
Expand All @@ -66,6 +71,7 @@ go_test(
name = "go_default_test",
srcs = [
"container_manager_unsupported_test.go",
"device_plugin_handler_test.go",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"cgroup_manager_linux_test.go",
Expand All @@ -78,15 +84,19 @@ go_test(
}),
library = ":go_default_library",
deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
],
"//conditions:default": [],
}),
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"

"fmt"
Expand Down Expand Up @@ -66,6 +67,10 @@ type ContainerManager interface {
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error

// Returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error)
}

type NodeConfig struct {
Expand Down
108 changes: 105 additions & 3 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/qos"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount"
Expand Down Expand Up @@ -117,6 +118,8 @@ type containerManagerImpl struct {
recorder record.EventRecorder
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for exporting and allocating devices reported by device plugins.
devicePluginHandler DevicePluginHandler
}

type features struct {
Expand Down Expand Up @@ -179,7 +182,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
Expand Down Expand Up @@ -250,7 +253,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}

return &containerManagerImpl{
cm := &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
NodeConfig: nodeConfig,
Expand All @@ -260,7 +263,31 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cgroupRoot: cgroupRoot,
recorder: recorder,
qosContainerManager: qosContainerManager,
}, nil
}

updateDeviceCapacityFunc := func(updates v1.ResourceList) {
cm.Lock()
defer cm.Unlock()
for k, v := range updates {
if v.Value() <= 0 {
delete(cm.capacity, k)
} else {
cm.capacity[k] = v
}
}
}

glog.Infof("Creating device plugin handler: %t", devicePluginEnabled)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should log level be used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a one-time log, so probably ok to stay as info.

if devicePluginEnabled {
cm.devicePluginHandler, err = NewDevicePluginHandlerImpl(updateDeviceCapacityFunc)
} else {
cm.devicePluginHandler, err = NewDevicePluginHandlerStub()
}
if err != nil {
return nil, err
}

return cm, nil
}

// NewPodContainerManager is a factory method returns a PodContainerManager object
Expand Down Expand Up @@ -543,6 +570,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc)
}
close(stopChan)
}, time.Second, stopChan)

// Starts device plugin manager.
if err := cm.devicePluginHandler.Start(); err != nil {
return err
}
return nil
}

Expand All @@ -560,6 +592,76 @@ func (cm *containerManagerImpl) setFsCapacity() error {
return nil
}

// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
opts := &kubecontainer.RunContainerOptions{}
// Gets devices, mounts, and envs from device plugin handler.
glog.V(3).Infof("Calling devicePluginHandler AllocateDevices")
// Maps to detect duplicate settings.
devsMap := make(map[string]string)
mountsMap := make(map[string]string)
envsMap := make(map[string]string)
allocResps, err := cm.devicePluginHandler.Allocate(pod, container, activePods)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, allocation of resources would happen as part of pod creation so as to fail pods during admission whose extended resource request cannot be satisfied by the kubelet due to any reason.
For example, a device may have just turned unhealthy and the scheduler may not have noticed it. Note there can be multiple entities scheduling pods on to nodes.
Node level admission control is the last line of defence against state propagation issues in k8s.
If not for allocation during pod admission, I'd at-least like to see an admission handler (we have quite a few of them), that validates that devices are available to satisfy a pod's request. Essentially a dry run of the logic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that sounds a more ideal handling, which probably will be addressed after 1.8 when we try to move the GetResources logic to PodContainerManager.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file an issue with the background information just that it doesn't get lost? You can assign the issue to yourself since you plan on fixing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #51592
Leave it unassigned for now as I don't have enough background information on this. It probably needs more discussions with sig-node.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on what @vishh said.

if err != nil {
return opts, err
}
// Loops through AllocationResponses of all required extended resources.
for _, resp := range allocResps {
// Loops through runtime spec of all devices of the given resource.
for _, devRuntime := range resp.Spec {
// Updates RunContainerOptions.Devices.
for _, dev := range devRuntime.Devices {
if d, ok := devsMap[dev.ContainerPath]; ok {
glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath)
if d != dev.HostPath {
glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s",
dev.ContainerPath, d, dev.HostPath)
}
continue
}
devsMap[dev.ContainerPath] = dev.HostPath
opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
PathOnHost: dev.HostPath,
PathInContainer: dev.ContainerPath,
Permissions: dev.Permissions,
})
}
// Updates RunContainerOptions.Mounts.
for _, mount := range devRuntime.Mounts {
if m, ok := mountsMap[mount.ContainerPath]; ok {
glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath)
if m != mount.HostPath {
glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s",
mount.ContainerPath, m, mount.HostPath)
}
continue
}
mountsMap[mount.ContainerPath] = mount.HostPath
opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
Name: mount.ContainerPath,
ContainerPath: mount.ContainerPath,
HostPath: mount.HostPath,
ReadOnly: mount.ReadOnly,
SELinuxRelabel: false,
})
}
// Updates RunContainerOptions.Envs.
for k, v := range devRuntime.Envs {
if e, ok := envsMap[k]; ok {
glog.V(3).Infof("skip existing envs %s %s", k, v)
if e != v {
glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
}
continue
}
envsMap[k] = v
opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
}
}
}
return opts, nil
}

func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
cpuLimit := int64(0)

Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cm
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"

kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

type containerManagerStub struct{}
Expand Down Expand Up @@ -66,6 +68,10 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}

func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil
}

func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}
6 changes: 5 additions & 1 deletion pkg/kubelet/cm/container_manager_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana
return &unsupportedPodContainerManager{}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add GetResources implementation here to make sure unsupportedContainerManager still implements the ContainerManager interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil
}

func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc) error {
return nil
}

func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
return &containerManagerImpl{}, nil
}
Loading