From b563101efbb09d9d63512e53672ce06eb1694d2e Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Thu, 10 Aug 2017 17:14:34 -0700 Subject: [PATCH 1/5] Added Device Plugin Manager --- pkg/kubelet/BUILD | 1 + .../apis/deviceplugin/v1alpha1/constants.go | 18 ++ pkg/kubelet/deviceplugin/BUILD | 38 ++++ pkg/kubelet/deviceplugin/endpoint.go | 196 ++++++++++++++++++ pkg/kubelet/deviceplugin/manager.go | 193 +++++++++++++++++ pkg/kubelet/deviceplugin/types.go | 74 +++++++ pkg/kubelet/deviceplugin/utils.go | 76 +++++++ 7 files changed, 596 insertions(+) create mode 100644 pkg/kubelet/deviceplugin/BUILD create mode 100644 pkg/kubelet/deviceplugin/endpoint.go create mode 100644 pkg/kubelet/deviceplugin/manager.go create mode 100644 pkg/kubelet/deviceplugin/types.go create mode 100644 pkg/kubelet/deviceplugin/utils.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 979ef5e2d22fa..257cea02d9a3a 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -253,6 +253,7 @@ filegroup( "//pkg/kubelet/configmap:all-srcs", "//pkg/kubelet/container:all-srcs", "//pkg/kubelet/custommetrics:all-srcs", + "//pkg/kubelet/deviceplugin:all-srcs", "//pkg/kubelet/dockershim:all-srcs", "//pkg/kubelet/envvars:all-srcs", "//pkg/kubelet/events:all-srcs", diff --git a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go index fb0440cce59cf..b771ec8c960dc 100644 --- a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go +++ b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go @@ -30,4 +30,22 @@ const ( DevicePluginPath = "/var/lib/kubelet/device-plugins/" // KubeletSocket is the path of the Kubelet registry socket KubeletSocket = DevicePluginPath + "kubelet.sock" + + // InvalidChars are the characters that may not appear in a Vendor or Kind field + InvalidChars = "/ " + + // ErrFailedToDialDevicePlugin is the error raised when the device plugin could not be + // reached on the registered socket + ErrFailedToDialDevicePlugin = "Failed to dial 
device plugin:" + // ErrUnsuportedVersion is the error raised when the device plugin uses an API version not + // supported by the Kubelet registry + ErrUnsuportedVersion = "Unsupported version" + // ErrDevicePluginAlreadyExists is the error raised when a device plugin with the + // same Resource Name tries to register itself + ErrDevicePluginAlreadyExists = "Another device plugin already registered this Resource Name" + // ErrInvalidResourceName is the error raised when a device plugin is registering + // itself with an invalid ResourceName + ErrInvalidResourceName = "The Resource Name is invalid" + // ErrEmptyResourceName is the error raised when the resource name field is empty + ErrEmptyResourceName = "Invalid Empty ResourceName" ) diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/deviceplugin/BUILD new file mode 100644 index 0000000000000..30e5891b2b3cf --- /dev/null +++ b/pkg/kubelet/deviceplugin/BUILD @@ -0,0 +1,38 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "endpoint.go", + "manager.go", + "types.go", + "utils.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/google.golang.org/grpc:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/deviceplugin/endpoint.go new file mode 100644 index 0000000000000..2e418c5684391 --- /dev/null +++ b/pkg/kubelet/deviceplugin/endpoint.go @@ -0,0 +1,196 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deviceplugin + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +type endpoint struct { + client pluginapi.DevicePluginClient + + socketPath string + resourceName string + + devices map[string]*pluginapi.Device + mutex sync.Mutex + + callback MonitorCallback + + cancel context.CancelFunc + ctx context.Context +} + +func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) { + client, err := dial(socketPath) + if err != nil { + return nil, err + } + + ctx, stop := context.WithCancel(context.Background()) + + return &endpoint{ + client: client, + + socketPath: socketPath, + resourceName: resourceName, + + devices: nil, + callback: callback, + + cancel: stop, + ctx: ctx, + }, nil +} + +func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { + glog.V(2).Infof("Starting ListAndWatch") + + stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{}) + if err != nil { + glog.Errorf(ErrListAndWatch, e.resourceName, err) + + return nil, err + } + + devs, err := stream.Recv() + if err != nil { + glog.Errorf(ErrListAndWatch, e.resourceName, err) + return nil, err + } + + devices := make(map[string]*pluginapi.Device) + for _, d := range devs.Devices { + devices[d.ID] = d + } + + e.mutex.Lock() + e.devices = devices + 
e.mutex.Unlock() + + return stream, nil +} + +func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) { + glog.V(2).Infof("Starting ListAndWatch") + + devices := make(map[string]*pluginapi.Device) + + e.mutex.Lock() + for _, d := range e.devices { + devices[d.ID] = CloneDevice(d) + } + e.mutex.Unlock() + + for { + response, err := stream.Recv() + if err != nil { + glog.Errorf(ErrListAndWatch, e.resourceName, err) + return + } + + devs := response.Devices + glog.V(2).Infof("State pushed for device plugin %s", e.resourceName) + + newDevs := make(map[string]*pluginapi.Device) + var added, updated []*pluginapi.Device + + for _, d := range devs { + dOld, ok := devices[d.ID] + newDevs[d.ID] = d + + if !ok { + glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d) + + devices[d.ID] = d + added = append(added, CloneDevice(d)) + + continue + } + + if d.Health == dOld.Health { + continue + } + + if d.Health == pluginapi.Unhealthy { + glog.Errorf("Device %s is now Unhealthy", d.ID) + } else if d.Health == pluginapi.Healthy { + glog.V(2).Infof("Device %s is now Healthy", d.ID) + } + + devices[d.ID] = d + updated = append(updated, CloneDevice(d)) + } + + var deleted []*pluginapi.Device + for id, d := range devices { + if _, ok := newDevs[id]; ok { + continue + } + + glog.Errorf("Device %s was deleted", d.ID) + + deleted = append(deleted, CloneDevice(d)) + delete(devices, id) + } + + e.mutex.Lock() + e.devices = devices + e.mutex.Unlock() + + e.callback(e.resourceName, added, updated, deleted) + } + +} + +func (e *endpoint) allocate(devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) { + var ids []string + for _, d := range devs { + ids = append(ids, d.ID) + } + + return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{ + DevicesIDs: ids, + }) +} + +func (e *endpoint) stop() { + e.cancel() +} + +func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { + c, err := grpc.Dial(unixSocketPath, 
grpc.WithInsecure(), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }), + ) + + if err != nil { + return nil, fmt.Errorf(pluginapi.ErrFailedToDialDevicePlugin+" %v", err) + } + + return pluginapi.NewDevicePluginClient(c), nil +} diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go new file mode 100644 index 0000000000000..f941b38c62644 --- /dev/null +++ b/pkg/kubelet/deviceplugin/manager.go @@ -0,0 +1,193 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deviceplugin + +import ( + "fmt" + "net" + "os" + "path/filepath" + + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +// NewManagerImpl creates a new manager on the socket `socketPath` and can +// rebuild state from devices and available []Device. 
+// f is the callback that is called when a plugin's device list changes (devices added, updated or deleted) +// socketPath is present for testing purposes; in production this is pluginapi.KubeletSocket +func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) { + glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) + + if socketPath == "" || !filepath.IsAbs(socketPath) { + return nil, fmt.Errorf(ErrBadSocket+" %v", socketPath) + } + + dir, file := filepath.Split(socketPath) + return &ManagerImpl{ + Endpoints: make(map[string]*endpoint), + + socketname: file, + socketdir: dir, + callback: f, + }, nil +} + +// Start starts the Device Plugin Manager +func (m *ManagerImpl) Start() error { + glog.V(2).Infof("Starting Device Plugin manager") + + socketPath := filepath.Join(m.socketdir, m.socketname) + os.MkdirAll(m.socketdir, 0755) + + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + glog.Errorf(ErrRemoveSocket+" %+v", err) + return err + } + + s, err := net.Listen("unix", socketPath) + if err != nil { + glog.Errorf(ErrListenSocket+" %+v", err) + return err + } + + m.server = grpc.NewServer([]grpc.ServerOption{}...)
+ + pluginapi.RegisterRegistrationServer(m.server, m) + go m.server.Serve(s) + + return nil +} + +// Devices is the map of devices that are known by the Device +// Plugin manager with the Kind of the devices as key +func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device { + glog.V(2).Infof("Devices called") + + m.mutex.Lock() + defer m.mutex.Unlock() + + devs := make(map[string][]*pluginapi.Device) + for k, e := range m.Endpoints { + glog.V(2).Infof("Endpoint: %+v: %+v", k, e) + e.mutex.Lock() + devs[k] = copyDevices(e.devices) + e.mutex.Unlock() + } + + return devs +} + +// Allocate is the call that you can use to allocate a set of Devices +func (m *ManagerImpl) Allocate(resourceName string, + devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) { + + m.mutex.Lock() + defer m.mutex.Unlock() + + if len(devs) == 0 { + return nil, nil + } + + glog.Infof("Recieved request for devices %v for device plugin %s", + devs, resourceName) + + e, ok := m.Endpoints[resourceName] + if !ok { + return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName) + } + + return e.allocate(devs) +} + +// Register registers a device plugin +func (m *ManagerImpl) Register(ctx context.Context, + r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) { + + glog.V(2).Infof("Got request for Device Plugin %s", r.ResourceName) + + if r.Version != pluginapi.Version { + return &pluginapi.Empty{}, + fmt.Errorf(pluginapi.ErrUnsuportedVersion) + } + + if err := IsResourceNameValid(r.ResourceName); err != nil { + return &pluginapi.Empty{}, err + } + + if _, ok := m.Endpoints[r.ResourceName]; ok { + return &pluginapi.Empty{}, + fmt.Errorf(pluginapi.ErrDevicePluginAlreadyExists) + } + + go m.addEndpoint(r) + + return &pluginapi.Empty{}, nil +} + +// Stop is the function that can stop the gRPC server +func (m *ManagerImpl) Stop() error { + for _, e := range m.Endpoints { + e.stop() + } + + m.server.Stop() + + return nil +} + +func (m *ManagerImpl) addEndpoint(r 
*pluginapi.RegisterRequest) { + socketPath := filepath.Join(m.socketdir, r.Endpoint) + + e, err := newEndpoint(socketPath, r.ResourceName, m.callback) + if err != nil { + glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) + return + } + + stream, err := e.list() + if err != nil { + glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err) + return + } + + go func() { + e.listAndWatch(stream) + + m.mutex.Lock() + e.mutex.Lock() + + delete(m.Endpoints, r.ResourceName) + glog.V(2).Infof("Unregistered endpoint %v", e) + + e.mutex.Unlock() + m.mutex.Unlock() + }() + + m.mutex.Lock() + e.mutex.Lock() + + m.Endpoints[r.ResourceName] = e + glog.V(2).Infof("Registered endpoint %v", e) + + e.mutex.Unlock() + m.mutex.Unlock() + +} diff --git a/pkg/kubelet/deviceplugin/types.go b/pkg/kubelet/deviceplugin/types.go new file mode 100644 index 0000000000000..99bc8c0771cf3 --- /dev/null +++ b/pkg/kubelet/deviceplugin/types.go @@ -0,0 +1,74 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "sync" + + "google.golang.org/grpc" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +// MonitorCallback is the function called when a device becomes +// unhealthy (or healthy again) +// Updated contains the most recent state of the Device +type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device) + +// Manager manages the Device Plugins running on a machine +type Manager interface { + // Start starts the gRPC service + Start() error + // Devices is the map of devices that have registered themselves + // against the manager. + // The map key is the ResourceName of the device plugins + Devices() map[string][]*pluginapi.Device + + // Allocate calls the gRPC Allocate on the device plugin + Allocate(string, []*pluginapi.Device) (*pluginapi.AllocateResponse, error) + + // Stop stops the manager + Stop() error +} + +// ManagerImpl is the structure in charge of managing Device Plugins +type ManagerImpl struct { + socketname string + socketdir string + + Endpoints map[string]*endpoint // Key is ResourceName + mutex sync.Mutex + + callback MonitorCallback + + server *grpc.Server +} + +const ( + // ErrDevicePluginUnknown is the error raised when the device Plugin returned by Monitor is not known by the Device Plugin manager + ErrDevicePluginUnknown = "Manager does not have device plugin for device:" + // ErrDeviceUnknown is the error raised when the device returned by Monitor is not known by the Device Plugin manager + ErrDeviceUnknown = "Could not find device in it's Device Plugin's Device List:" + // ErrBadSocket is the error raised when the registry socket path is not absolute + ErrBadSocket = "Bad socketPath, must be an absolute path:" + // ErrRemoveSocket is the error raised when the registry could not remove the existing socket + ErrRemoveSocket = "Failed to remove socket while starting device plugin registry, with error" + // ErrListenSocket is the error raised
when the registry could not listen on the socket + ErrListenSocket = "Failed to listen to socket while starting device plugin registry, with error" + // ErrListAndWatch is the error raised when ListAndWatch ended unsuccessfully + ErrListAndWatch = "ListAndWatch ended unexpectedly for device plugin %s with error %v" +) diff --git a/pkg/kubelet/deviceplugin/utils.go b/pkg/kubelet/deviceplugin/utils.go new file mode 100644 index 0000000000000..57a6b0a71c6a9 --- /dev/null +++ b/pkg/kubelet/deviceplugin/utils.go @@ -0,0 +1,76 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "fmt" + "strings" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +// CloneDevice clones a pluginapi.Device +func CloneDevice(d *pluginapi.Device) *pluginapi.Device { + return &pluginapi.Device{ + ID: d.ID, + Health: d.Health, + } + +} + +func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device { + var clones []*pluginapi.Device + + for _, d := range devs { + clones = append(clones, CloneDevice(d)) + } + + return clones +} + +// GetDevice returns the Device and a boolean signaling if the device was found or not +func GetDevice(d *pluginapi.Device, devs []*pluginapi.Device) (*pluginapi.Device, bool) { + name := DeviceKey(d) + + for _, d := range devs { + if DeviceKey(d) != name { + continue + } + + return d, true + } + + return nil, false +} + +// IsResourceNameValid returns an error if the resource name is invalid. +func IsResourceNameValid(resourceName string) error { + if resourceName == "" { + return fmt.Errorf(pluginapi.ErrEmptyResourceName) + } + + if strings.ContainsAny(resourceName, pluginapi.InvalidChars) { + return fmt.Errorf(pluginapi.ErrInvalidResourceName) + } + + return nil +} + +// DeviceKey returns the Key of a device +func DeviceKey(d *pluginapi.Device) string { + return d.ID +} From c4a1c97329269a028ba357fc55b1e77f24eba091 Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Tue, 8 Aug 2017 16:34:13 -0700 Subject: [PATCH 2/5] Device Plugin Kubelet integration --- pkg/kubelet/BUILD | 1 + pkg/kubelet/cm/BUILD | 3 ++ pkg/kubelet/cm/container_manager.go | 3 ++ pkg/kubelet/cm/container_manager_linux.go | 12 +++++ pkg/kubelet/cm/device_plugin_handler.go | 55 +++++++++++++++++++++++ pkg/kubelet/kubelet.go | 7 +++ pkg/kubelet/kubelet_node_status.go | 22 +++++++++ 7 files changed, 103 insertions(+) create mode 100644 pkg/kubelet/cm/device_plugin_handler.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 257cea02d9a3a..b3e261a43aaa0 100644 --- a/pkg/kubelet/BUILD +++
b/pkg/kubelet/BUILD @@ -43,6 +43,7 @@ go_library( "//pkg/fieldpath:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", + "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig/v1alpha1:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 0b1e48698e59a..b5389246bb81c 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -7,6 +7,7 @@ go_library( "container_manager.go", "container_manager_stub.go", "container_manager_unsupported.go", + "device_plugin_handler.go", "helpers_unsupported.go", "pod_container_manager_stub.go", "pod_container_manager_unsupported.go", @@ -27,8 +28,10 @@ go_library( }), visibility = ["//visibility:public"], deps = [ + "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", + "//pkg/kubelet/deviceplugin:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", "//pkg/util/mount:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 36c4569eb70b0..9d01c33bbd251 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" // TODO: Migrate kubelet to either use its own internal objects or client library. 
"k8s.io/api/core/v1" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -66,6 +67,8 @@ type ContainerManager interface { // UpdateQOSCgroups performs housekeeping updates to ensure that the top // level QoS containers have their desired state in a thread-safe way UpdateQOSCgroups() error + + InternalContainerLifecycle() InternalContainerLifecycle } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 66128398e3bd7..8de5b98a90a98 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -117,6 +117,8 @@ type containerManagerImpl struct { recorder record.EventRecorder // Interface for QoS cgroup management qosContainerManager QOSContainerManager + // Interface for device plugin management. + devicePluginHdler DevicePluginHandler } type features struct { @@ -820,3 +822,13 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList { defer cm.RUnlock() return cm.capacity } + +// GetDevicePluginHandler returns the DevicePluginHandler +func (m *containerManagerImpl) GetDevicePluginHandler() DevicePluginHandler { + return m.devicePluginHdler +} + +// SetDevicePluginHandler sets the DevicePluginHandler +func (m *containerManagerImpl) SetDevicePluginHandler(d DevicePluginHandler) { + m.devicePluginHdler = d +} diff --git a/pkg/kubelet/cm/device_plugin_handler.go b/pkg/kubelet/cm/device_plugin_handler.go new file mode 100644 index 0000000000000..7c30168ec278a --- /dev/null +++ b/pkg/kubelet/cm/device_plugin_handler.go @@ -0,0 +1,55 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "fmt" + + "github.com/golang/glog" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/deviceplugin" +) + +type DevicePluginHandlerImpl struct { + devicePluginManager deviceplugin.Manager +} + +// NewDevicePluginHandler create a DevicePluginHandler +func NewDevicePluginHandler() (*DevicePluginHandlerImpl, error) { + glog.V(2).Infof("Starting Device Plugin Handler") + + mgr, err := deviceplugin.NewManagerImpl(pluginapi.DevicePluginPath, + func(r string, a, u, d []*pluginapi.Device) {}) + + if err != nil { + return nil, fmt.Errorf("Failed to initialize device plugin: %+v", err) + } + + if err := mgr.Start(); err != nil { + return nil, err + } + + return &DevicePluginHandlerImpl{ + devicePluginManager: mgr, + }, nil +} + +// TODO cache this +func (h *DevicePluginHandlerImpl) Devices() map[string][]*pluginapi.Device { + return h.devicePluginManager.Devices() +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9edfabf400c23..e145df621094a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -754,6 +754,13 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, return nil, err } + devicePluginHdlr, err := cm.NewDevicePluginHandler() + if err != nil { + return nil, err + } + + klet.containerManager.SetDevicePluginHandler(devicePluginHdlr) + // If the experimentalMounterPathFlag is set, we do not want to // check node capabilities since the mount path is not the default if len(kubeCfg.ExperimentalMounterPath) != 0 { diff --git 
a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 2b1ec95831411..146f6e36de3eb 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/util" @@ -621,6 +622,27 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { } node.Status.Allocatable[k] = value } + + hdlr := kl.containerManager.GetDevicePluginHandler() + if hdlr == nil { + return + } + + for k, v := range hdlr.Devices() { + key := v1.ResourceName(v1.ResourceOpaqueIntPrefix + k) + + var n int64 + n = 0 + + for _, d := range v { + if d.Health == pluginapi.Unhealthy { + continue + } + n++ + } + + node.Status.Capacity[key] = *resource.NewQuantity(n, resource.DecimalSI) + } } // Set versioninfo for the node. 
From f7f4515e43f182b81d8452492f382a9b11d1f50e Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Tue, 15 Aug 2017 14:46:30 -0700 Subject: [PATCH 3/5] Testing --- pkg/kubelet/deviceplugin/BUILD | 16 +++ pkg/kubelet/deviceplugin/endpoint_test.go | 125 ++++++++++++++++++ pkg/kubelet/deviceplugin/manager_test.go | 58 ++++++++ .../deviceplugin/mock_device_plugin.go | 123 +++++++++++++++++ pkg/kubelet/deviceplugin/utils_test.go | 54 ++++++++ 5 files changed, 376 insertions(+) create mode 100644 pkg/kubelet/deviceplugin/endpoint_test.go create mode 100644 pkg/kubelet/deviceplugin/manager_test.go create mode 100644 pkg/kubelet/deviceplugin/mock_device_plugin.go create mode 100644 pkg/kubelet/deviceplugin/utils_test.go diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/deviceplugin/BUILD index 30e5891b2b3cf..ccf6f0ef96bc9 100644 --- a/pkg/kubelet/deviceplugin/BUILD +++ b/pkg/kubelet/deviceplugin/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -12,6 +13,7 @@ go_library( srcs = [ "endpoint.go", "manager.go", + "mock_device_plugin.go", "types.go", "utils.go", ], @@ -36,3 +38,17 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = [ + "endpoint_test.go", + "manager_test.go", + "utils_test.go", + ], + library = ":go_default_library", + deps = [ + "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + ], +) diff --git a/pkg/kubelet/deviceplugin/endpoint_test.go b/pkg/kubelet/deviceplugin/endpoint_test.go new file mode 100644 index 0000000000000..8898b3dd13340 --- /dev/null +++ b/pkg/kubelet/deviceplugin/endpoint_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deviceplugin + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +const ( + socket = "/tmp/mock.sock" +) + +func TestNewEndpoint(t *testing.T) { + devs := []*pluginapi.Device{ + {ID: "ADeviceId", Health: pluginapi.Healthy}, + } + + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + defer ecleanup(t, p, e) +} + +func TestList(t *testing.T) { + devs := []*pluginapi.Device{ + {ID: "ADeviceId", Health: pluginapi.Healthy}, + } + + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + defer ecleanup(t, p, e) + + _, err := e.list() + require.NoError(t, err) + + e.mutex.Lock() + defer e.mutex.Unlock() + + require.Len(t, e.devices, 1) + + d, ok := e.devices[devs[0].ID] + require.True(t, ok) + + require.Equal(t, d.ID, devs[0].ID) + require.Equal(t, d.Health, devs[0].Health) +} + +func TestListAndWatch(t *testing.T) { + devs := []*pluginapi.Device{ + {ID: "ADeviceId", Health: pluginapi.Healthy}, + {ID: "AnotherDeviceId", Health: pluginapi.Healthy}, + } + + updated := []*pluginapi.Device{ + {ID: "ADeviceId", Health: pluginapi.Unhealthy}, + {ID: "AThirdDeviceId", Health: pluginapi.Healthy}, + } + + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) { + require.Len(t, a, 1) + require.Len(t, u, 1) + require.Len(t, r, 1) + + require.Equal(t, a[0].ID, updated[1].ID) + + require.Equal(t, u[0].ID, updated[0].ID) + require.Equal(t, u[0].Health, updated[0].Health) + + 
require.Equal(t, r[0].ID, devs[1].ID) + }) + defer ecleanup(t, p, e) + + s, err := e.list() + require.NoError(t, err) + + go e.listAndWatch(s) + p.Update(updated) + time.Sleep(time.Second) + + e.mutex.Lock() + defer e.mutex.Unlock() + + require.Len(t, e.devices, 2) + for _, dref := range updated { + d, ok := e.devices[dref.ID] + + require.True(t, ok) + require.Equal(t, d.ID, dref.ID) + require.Equal(t, d.Health, dref.Health) + } + +} + +func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*MockDevicePlugin, *endpoint) { + p := NewMockDevicePlugin(devs, socket) + + err := p.Start() + require.NoError(t, err) + + e, err := newEndpoint(socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + require.NoError(t, err) + + return p, e +} + +func ecleanup(t *testing.T, p *MockDevicePlugin, e *endpoint) { + p.Stop() + e.stop() +} diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go new file mode 100644 index 0000000000000..8aed51007d1ae --- /dev/null +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "testing" + + "github.com/stretchr/testify/require" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +const ( + msocket = "/tmp/server.sock" +) + +func TestNewManagerImpl(t *testing.T) { + _, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {}) + require.Error(t, err) + + _, err = NewManagerImpl(msocket, func(n string, a, u, r []*pluginapi.Device) {}) + require.NoError(t, err) +} + +func TestNewManagerImplStart(t *testing.T) { + _, err := NewManagerImpl(msocket, func(n string, a, u, r []*pluginapi.Device) {}) + require.NoError(t, err) +} + +func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *MockDevicePlugin) { + m, err := NewManagerImpl(serverSocket, callback) + require.NoError(t, err) + + p := NewMockDevicePlugin(devs, pluginSocket) + err = p.Start() + require.NoError(t, err) + + return m, p +} + +func cleanup(t *testing.T, m Manager, p *MockDevicePlugin) { + p.Stop() + m.Stop() +} diff --git a/pkg/kubelet/deviceplugin/mock_device_plugin.go b/pkg/kubelet/deviceplugin/mock_device_plugin.go new file mode 100644 index 0000000000000..676536cfe027e --- /dev/null +++ b/pkg/kubelet/deviceplugin/mock_device_plugin.go @@ -0,0 +1,123 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "log" + "net" + "os" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +// MockDevicePlugin is a mock device plugin +type MockDevicePlugin struct { + devs []*pluginapi.Device + socket string + + stop chan interface{} + update chan []*pluginapi.Device + + server *grpc.Server +} + +// NewMockDevicePlugin returns an initialized MockDevicePlugin +func NewMockDevicePlugin(devs []*pluginapi.Device, socket string) *MockDevicePlugin { + return &MockDevicePlugin{ + devs: devs, + socket: socket, + + stop: make(chan interface{}), + update: make(chan []*pluginapi.Device), + } +} + +// Start starts the gRPC server of the device plugin +func (m *MockDevicePlugin) Start() error { + err := m.cleanup() + if err != nil { + return err + } + + sock, err := net.Listen("unix", m.socket) + if err != nil { + return err + } + + m.server = grpc.NewServer([]grpc.ServerOption{}...) + pluginapi.RegisterDevicePluginServer(m.server, m) + + go m.server.Serve(sock) + log.Println("Starting to serve on", m.socket) + + return nil +} + +// Stop stops the gRPC server +func (m *MockDevicePlugin) Stop() error { + m.server.Stop() + + return m.cleanup() +} + +// ListAndWatch lists devices and update that list according to the Update call +func (m *MockDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { + log.Println("ListAndWatch") + var devs []*pluginapi.Device + + for _, d := range m.devs { + devs = append(devs, &pluginapi.Device{ + ID: d.ID, + Health: pluginapi.Healthy, + }) + } + + s.Send(&pluginapi.ListAndWatchResponse{Devices: devs}) + + for { + select { + case <-m.stop: + return nil + case updated := <-m.update: + s.Send(&pluginapi.ListAndWatchResponse{Devices: updated}) + } + } +} + +// Update allows the device plugin to send new devices through ListAndWatch +func (m *MockDevicePlugin) Update(devs []*pluginapi.Device) { + 
m.update <- devs +} + +// Allocate does a mock allocation +func (m *MockDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + log.Printf("Allocate, %+v", r) + + var response pluginapi.AllocateResponse + return &response, nil +} + +func (m *MockDevicePlugin) cleanup() error { + if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} diff --git a/pkg/kubelet/deviceplugin/utils_test.go b/pkg/kubelet/deviceplugin/utils_test.go new file mode 100644 index 0000000000000..cf4a1186647b7 --- /dev/null +++ b/pkg/kubelet/deviceplugin/utils_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "testing" + + "github.com/stretchr/testify/require" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +func TestCloneDevice(t *testing.T) { + d := CloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy}) + + require.Equal(t, d.ID, "ADeviceId") + require.Equal(t, d.Health, pluginapi.Healthy) +} + +func TestCopyDevices(t *testing.T) { + d := map[string]*pluginapi.Device{ + "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, + } + + devs := copyDevices(d) + require.Len(t, devs, 1) +} + +func TestGetDevice(t *testing.T) { + devs := []*pluginapi.Device{ + {ID: "ADeviceId", Health: pluginapi.Healthy}, + } + + _, ok := GetDevice(&pluginapi.Device{ID: "AnotherDeviceId"}, devs) + require.False(t, ok) + + d, ok := GetDevice(&pluginapi.Device{ID: "ADeviceId"}, devs) + require.True(t, ok) + require.Equal(t, d, devs[0]) +} From 7a8ad491ef8e9025cd8038cd587f1f14608fab8e Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Fri, 18 Aug 2017 15:17:43 -0700 Subject: [PATCH 4/5] Alpha feature integration --- pkg/features/kube_features.go | 8 ++++++++ pkg/kubelet/deviceplugin/endpoint_test.go | 15 +++++++++++++-- pkg/kubelet/deviceplugin/manager_test.go | 14 +++++++++++--- pkg/kubelet/kubelet.go | 12 +++++++----- 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 7b5e8a8132b2c..37aee4e1ae33b 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -73,6 +73,13 @@ const ( // Works only with Docker Container Runtime. Accelerators utilfeature.Feature = "Accelerators" + // owner: @vishh + // alpha: v1.8 + // + // Enables support for Device Plugins + // Only Nvidia GPUs are supported as of v1.8. 
+ DevicePlugins utilfeature.Feature = "DevicePlugins" + // owner: @gmarek // alpha: v1.6 // @@ -155,6 +162,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta}, ExperimentalCriticalPodAnnotation: {Default: false, PreRelease: utilfeature.Alpha}, Accelerators: {Default: false, PreRelease: utilfeature.Alpha}, + DevicePlugins: {Default: false, PreRelease: utilfeature.Alpha}, TaintBasedEvictions: {Default: false, PreRelease: utilfeature.Alpha}, RotateKubeletServerCertificate: {Default: false, PreRelease: utilfeature.Alpha}, RotateKubeletClientCertificate: {Default: true, PreRelease: utilfeature.Beta}, diff --git a/pkg/kubelet/deviceplugin/endpoint_test.go b/pkg/kubelet/deviceplugin/endpoint_test.go index 8898b3dd13340..9713584949e64 100644 --- a/pkg/kubelet/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/deviceplugin/endpoint_test.go @@ -17,6 +17,8 @@ limitations under the License. 
package deviceplugin import ( + "os" + "path" "testing" "time" @@ -25,11 +27,14 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) -const ( - socket = "/tmp/mock.sock" +var ( + esocketName = "mock.sock" ) func TestNewEndpoint(t *testing.T) { + wd, _ := os.Getwd() + socket := path.Join(wd, esocketName) + devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, } @@ -39,6 +44,9 @@ func TestNewEndpoint(t *testing.T) { } func TestList(t *testing.T) { + wd, _ := os.Getwd() + socket := path.Join(wd, esocketName) + devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, } @@ -62,6 +70,9 @@ func TestList(t *testing.T) { } func TestListAndWatch(t *testing.T) { + wd, _ := os.Getwd() + socket := path.Join(wd, esocketName) + devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, {ID: "AnotherDeviceId", Health: pluginapi.Healthy}, diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go index 8aed51007d1ae..2816339887e6a 100644 --- a/pkg/kubelet/deviceplugin/manager_test.go +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -17,6 +17,8 @@ limitations under the License. 
package deviceplugin import ( + "os" + "path" "testing" "github.com/stretchr/testify/require" @@ -25,19 +27,25 @@ import ( ) const ( - msocket = "/tmp/server.sock" + msocketName = "/tmp/server.sock" ) func TestNewManagerImpl(t *testing.T) { + wd, _ := os.Getwd() + socket := path.Join(wd, msocketName) + _, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {}) require.Error(t, err) - _, err = NewManagerImpl(msocket, func(n string, a, u, r []*pluginapi.Device) {}) + _, err = NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {}) require.NoError(t, err) } func TestNewManagerImplStart(t *testing.T) { - _, err := NewManagerImpl(msocket, func(n string, a, u, r []*pluginapi.Device) {}) + wd, _ := os.Getwd() + socket := path.Join(wd, msocketName) + + _, err := NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {}) require.NoError(t, err) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e145df621094a..b2ede83cc9a00 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -754,13 +754,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, return nil, err } - devicePluginHdlr, err := cm.NewDevicePluginHandler() - if err != nil { - return nil, err - } + if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) { + devicePluginHdlr, err := cm.NewDevicePluginHandler() + if err != nil { + return nil, err + } - klet.containerManager.SetDevicePluginHandler(devicePluginHdlr) + klet.containerManager.SetDevicePluginHandler(devicePluginHdlr) + } // If the experimentalMounterPathFlag is set, we do not want to // check node capabilities since the mount path is not the default if len(kubeCfg.ExperimentalMounterPath) != 0 { From 02001af752cb6068ecab42ec5fd6a2792df38945 Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Tue, 22 Aug 2017 15:03:47 -0700 Subject: [PATCH 5/5] Kubelet side extension to support device allocation --- cmd/kubelet/app/server.go | 4 + 
pkg/features/kube_features.go | 4 +- pkg/kubelet/BUILD | 1 - .../apis/deviceplugin/v1alpha1/constants.go | 18 -- pkg/kubelet/cm/BUILD | 15 +- pkg/kubelet/cm/container_manager.go | 6 +- pkg/kubelet/cm/container_manager_linux.go | 120 +++++++-- pkg/kubelet/cm/container_manager_stub.go | 6 + .../cm/container_manager_unsupported.go | 6 +- pkg/kubelet/cm/container_manager_windows.go | 2 +- pkg/kubelet/cm/device_plugin_handler.go | 200 +++++++++++++- pkg/kubelet/cm/device_plugin_handler_stub.go | 42 +++ pkg/kubelet/cm/device_plugin_handler_test.go | 250 ++++++++++++++++++ pkg/kubelet/deviceplugin/BUILD | 4 +- ...device_plugin.go => device_plugin_stub.go} | 31 ++- pkg/kubelet/deviceplugin/endpoint.go | 54 ++-- pkg/kubelet/deviceplugin/endpoint_test.go | 16 +- pkg/kubelet/deviceplugin/manager.go | 117 ++++---- pkg/kubelet/deviceplugin/manager_test.go | 6 +- pkg/kubelet/deviceplugin/types.go | 72 +++-- pkg/kubelet/deviceplugin/utils.go | 41 +-- pkg/kubelet/deviceplugin/utils_test.go | 23 +- pkg/kubelet/kubelet.go | 9 - pkg/kubelet/kubelet_node_status.go | 31 +-- pkg/kubelet/kubelet_pods.go | 17 +- 25 files changed, 838 insertions(+), 257 deletions(-) create mode 100644 pkg/kubelet/cm/device_plugin_handler_stub.go create mode 100644 pkg/kubelet/cm/device_plugin_handler_test.go rename pkg/kubelet/deviceplugin/{mock_device_plugin.go => device_plugin_stub.go} (74%) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 6d38d26b74872..2eb641d9201e0 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -427,6 +427,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { if err != nil { return err } + + devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) + kubeDeps.ContainerManager, err = cm.NewContainerManager( kubeDeps.Mounter, kubeDeps.CAdvisorInterface, @@ -450,6 +453,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { ExperimentalQOSReserved: 
*experimentalQOSReserved, }, s.FailSwapOn, + devicePluginEnabled, kubeDeps.Recorder) if err != nil { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 37aee4e1ae33b..ac7eb3133f174 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -73,11 +73,11 @@ const ( // Works only with Docker Container Runtime. Accelerators utilfeature.Feature = "Accelerators" - // owner: @vishh + // owner: @jiayingz // alpha: v1.8 // // Enables support for Device Plugins - // Only Nvidia GPUs are supported as of v1.8. + // Only Nvidia GPUs are tested as of v1.8. DevicePlugins utilfeature.Feature = "DevicePlugins" // owner: @gmarek diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index b3e261a43aaa0..257cea02d9a3a 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -43,7 +43,6 @@ go_library( "//pkg/fieldpath:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", - "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig/v1alpha1:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", diff --git a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go index b771ec8c960dc..fb0440cce59cf 100644 --- a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go +++ b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go @@ -30,22 +30,4 @@ const ( DevicePluginPath = "/var/lib/kubelet/device-plugins/" // KubeletSocket is the path of the Kubelet registry socket KubeletSocket = DevicePluginPath + "kubelet.sock" - - // InvalidChars are the characters that may not appear in a Vendor or Kind field - InvalidChars = "/ " - - // ErrFailedToDialDevicePlugin is the error raised when the device plugin could not be - // reached on the registered socket - ErrFailedToDialDevicePlugin = "Failed to dial device plugin:" - // ErrUnsuportedVersion is the 
error raised when the device plugin uses an API version not - // supported by the Kubelet registry - ErrUnsuportedVersion = "Unsupported version" - // ErrDevicePluginAlreadyExists is the error raised when a device plugin with the - // same Resource Name tries to register itself - ErrDevicePluginAlreadyExists = "Another device plugin already registered this Resource Name" - // ErrInvalidResourceName is the error raised when a device plugin is registering - // itself with an invalid ResourceName - ErrInvalidResourceName = "The Resource Name is invalid" - // ErrEmptyResourceName is the error raised when the resource name field is empty - ErrEmptyResourceName = "Invalid Empty ResourceName" ) diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index b5389246bb81c..6578b3541cb38 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -8,6 +8,7 @@ go_library( "container_manager_stub.go", "container_manager_unsupported.go", "device_plugin_handler.go", + "device_plugin_handler_stub.go", "helpers_unsupported.go", "pod_container_manager_stub.go", "pod_container_manager_unsupported.go", @@ -31,11 +32,13 @@ go_library( "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", + "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/deviceplugin:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", "//pkg/util/mount:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", @@ -57,7 +60,6 @@ go_library( "//vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs:go_default_library", 
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups/systemd:go_default_library", "//vendor/github.com/opencontainers/runc/libcontainer/configs:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], @@ -69,6 +71,7 @@ go_test( name = "go_default_test", srcs = [ "container_manager_unsupported_test.go", + "device_plugin_handler_test.go", ] + select({ "@io_bazel_rules_go//go/platform:linux_amd64": [ "cgroup_manager_linux_test.go", @@ -81,15 +84,19 @@ go_test( }), library = ":go_default_library", deps = [ + "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/util/mount:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:linux_amd64": [ "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", ], "//conditions:default": [], }), diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 9d01c33bbd251..ed5312e2e7379 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -20,8 +20,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" // TODO: Migrate kubelet to either use its own internal objects or client 
library. "k8s.io/api/core/v1" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "fmt" @@ -68,7 +68,9 @@ type ContainerManager interface { // level QoS containers have their desired state in a thread-safe way UpdateQOSCgroups() error - InternalContainerLifecycle() InternalContainerLifecycle + // Returns RunContainerOptions with devices, mounts, and env fields populated for + // extended resources required by container. + GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 8de5b98a90a98..f3ff471bdd143 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -41,6 +41,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/cadvisor" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/qos" utilfile "k8s.io/kubernetes/pkg/util/file" "k8s.io/kubernetes/pkg/util/mount" @@ -117,8 +118,8 @@ type containerManagerImpl struct { recorder record.EventRecorder // Interface for QoS cgroup management qosContainerManager QOSContainerManager - // Interface for device plugin management. - devicePluginHdler DevicePluginHandler + // Interface for exporting and allocating devices reported by device plugins. + devicePluginHandler DevicePluginHandler } type features struct { @@ -181,7 +182,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) { // TODO(vmarmol): Add limits to the system containers. // Takes the absolute name of the specified containers. // Empty container name disables use of the specified container. 
-func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { subsystems, err := GetCgroupSubsystems() if err != nil { return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) @@ -252,7 +253,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I return nil, err } - return &containerManagerImpl{ + cm := &containerManagerImpl{ cadvisorInterface: cadvisorInterface, mountUtil: mountUtil, NodeConfig: nodeConfig, @@ -262,7 +263,31 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I cgroupRoot: cgroupRoot, recorder: recorder, qosContainerManager: qosContainerManager, - }, nil + } + + updateDeviceCapacityFunc := func(updates v1.ResourceList) { + cm.Lock() + defer cm.Unlock() + for k, v := range updates { + if v.Value() <= 0 { + delete(cm.capacity, k) + } else { + cm.capacity[k] = v + } + } + } + + glog.Infof("Creating device plugin handler: %t", devicePluginEnabled) + if devicePluginEnabled { + cm.devicePluginHandler, err = NewDevicePluginHandlerImpl(updateDeviceCapacityFunc) + } else { + cm.devicePluginHandler, err = NewDevicePluginHandlerStub() + } + if err != nil { + return nil, err + } + + return cm, nil } // NewPodContainerManager is a factory method returns a PodContainerManager object @@ -545,6 +570,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc) } close(stopChan) }, time.Second, stopChan) + + // Starts device plugin manager. 
+ if err := cm.devicePluginHandler.Start(); err != nil { + return err + } return nil } @@ -562,6 +592,76 @@ func (cm *containerManagerImpl) setFsCapacity() error { return nil } +// TODO: move the GetResources logic to PodContainerManager. +func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { + opts := &kubecontainer.RunContainerOptions{} + // Gets devices, mounts, and envs from device plugin handler. + glog.V(3).Infof("Calling devicePluginHandler AllocateDevices") + // Maps to detect duplicate settings. + devsMap := make(map[string]string) + mountsMap := make(map[string]string) + envsMap := make(map[string]string) + allocResps, err := cm.devicePluginHandler.Allocate(pod, container, activePods) + if err != nil { + return opts, err + } + // Loops through AllocationResponses of all required extended resources. + for _, resp := range allocResps { + // Loops through runtime spec of all devices of the given resource. + for _, devRuntime := range resp.Spec { + // Updates RunContainerOptions.Devices. + for _, dev := range devRuntime.Devices { + if d, ok := devsMap[dev.ContainerPath]; ok { + glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath) + if d != dev.HostPath { + glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s", + dev.ContainerPath, d, dev.HostPath) + } + continue + } + devsMap[dev.ContainerPath] = dev.HostPath + opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{ + PathOnHost: dev.HostPath, + PathInContainer: dev.ContainerPath, + Permissions: dev.Permissions, + }) + } + // Updates RunContainerOptions.Mounts. 
+ for _, mount := range devRuntime.Mounts { + if m, ok := mountsMap[mount.ContainerPath]; ok { + glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath) + if m != mount.HostPath { + glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s", + mount.ContainerPath, m, mount.HostPath) + } + continue + } + mountsMap[mount.ContainerPath] = mount.HostPath + opts.Mounts = append(opts.Mounts, kubecontainer.Mount{ + Name: mount.ContainerPath, + ContainerPath: mount.ContainerPath, + HostPath: mount.HostPath, + ReadOnly: mount.ReadOnly, + SELinuxRelabel: false, + }) + } + // Updates RunContainerOptions.Envs. + for k, v := range devRuntime.Envs { + if e, ok := envsMap[k]; ok { + glog.V(3).Infof("skip existing envs %s %s", k, v) + if e != v { + glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v) + } + continue + } + envsMap[k] = v + opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v}) + } + } + } + return opts, nil +} + func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { cpuLimit := int64(0) @@ -822,13 +922,3 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList { defer cm.RUnlock() return cm.capacity } - -// GetDevicePluginHandler returns the DevicePluginHandler -func (m *containerManagerImpl) GetDevicePluginHandler() DevicePluginHandler { - return m.devicePluginHdler -} - -// SetDevicePluginHandler sets the DevicePluginHandler -func (m *containerManagerImpl) SetDevicePluginHandler(d DevicePluginHandler) { - m.devicePluginHdler = d -} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 941913aceee16..203b682a1bcc4 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -19,6 +19,8 @@ package cm import ( "github.com/golang/glog" "k8s.io/api/core/v1" + + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) type containerManagerStub struct{} @@ -66,6 
+68,10 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager { return &podContainerManagerStub{} } +func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { + return &kubecontainer.RunContainerOptions{}, nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{} } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index a9984b9e25d0f..9232d031fa836 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -72,6 +72,10 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana return &unsupportedPodContainerManager{} } -func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) { + return &kubecontainer.RunContainerOptions{}, nil +} + +func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { return &unsupportedContainerManager{}, nil } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 1c431f69128a9..e6fac323a321a 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -38,6 +38,6 @@ func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc) error { return nil } -func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) { +func NewContainerManager(mountUtil 
mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { return &containerManagerImpl{}, nil } diff --git a/pkg/kubelet/cm/device_plugin_handler.go b/pkg/kubelet/cm/device_plugin_handler.go index 7c30168ec278a..8360fad9d1778 100644 --- a/pkg/kubelet/cm/device_plugin_handler.go +++ b/pkg/kubelet/cm/device_plugin_handler.go @@ -18,38 +18,214 @@ package cm import ( "fmt" + "sync" "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/deviceplugin" ) +// podDevices represents a list of pod to device Id mappings. +type containerDevices map[string]sets.String +type podDevices map[string]containerDevices + +func (pdev podDevices) pods() sets.String { + ret := sets.NewString() + for k := range pdev { + ret.Insert(k) + } + return ret +} + +func (pdev podDevices) insert(podUID, contName string, device string) { + if _, exists := pdev[podUID]; !exists { + pdev[podUID] = make(containerDevices) + } + if _, exists := pdev[podUID][contName]; !exists { + pdev[podUID][contName] = sets.NewString() + } + pdev[podUID][contName].Insert(device) +} + +func (pdev podDevices) getDevices(podUID, contName string) sets.String { + containers, exists := pdev[podUID] + if !exists { + return nil + } + devices, exists := containers[contName] + if !exists { + return nil + } + return devices +} + +func (pdev podDevices) delete(pods []string) { + for _, uid := range pods { + delete(pdev, uid) + } +} + +func (pdev podDevices) devices() sets.String { + ret := sets.NewString() + for _, containerDevices := range pdev { + for _, deviceSet := range containerDevices { + ret = ret.Union(deviceSet) + } + } + return ret +} + +type DevicePluginHandler interface { + // Start starts device plugin registration service. 
+ Start() error + // Devices returns all of registered devices keyed by resourceName. + Devices() map[string][]*pluginapi.Device + // Allocate attempts to allocate all of required extended resources for + // the input container, issues an Allocate rpc request for each of such + // resources, and returns their AllocateResponses on success. + Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) +} + type DevicePluginHandlerImpl struct { + sync.Mutex devicePluginManager deviceplugin.Manager + // devicePluginManagerMonitorCallback is used for testing only. + devicePluginManagerMonitorCallback deviceplugin.MonitorCallback + // allDevices contains all of registered resourceNames and their exported device IDs. + allDevices map[string]sets.String + // allocatedDevices contains pod to allocated device mapping, keyed by resourceName. + allocatedDevices map[string]podDevices } // NewDevicePluginHandler create a DevicePluginHandler -func NewDevicePluginHandler() (*DevicePluginHandlerImpl, error) { - glog.V(2).Infof("Starting Device Plugin Handler") +// updateCapacityFunc is called to update ContainerManager capacity when +// device capacity changes. +func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*DevicePluginHandlerImpl, error) { + glog.V(2).Infof("Creating Device Plugin Handler") + handler := &DevicePluginHandlerImpl{ + allDevices: make(map[string]sets.String), + allocatedDevices: devicesInUse(), + } - mgr, err := deviceplugin.NewManagerImpl(pluginapi.DevicePluginPath, - func(r string, a, u, d []*pluginapi.Device) {}) + deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) { + var capacity = v1.ResourceList{} + kept := append(updated, added...) + if _, ok := handler.allDevices[resourceName]; !ok { + handler.allDevices[resourceName] = sets.NewString() + } + // For now, DevicePluginHandler only keeps track of healthy devices. 
+ // We can revisit this later when the need comes to track unhealthy devices here. + for _, dev := range kept { + if dev.Health == pluginapi.Healthy { + handler.allDevices[resourceName].Insert(dev.ID) + } else { + handler.allDevices[resourceName].Delete(dev.ID) + } + } + for _, dev := range deleted { + handler.allDevices[resourceName].Delete(dev.ID) + } + capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI) + updateCapacityFunc(capacity) + } + mgr, err := deviceplugin.NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback) if err != nil { - return nil, fmt.Errorf("Failed to initialize device plugin: %+v", err) + return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err) } - if err := mgr.Start(); err != nil { - return nil, err - } + handler.devicePluginManager = mgr + handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback + return handler, nil +} - return &DevicePluginHandlerImpl{ - devicePluginManager: mgr, - }, nil +func (h *DevicePluginHandlerImpl) Start() error { + return h.devicePluginManager.Start() } -// TODO cache this func (h *DevicePluginHandlerImpl) Devices() map[string][]*pluginapi.Device { return h.devicePluginManager.Devices() } + +func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) { + var ret []*pluginapi.AllocateResponse + h.updateAllocatedDevices(activePods) + for k, v := range container.Resources.Limits { + resource := string(k) + needed := int(v.Value()) + glog.V(3).Infof("needs %d %s", needed, resource) + if !deviceplugin.IsDeviceName(k) || needed == 0 { + continue + } + h.Lock() + // Gets list of devices that have already been allocated. + // This can happen if a container restarts for example. 
+ if h.allocatedDevices[resource] == nil { + h.allocatedDevices[resource] = make(podDevices) + } + devices := h.allocatedDevices[resource].getDevices(string(pod.UID), container.Name) + if devices != nil { + glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, container.Name, pod.UID, devices.List()) + needed = needed - devices.Len() + } + // Get Devices in use. + devicesInUse := h.allocatedDevices[resource].devices() + // Get a list of available devices. + available := h.allDevices[resource].Difference(devicesInUse) + if int(available.Len()) < needed { + h.Unlock() + return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len()) + } + allocated := available.UnsortedList()[:needed] + for _, device := range allocated { + // Update internal allocated device cache. + h.allocatedDevices[resource].insert(string(pod.UID), container.Name, device) + } + h.Unlock() + // devicePluginManager.Allocate involves RPC calls to device plugin, which + // could be heavy-weight. Therefore we want to perform this operation outside + // mutex lock. Note if Allocate call fails, we may leave container resources + // partially allocated for the failed container. We rely on updateAllocatedDevices() + // to garbage collect these resources later. Another side effect is that if + // we have X resource A and Y resource B in total, and two containers, container1 + // and container2 both require X resource A and Y resource B. Both allocation + // requests may fail if we serve them in mixed order. + // TODO: may revisit this part later if we see inefficient resource allocation + // in real use as the result of this. 
+ resp, err := h.devicePluginManager.Allocate(resource, append(devices.UnsortedList(), allocated...)) + if err != nil { + return nil, err + } + ret = append(ret, resp) + } + return ret, nil +} + +// devicesInUse returns a list of custom devices in use along with the +// respective pods that are using them. +func devicesInUse() map[string]podDevices { + // TODO: gets the initial state from checkpointing. + return make(map[string]podDevices) +} + +// updateAllocatedDevices updates the list of devices in use. +// It gets a list of active pods and then frees any devices that are bound to +// pods that are no longer in the active list. +func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) { + h.Lock() + defer h.Unlock() + activePodUids := sets.NewString() + for _, pod := range activePods { + activePodUids.Insert(string(pod.UID)) + } + for _, podDevs := range h.allocatedDevices { + allocatedPodUids := podDevs.pods() + podsToBeRemoved := allocatedPodUids.Difference(activePodUids) + glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) + podDevs.delete(podsToBeRemoved.List()) + } +} diff --git a/pkg/kubelet/cm/device_plugin_handler_stub.go b/pkg/kubelet/cm/device_plugin_handler_stub.go new file mode 100644 index 0000000000000..a70c281086caa --- /dev/null +++ b/pkg/kubelet/cm/device_plugin_handler_stub.go @@ -0,0 +1,42 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cm + +import ( + "k8s.io/api/core/v1" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +// A simple stub implementation for DevicePluginHandler. +type DevicePluginHandlerStub struct{} + +func NewDevicePluginHandlerStub() (*DevicePluginHandlerStub, error) { + return &DevicePluginHandlerStub{}, nil +} + +func (h *DevicePluginHandlerStub) Start() error { + return nil +} + +func (h *DevicePluginHandlerStub) Devices() map[string][]*pluginapi.Device { + return make(map[string][]*pluginapi.Device) +} + +func (h *DevicePluginHandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) { + var ret []*pluginapi.AllocateResponse + return ret, nil +} diff --git a/pkg/kubelet/cm/device_plugin_handler_test.go b/pkg/kubelet/cm/device_plugin_handler_test.go new file mode 100644 index 0000000000000..e178cd9c7c89c --- /dev/null +++ b/pkg/kubelet/cm/device_plugin_handler_test.go @@ -0,0 +1,250 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cm + +import ( + "flag" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" +) + +func TestUpdateCapacity(t *testing.T) { + var expected = v1.ResourceList{} + as := assert.New(t) + verifyCapacityFunc := func(updates v1.ResourceList) { + as.Equal(expected, updates) + } + testDevicePluginHandler, err := NewDevicePluginHandlerImpl(verifyCapacityFunc) + as.NotNil(testDevicePluginHandler) + as.Nil(err) + + devs := []*pluginapi.Device{ + {ID: "Device1", Health: pluginapi.Healthy}, + {ID: "Device2", Health: pluginapi.Healthy}, + {ID: "Device3", Health: pluginapi.Unhealthy}, + } + + resourceName := "resource1" + // Adds three devices for resource1, two healthy and one unhealthy. + // Expects capacity for resource1 to be 2. + expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []*pluginapi.Device{}, []*pluginapi.Device{}) + // Deletes an unhealthy device should NOT change capacity. + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, []*pluginapi.Device{devs[2]}) + // Updates a healthy device to unhealthy should reduce capacity by 1. + expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) + // Deletes a healthy device should reduce capacity by 1. + expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI) + // Tests adding another resource. 
+ delete(expected, v1.ResourceName(resourceName)) + resourceName2 := "resource2" + expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []*pluginapi.Device{}, []*pluginapi.Device{}) +} + +type stringPairType struct { + value1 string + value2 string +} + +// DevicePluginManager stub to test device Allocation behavior. +type DevicePluginManagerTestStub struct { + // All data structs are keyed by resourceName+DevId + devRuntimeDevices map[string][]stringPairType + devRuntimeMounts map[string][]stringPairType + devRuntimeEnvs map[string][]stringPairType +} + +func NewDevicePluginManagerTestStub() (*DevicePluginManagerTestStub, error) { + return &DevicePluginManagerTestStub{ + devRuntimeDevices: make(map[string][]stringPairType), + devRuntimeMounts: make(map[string][]stringPairType), + devRuntimeEnvs: make(map[string][]stringPairType), + }, nil +} + +func (m *DevicePluginManagerTestStub) Start() error { + return nil +} + +func (m *DevicePluginManagerTestStub) Devices() map[string][]*pluginapi.Device { + return make(map[string][]*pluginapi.Device) +} + +func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) { + resp := new(pluginapi.AllocateResponse) + for _, id := range devIds { + key := resourceName + id + fmt.Printf("Alloc device %q for resource %q\n", id, resourceName) + devRuntime := new(pluginapi.DeviceRuntimeSpec) + for _, dev := range m.devRuntimeDevices[key] { + devRuntime.Devices = append(devRuntime.Devices, &pluginapi.DeviceSpec{ + ContainerPath: dev.value1, + HostPath: dev.value2, + Permissions: "mrw", + }) + } + for _, mount := range m.devRuntimeMounts[key] { + fmt.Printf("Add mount %q %q\n", mount.value1, mount.value2) + devRuntime.Mounts = append(devRuntime.Mounts, &pluginapi.Mount{ + ContainerPath: mount.value1, + HostPath: mount.value2, + ReadOnly: true, + }) + } + 
devRuntime.Envs = make(map[string]string) + for _, env := range m.devRuntimeEnvs[key] { + devRuntime.Envs[env.value1] = env.value2 + } + resp.Spec = append(resp.Spec, devRuntime) + } + return resp, nil +} + +func (m *DevicePluginManagerTestStub) Stop() error { + return nil +} + +func TestPodContainerDeviceAllocation(t *testing.T) { + flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) + var logLevel string + flag.StringVar(&logLevel, "logLevel", "4", "test") + flag.Lookup("v").Value.Set(logLevel) + + var activePods []*v1.Pod + resourceName1 := "domain1.com/resource1" + resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI) + devId1 := "dev1" + devId2 := "dev2" + resourceName2 := "domain2.com/resource2" + resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI) + devId3 := "dev3" + devId4 := "dev4" + + m, err := NewDevicePluginManagerTestStub() + as := assert.New(t) + as.Nil(err) + monitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {} + + testDevicePluginHandler := &DevicePluginHandlerImpl{ + devicePluginManager: m, + devicePluginManagerMonitorCallback: monitorCallback, + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]podDevices), + } + testDevicePluginHandler.allDevices[resourceName1] = sets.NewString() + testDevicePluginHandler.allDevices[resourceName1].Insert(devId1) + testDevicePluginHandler.allDevices[resourceName1].Insert(devId2) + testDevicePluginHandler.allDevices[resourceName2] = sets.NewString() + testDevicePluginHandler.allDevices[resourceName2].Insert(devId3) + testDevicePluginHandler.allDevices[resourceName2].Insert(devId4) + + m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/aaa", "/dev/aaa"}) + m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/bbb", "/dev/bbb"}) + m.devRuntimeDevices[resourceName1+devId2] = 
append(m.devRuntimeDevices[resourceName1+devId2], stringPairType{"/dev/ccc", "/dev/ccc"}) + m.devRuntimeMounts[resourceName1+devId1] = append(m.devRuntimeMounts[resourceName1+devId1], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) + m.devRuntimeMounts[resourceName1+devId2] = append(m.devRuntimeMounts[resourceName1+devId2], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) + m.devRuntimeEnvs[resourceName1+devId2] = append(m.devRuntimeEnvs[resourceName1+devId2], stringPairType{"key1", "val1"}) + m.devRuntimeEnvs[resourceName2+devId3] = append(m.devRuntimeEnvs[resourceName2+devId3], stringPairType{"key2", "val2"}) + m.devRuntimeEnvs[resourceName2+devId4] = append(m.devRuntimeEnvs[resourceName2+devId4], stringPairType{"key2", "val2"}) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity1, + v1.ResourceName("cpu"): resourceQuantity1, + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + }, + }, + } + + cm := &containerManagerImpl{ + devicePluginHandler: testDevicePluginHandler, + } + activePods = append(activePods, pod) + runContainerOpts, err := cm.GetResources(pod, &pod.Spec.Containers[0], activePods) + as.Equal(len(runContainerOpts.Devices), 3) + // Two devices require to mount the same path. Expects a single mount entry to be created. + as.Equal(len(runContainerOpts.Mounts), 1) + as.Equal(runContainerOpts.Mounts[0].ContainerPath, "/container_dir1/file1") + as.Equal(len(runContainerOpts.Envs), 2) + + // Requesting to create a pod without enough resources should fail. 
+ failPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity1, + }, + }, + }, + }, + }, + } + runContainerOpts2, err := cm.GetResources(failPod, &failPod.Spec.Containers[0], activePods) + as.NotNil(err) + as.Equal(len(runContainerOpts2.Devices), 0) + as.Equal(len(runContainerOpts2.Mounts), 0) + as.Equal(len(runContainerOpts2.Envs), 0) + + // Requesting to create a new pod with a single resourceName2 should succeed. + newPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + }, + }, + } + runContainerOpts3, err := cm.GetResources(newPod, &newPod.Spec.Containers[0], activePods) + as.Nil(err) + as.Equal(len(runContainerOpts3.Envs), 1) +} diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/deviceplugin/BUILD index ccf6f0ef96bc9..db841b8a990c3 100644 --- a/pkg/kubelet/deviceplugin/BUILD +++ b/pkg/kubelet/deviceplugin/BUILD @@ -11,18 +11,20 @@ load( go_library( name = "go_default_library", srcs = [ + "device_plugin_stub.go", "endpoint.go", "manager.go", - "mock_device_plugin.go", "types.go", "utils.go", ], tags = ["automanaged"], deps = [ + "//pkg/api/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", ], ) diff --git a/pkg/kubelet/deviceplugin/mock_device_plugin.go b/pkg/kubelet/deviceplugin/device_plugin_stub.go similarity index 74% rename from 
pkg/kubelet/deviceplugin/mock_device_plugin.go rename to pkg/kubelet/deviceplugin/device_plugin_stub.go index 676536cfe027e..2bd3ff4c20027 100644 --- a/pkg/kubelet/deviceplugin/mock_device_plugin.go +++ b/pkg/kubelet/deviceplugin/device_plugin_stub.go @@ -20,6 +20,7 @@ import ( "log" "net" "os" + "time" "golang.org/x/net/context" "google.golang.org/grpc" @@ -27,8 +28,8 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) -// MockDevicePlugin is a mock device plugin -type MockDevicePlugin struct { +// Stub implementation for DevicePlugin. +type Stub struct { devs []*pluginapi.Device socket string @@ -38,9 +39,9 @@ type MockDevicePlugin struct { server *grpc.Server } -// NewMockDevicePlugin returns an initialized MockDevicePlugin -func NewMockDevicePlugin(devs []*pluginapi.Device, socket string) *MockDevicePlugin { - return &MockDevicePlugin{ +// NewDevicePluginStub returns an initialized DevicePlugin Stub. +func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub { + return &Stub{ devs: devs, socket: socket, @@ -50,7 +51,7 @@ func NewMockDevicePlugin(devs []*pluginapi.Device, socket string) *MockDevicePlu } // Start starts the gRPC server of the device plugin -func (m *MockDevicePlugin) Start() error { +func (m *Stub) Start() error { err := m.cleanup() if err != nil { return err @@ -65,20 +66,28 @@ func (m *MockDevicePlugin) Start() error { pluginapi.RegisterDevicePluginServer(m.server, m) go m.server.Serve(sock) + // Wait till grpc server is ready. 
+ for i := 0; i < 10; i++ { + services := m.server.GetServiceInfo() + if len(services) > 0 { + break + } + time.Sleep(1 * time.Second) + } log.Println("Starting to serve on", m.socket) return nil } // Stop stops the gRPC server -func (m *MockDevicePlugin) Stop() error { +func (m *Stub) Stop() error { m.server.Stop() return m.cleanup() } // ListAndWatch lists devices and update that list according to the Update call -func (m *MockDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { +func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { log.Println("ListAndWatch") var devs []*pluginapi.Device @@ -102,19 +111,19 @@ func (m *MockDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePl } // Update allows the device plugin to send new devices through ListAndWatch -func (m *MockDevicePlugin) Update(devs []*pluginapi.Device) { +func (m *Stub) Update(devs []*pluginapi.Device) { m.update <- devs } // Allocate does a mock allocation -func (m *MockDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { +func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { log.Printf("Allocate, %+v", r) var response pluginapi.AllocateResponse return &response, nil } -func (m *MockDevicePlugin) cleanup() error { +func (m *Stub) cleanup() error { if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { return err } diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/deviceplugin/endpoint.go index 2e418c5684391..f0523471a38d4 100644 --- a/pkg/kubelet/deviceplugin/endpoint.go +++ b/pkg/kubelet/deviceplugin/endpoint.go @@ -29,6 +29,9 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) +// endpoint maps to a single registered device plugin. 
It is responsible +// for managing gRPC communications with the device plugin and caching +// device states reported by the device plugin. type endpoint struct { client pluginapi.DevicePluginClient @@ -44,9 +47,11 @@ type endpoint struct { ctx context.Context } +// newEndpoint creates a new endpoint for the given resourceName. func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) { client, err := dial(socketPath) if err != nil { + glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) return nil, err } @@ -66,49 +71,65 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en }, nil } -func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { - glog.V(2).Infof("Starting ListAndWatch") +func (e *endpoint) getDevices() []*pluginapi.Device { + e.mutex.Lock() + defer e.mutex.Unlock() + return copyDevices(e.devices) +} +// list initializes ListAndWatch gRPC call for the device plugin and gets the +// initial list of the devices. Returns ListAndWatch gRPC stream on success. +func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { + glog.V(3).Infof("Starting List") stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{}) if err != nil { - glog.Errorf(ErrListAndWatch, e.resourceName, err) + glog.Errorf(errListAndWatch, e.resourceName, err) return nil, err } devs, err := stream.Recv() if err != nil { - glog.Errorf(ErrListAndWatch, e.resourceName, err) + glog.Errorf(errListAndWatch, e.resourceName, err) return nil, err } devices := make(map[string]*pluginapi.Device) + var added, updated, deleted []*pluginapi.Device for _, d := range devs.Devices { devices[d.ID] = d + added = append(added, cloneDevice(d)) } e.mutex.Lock() e.devices = devices e.mutex.Unlock() + e.callback(e.resourceName, added, updated, deleted) + return stream, nil } +// listAndWatch blocks on receiving ListAndWatch gRPC stream updates. 
Each ListAndWatch +// stream update contains a new list of device states. listAndWatch compares the new +// device states with its cached states to get list of new, updated, and deleted devices. +// It then issues a callback to pass this information to the device_plugin_handler which +// will adjust the resource available information accordingly. func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) { - glog.V(2).Infof("Starting ListAndWatch") + glog.V(3).Infof("Starting ListAndWatch") devices := make(map[string]*pluginapi.Device) e.mutex.Lock() for _, d := range e.devices { - devices[d.ID] = CloneDevice(d) + devices[d.ID] = cloneDevice(d) } e.mutex.Unlock() for { response, err := stream.Recv() if err != nil { - glog.Errorf(ErrListAndWatch, e.resourceName, err) + glog.Errorf(errListAndWatch, e.resourceName, err) return } @@ -126,7 +147,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d) devices[d.ID] = d - added = append(added, CloneDevice(d)) + added = append(added, cloneDevice(d)) continue } @@ -142,7 +163,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient } devices[d.ID] = d - updated = append(updated, CloneDevice(d)) + updated = append(updated, cloneDevice(d)) } var deleted []*pluginapi.Device @@ -153,7 +174,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient glog.Errorf("Device %s was deleted", d.ID) - deleted = append(deleted, CloneDevice(d)) + deleted = append(deleted, cloneDevice(d)) delete(devices, id) } @@ -166,14 +187,10 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient } -func (e *endpoint) allocate(devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) { - var ids []string - for _, d := range devs { - ids = append(ids, d.ID) - } - +// allocate issues Allocate gRPC call to the device plugin. 
+func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) { return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{ - DevicesIDs: ids, + DevicesIDs: devs, }) } @@ -181,6 +198,7 @@ func (e *endpoint) stop() { e.cancel() } +// dial establishes the gRPC communication with the registered device plugin. func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { @@ -189,7 +207,7 @@ func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { ) if err != nil { - return nil, fmt.Errorf(pluginapi.ErrFailedToDialDevicePlugin+" %v", err) + return nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err) } return pluginapi.NewDevicePluginClient(c), nil diff --git a/pkg/kubelet/deviceplugin/endpoint_test.go b/pkg/kubelet/deviceplugin/endpoint_test.go index 9713584949e64..a1786711de9b4 100644 --- a/pkg/kubelet/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/deviceplugin/endpoint_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package deviceplugin import ( - "os" "path" "testing" "time" @@ -32,8 +31,7 @@ var ( ) func TestNewEndpoint(t *testing.T) { - wd, _ := os.Getwd() - socket := path.Join(wd, esocketName) + socket := path.Join("/tmp", esocketName) devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, @@ -44,8 +42,7 @@ func TestNewEndpoint(t *testing.T) { } func TestList(t *testing.T) { - wd, _ := os.Getwd() - socket := path.Join(wd, esocketName) + socket := path.Join("/tmp", esocketName) devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, @@ -70,8 +67,7 @@ func TestList(t *testing.T) { } func TestListAndWatch(t *testing.T) { - wd, _ := os.Getwd() - socket := path.Join(wd, esocketName) + socket := path.Join("/tmp", esocketName) devs := []*pluginapi.Device{ {ID: "ADeviceId", Health: pluginapi.Healthy}, @@ -118,8 +114,8 @@ func TestListAndWatch(t *testing.T) { } -func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*MockDevicePlugin, *endpoint) { - p := NewMockDevicePlugin(devs, socket) +func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) { + p := NewDevicePluginStub(devs, socket) err := p.Start() require.NoError(t, err) @@ -130,7 +126,7 @@ func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, return p, e } -func ecleanup(t *testing.T, p *MockDevicePlugin, e *endpoint) { +func ecleanup(t *testing.T, p *Stub, e *endpoint) { p.Stop() e.stop() } diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go index f941b38c62644..041d538422126 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/deviceplugin/manager.go @@ -21,6 +21,7 @@ import ( "net" "os" "path/filepath" + "sync" "github.com/golang/glog" "golang.org/x/net/context" @@ -29,15 +30,27 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) -// NewManagerImpl creates a 
new manager on the socket `socketPath` and can -// rebuild state from devices and available []Device. -// f is the callback that is called when a device becomes unhealthy +// ManagerImpl is the structure in charge of managing Device Plugins. +type ManagerImpl struct { + socketname string + socketdir string + + Endpoints map[string]*endpoint // Key is ResourceName + mutex sync.Mutex + + callback MonitorCallback + + server *grpc.Server +} + +// NewManagerImpl creates a new manager on the socket `socketPath`. +// f is the callback that is called when a device becomes unhealthy. // socketPath is present for testing purposes in production this is pluginapi.KubeletSocket func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) { glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) if socketPath == "" || !filepath.IsAbs(socketPath) { - return nil, fmt.Errorf(ErrBadSocket+" %v", socketPath) + return nil, fmt.Errorf(errBadSocket+" %v", socketPath) } dir, file := filepath.Split(socketPath) @@ -50,6 +63,26 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) }, nil } +func removeContents(dir string) error { + d, err := os.Open(dir) + if err != nil { + return err + } + defer d.Close() + names, err := d.Readdirnames(-1) + if err != nil { + return err + } + for _, name := range names { + // TODO: skip checkpoint file and check for file type. + err = os.RemoveAll(filepath.Join(dir, name)) + if err != nil { + return err + } + } + return nil +} + // Start starts the Device Plugin Manager func (m *ManagerImpl) Start() error { glog.V(2).Infof("Starting Device Plugin manager") @@ -57,14 +90,20 @@ func (m *ManagerImpl) Start() error { socketPath := filepath.Join(m.socketdir, m.socketname) os.MkdirAll(m.socketdir, 0755) + // Removes all stale sockets in m.socketdir. Device plugins can monitor + // this and use it as a signal to re-register with the new Kubelet. 
+ if err := removeContents(m.socketdir); err != nil { + glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err) + } + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { - glog.Errorf(ErrRemoveSocket+" %+v", err) + glog.Errorf(errRemoveSocket+" %+v", err) return err } s, err := net.Listen("unix", socketPath) if err != nil { - glog.Errorf(ErrListenSocket+" %+v", err) + glog.Errorf(errListenSocket+" %+v", err) return err } @@ -77,39 +116,33 @@ func (m *ManagerImpl) Start() error { } // Devices is the map of devices that are known by the Device -// Plugin manager with the Kind of the devices as key +// Plugin manager with the kind of the devices as key func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device { - glog.V(2).Infof("Devices called") - m.mutex.Lock() defer m.mutex.Unlock() devs := make(map[string][]*pluginapi.Device) for k, e := range m.Endpoints { - glog.V(2).Infof("Endpoint: %+v: %+v", k, e) - e.mutex.Lock() - devs[k] = copyDevices(e.devices) - e.mutex.Unlock() + glog.V(3).Infof("Endpoint: %+v: %+v", k, e) + devs[k] = e.getDevices() } return devs } -// Allocate is the call that you can use to allocate a set of Devices -func (m *ManagerImpl) Allocate(resourceName string, - devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) { - - m.mutex.Lock() - defer m.mutex.Unlock() +// Allocate is the call that you can use to allocate a set of devices +// from the registered device plugins. 
+func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.AllocateResponse, error) { if len(devs) == 0 { return nil, nil } - glog.Infof("Recieved request for devices %v for device plugin %s", + glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s", devs, resourceName) - + m.mutex.Lock() e, ok := m.Endpoints[resourceName] + m.mutex.Unlock() if !ok { return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName) } @@ -117,45 +150,46 @@ func (m *ManagerImpl) Allocate(resourceName string, return e.allocate(devs) } -// Register registers a device plugin +// Register registers a device plugin. func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) { - glog.V(2).Infof("Got request for Device Plugin %s", r.ResourceName) - if r.Version != pluginapi.Version { - return &pluginapi.Empty{}, - fmt.Errorf(pluginapi.ErrUnsuportedVersion) + return &pluginapi.Empty{}, fmt.Errorf(errUnsuportedVersion) } if err := IsResourceNameValid(r.ResourceName); err != nil { return &pluginapi.Empty{}, err } - if _, ok := m.Endpoints[r.ResourceName]; ok { - return &pluginapi.Empty{}, - fmt.Errorf(pluginapi.ErrDevicePluginAlreadyExists) - } - + // TODO: for now, always accepts newest device plugin. Later may consider to + // add some policies here, e.g., verify whether an old device plugin with the + // same resource name is still alive to determine whether we want to accept + // the new registration. go m.addEndpoint(r) return &pluginapi.Empty{}, nil } -// Stop is the function that can stop the gRPC server +// Stop is the function that can stop the gRPC server. func (m *ManagerImpl) Stop() error { for _, e := range m.Endpoints { e.stop() } - m.server.Stop() - return nil } func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { - socketPath := filepath.Join(m.socketdir, r.Endpoint) + // Stops existing endpoint if there is any. 
+ m.mutex.Lock() + old, ok := m.Endpoints[r.ResourceName] + m.mutex.Unlock() + if ok && old != nil { + old.stop() + } + socketPath := filepath.Join(m.socketdir, r.Endpoint) e, err := newEndpoint(socketPath, r.ResourceName, m.callback) if err != nil { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) @@ -172,22 +206,15 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { e.listAndWatch(stream) m.mutex.Lock() - e.mutex.Lock() - - delete(m.Endpoints, r.ResourceName) + if old, ok := m.Endpoints[r.ResourceName]; ok && old == e { + delete(m.Endpoints, r.ResourceName) + } glog.V(2).Infof("Unregistered endpoint %v", e) - - e.mutex.Unlock() m.mutex.Unlock() }() m.mutex.Lock() - e.mutex.Lock() - m.Endpoints[r.ResourceName] = e glog.V(2).Infof("Registered endpoint %v", e) - - e.mutex.Unlock() m.mutex.Unlock() - } diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go index 2816339887e6a..e524a581fc1c6 100644 --- a/pkg/kubelet/deviceplugin/manager_test.go +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -49,18 +49,18 @@ func TestNewManagerImplStart(t *testing.T) { require.NoError(t, err) } -func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *MockDevicePlugin) { +func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *Stub) { m, err := NewManagerImpl(serverSocket, callback) require.NoError(t, err) - p := NewMockDevicePlugin(devs, pluginSocket) + p := NewDevicePluginStub(devs, pluginSocket) err = p.Start() require.NoError(t, err) return m, p } -func cleanup(t *testing.T, m Manager, p *MockDevicePlugin) { +func cleanup(t *testing.T, m Manager, p *Stub) { p.Stop() m.Stop() } diff --git a/pkg/kubelet/deviceplugin/types.go b/pkg/kubelet/deviceplugin/types.go index 99bc8c0771cf3..3ce272816bb63 100644 --- a/pkg/kubelet/deviceplugin/types.go +++ 
b/pkg/kubelet/deviceplugin/types.go @@ -17,58 +17,54 @@ limitations under the License. package deviceplugin import ( - "sync" - - "google.golang.org/grpc" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) -// MonitorCallback is the function called when a device becomes -// unhealthy (or healthy again) -// Updated contains the most recent state of the Device +// MonitorCallback is the function called when a device's health state changes, +// or new devices are reported, or old devices are deleted. +// Updated contains the most recent state of the Device. type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device) -// Manager manages the Device Plugins running on a machine +// Manager manages all the Device Plugins running on a node. type Manager interface { - // Start starts the gRPC service + // Start starts the gRPC Registration service. Start() error // Devices is the map of devices that have registered themselves // against the manager. - // The map key is the ResourceName of the device plugins + // The map key is the ResourceName of the device plugins. Devices() map[string][]*pluginapi.Device - // Allocate is calls the gRPC Allocate on the device plugin - Allocate(string, []*pluginapi.Device) (*pluginapi.AllocateResponse, error) + // Allocate takes resourceName and list of device Ids, and calls the + // gRPC Allocate on the device plugin matching the resourceName. + Allocate(string, []string) (*pluginapi.AllocateResponse, error) - // Stop stops the manager + // Stop stops the manager. Stop() error } -// ManagerImpl is the structure in charge of managing Device Plugins -type ManagerImpl struct { - socketname string - socketdir string - - Endpoints map[string]*endpoint // Key is ResourceName - mutex sync.Mutex - - callback MonitorCallback - - server *grpc.Server -} - +// TODO: evaluate whether we need these error definitions. 
const ( - // ErrDevicePluginUnknown is the error raised when the device Plugin returned by Monitor is not know by the Device Plugin manager - ErrDevicePluginUnknown = "Manager does not have device plugin for device:" - // ErrDeviceUnknown is the error raised when the device returned by Monitor is not know by the Device Plugin manager - ErrDeviceUnknown = "Could not find device in it's Device Plugin's Device List:" - // ErrBadSocket is the error raised when the registry socket path is not absolute - ErrBadSocket = "Bad socketPath, must be an absolute path:" - // ErrRemoveSocket is the error raised when the registry could not remove the existing socket - ErrRemoveSocket = "Failed to remove socket while starting device plugin registry, with error" - // ErrListenSocket is the error raised when the registry could not listen on the socket - ErrListenSocket = "Failed to listen to socket while starting device plugin registry, with error" - // ErrListAndWatch is the error raised when ListAndWatch ended unsuccessfully - ErrListAndWatch = "ListAndWatch ended unexpectedly for device plugin %s with error %v" + // errFailedToDialDevicePlugin is the error raised when the device plugin could not be + // reached on the registered socket + errFailedToDialDevicePlugin = "failed to dial device plugin:" + // errUnsuportedVersion is the error raised when the device plugin uses an API version not + // supported by the Kubelet registry + errUnsuportedVersion = "unsupported API version by the Kubelet registry" + // errDevicePluginAlreadyExists is the error raised when a device plugin with the + // same Resource Name tries to register itself + errDevicePluginAlreadyExists = "another device plugin already registered this Resource Name" + // errInvalidResourceName is the error raised when a device plugin is registering + // itself with an invalid ResourceName + errInvalidResourceName = "the ResourceName is invalid" + // errEmptyResourceName is the error raised when the resource name field is 
empty + errEmptyResourceName = "invalid Empty ResourceName" + + // errBadSocket is the error raised when the registry socket path is not absolute + errBadSocket = "bad socketPath, must be an absolute path:" + // errRemoveSocket is the error raised when the registry could not remove the existing socket + errRemoveSocket = "failed to remove socket while starting device plugin registry, with error" + // errListenSocket is the error raised when the registry could not listen on the socket + errListenSocket = "failed to listen to socket while starting device plugin registry, with error" + // errListAndWatch is the error raised when ListAndWatch ended unsuccessfully + errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v" ) diff --git a/pkg/kubelet/deviceplugin/utils.go b/pkg/kubelet/deviceplugin/utils.go index 57a6b0a71c6a9..b30ac2f640e65 100644 --- a/pkg/kubelet/deviceplugin/utils.go +++ b/pkg/kubelet/deviceplugin/utils.go @@ -18,13 +18,13 @@ package deviceplugin import ( "fmt" - "strings" + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/api/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" ) -// CloneDevice clones a pluginapi.Device -func CloneDevice(d *pluginapi.Device) *pluginapi.Device { +func cloneDevice(d *pluginapi.Device) *pluginapi.Device { return &pluginapi.Device{ ID: d.ID, Health: d.Health, @@ -36,41 +36,26 @@ func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device { var clones []*pluginapi.Device for _, d := range devs { - clones = append(clones, CloneDevice(d)) + clones = append(clones, cloneDevice(d)) } return clones } -// GetDevice returns the Device if a boolean signaling if the device was found or not -func GetDevice(d *pluginapi.Device, devs []*pluginapi.Device) (*pluginapi.Device, bool) { - name := DeviceKey(d) - - for _, d := range devs { - if DeviceKey(d) != name { - continue - } - - return d, true - } - - return nil, false -} - -// IsResourceNameValid returns an 
error if the resource is invalid, +// IsResourceNameValid returns an error if the resource is invalid or is not an +// extended resource name. func IsResourceNameValid(resourceName string) error { if resourceName == "" { - return fmt.Errorf(pluginapi.ErrEmptyResourceName) + return fmt.Errorf(errEmptyResourceName) } - - if strings.ContainsAny(resourceName, pluginapi.InvalidChars) { - return fmt.Errorf(pluginapi.ErrInvalidResourceName) + if !IsDeviceName(v1.ResourceName(resourceName)) { + return fmt.Errorf(errInvalidResourceName) } - return nil } -// DeviceKey returns the Key of a device -func DeviceKey(d *pluginapi.Device) string { - return d.ID +// IsDeviceName returns whether the ResourceName points to an extended resource +// name exported by a device plugin. +func IsDeviceName(k v1.ResourceName) bool { + return v1helper.IsExtendedResourceName(k) } diff --git a/pkg/kubelet/deviceplugin/utils_test.go b/pkg/kubelet/deviceplugin/utils_test.go index cf4a1186647b7..33924605c5c97 100644 --- a/pkg/kubelet/deviceplugin/utils_test.go +++ b/pkg/kubelet/deviceplugin/utils_test.go @@ -25,7 +25,7 @@ import ( ) func TestCloneDevice(t *testing.T) { - d := CloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy}) + d := cloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy}) require.Equal(t, d.ID, "ADeviceId") require.Equal(t, d.Health, pluginapi.Healthy) @@ -40,15 +40,14 @@ func TestCopyDevices(t *testing.T) { require.Len(t, devs, 1) } -func TestGetDevice(t *testing.T) { - devs := []*pluginapi.Device{ - {ID: "ADeviceId", Health: pluginapi.Healthy}, - } - - _, ok := GetDevice(&pluginapi.Device{ID: "AnotherDeviceId"}, devs) - require.False(t, ok) - - d, ok := GetDevice(&pluginapi.Device{ID: "ADeviceId"}, devs) - require.True(t, ok) - require.Equal(t, d, devs[0]) +func TestIsResourceName(t *testing.T) { + require.NotNil(t, IsResourceNameValid("")) + require.NotNil(t, IsResourceNameValid("cpu")) + require.NotNil(t, IsResourceNameValid("name1")) + 
require.NotNil(t, IsResourceNameValid("alpha.kubernetes.io/name1")) + require.NotNil(t, IsResourceNameValid("beta.kubernetes.io/name1")) + require.NotNil(t, IsResourceNameValid("kubernetes.io/name1")) + require.Nil(t, IsResourceNameValid("domain1.io/name1")) + require.Nil(t, IsResourceNameValid("alpha.domain1.io/name1")) + require.Nil(t, IsResourceNameValid("beta.domain1.io/name1")) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b2ede83cc9a00..9edfabf400c23 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -754,15 +754,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, return nil, err } - if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) { - devicePluginHdlr, err := cm.NewDevicePluginHandler() - if err != nil { - return nil, err - } - - klet.containerManager.SetDevicePluginHandler(devicePluginHdlr) - - } // If the experimentalMounterPathFlag is set, we do not want to // check node capabilities since the mount path is not the default if len(kubeCfg.ExperimentalMounterPath) != 0 { diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 146f6e36de3eb..4cb35ba52a122 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/util" @@ -596,6 +595,15 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage] } } + + initialCapacity := kl.containerManager.GetCapacity() + if initialCapacity != nil { + for k, v := range initialCapacity { + if v1helper.IsExtendedResourceName(k) { + node.Status.Capacity[k] = 
v + } + } + } } // Set Allocatable. @@ -622,27 +630,6 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { } node.Status.Allocatable[k] = value } - - hdlr := kl.containerManager.GetDevicePluginHandler() - if hdlr == nil { - return - } - - for k, v := range hdlr.Devices() { - key := v1.ResourceName(v1.ResourceOpaqueIntPrefix + k) - - var n int64 - n = 0 - - for _, d := range v { - if d.Health == pluginapi.Unhealthy { - continue - } - n++ - } - - node.Status.Capacity[key] = *resource.NewQuantity(n, resource.DecimalSI) - } } // Set versioninfo for the node. diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 8720902a1fa88..4ffdc0e8c32d9 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -352,8 +352,13 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string { // the container runtime to set parameters for launching a container. func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, bool, error) { useClusterFirstPolicy := false + opts, err := kl.containerManager.GetResources(pod, container, kl.GetActivePods()) + if err != nil { + return nil, false, err + } + cgroupParent := kl.GetPodCgroupParent(pod) - opts := &kubecontainer.RunContainerOptions{CgroupParent: cgroupParent} + opts.CgroupParent = cgroupParent hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod) if err != nil { return nil, false, err @@ -364,19 +369,23 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai opts.PortMappings = kubecontainer.MakePortMappings(container) // TODO(random-liu): Move following convert functions into pkg/kubelet/container - opts.Devices, err = kl.makeDevices(pod, container) + devices, err := kl.makeDevices(pod, container) if err != nil { return nil, false, err } + opts.Devices = append(opts.Devices, devices...) 
- opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes) + mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes) if err != nil { return nil, false, err } - opts.Envs, err = kl.makeEnvironmentVariables(pod, container, podIP) + opts.Mounts = append(opts.Mounts, mounts...) + + envs, err := kl.makeEnvironmentVariables(pod, container, podIP) if err != nil { return nil, false, err } + opts.Envs = append(opts.Envs, envs...) // Disabling adding TerminationMessagePath on Windows as these files would be mounted as docker volume and // Docker for Windows has a bug where only directories can be mounted