Skip to content

Commit

Permalink
Merge pull request kubernetes#14542 from vishh/kubelet-refactor
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Nov 12, 2015
2 parents 54e6db0 + b177053 commit e88593d
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 212 deletions.
9 changes: 6 additions & 3 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -216,7 +217,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
configFilePath := integration.MakeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.20"}

cm := cm.NewStubContainerManager()
kcfg := kubeletapp.SimpleKubelet(
cl,
&fakeDocker1,
Expand All @@ -238,7 +239,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
10*time.Second, /* MinimumGCAge */
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40 /* MaxPods */)
40, /* MaxPods */
cm)

kubeletapp.RunKubelet(kcfg)
// Kubelet (machine)
Expand Down Expand Up @@ -270,7 +272,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */

40 /* MaxPods */)
40, /* MaxPods */
cm)

kubeletapp.RunKubelet(kcfg)
return apiServer.URL, configFilePath
Expand Down
20 changes: 16 additions & 4 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
Expand Down Expand Up @@ -409,6 +410,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
ClusterDomain: s.ClusterDomain,
ConfigFile: s.Config,
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
Expand Down Expand Up @@ -474,6 +476,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
var err error
if kcfg == nil {
cfg, err := s.UnsecuredKubeletConfig()
if err != nil {
Expand All @@ -498,11 +501,17 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
}

if kcfg.CAdvisorInterface == nil {
ca, err := cadvisor.New(s.CAdvisorPort)
kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort)
if err != nil {
return err
}
}

if kcfg.ContainerManager == nil {
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface)
if err != nil {
return err
}
kcfg.CAdvisorInterface = ca
}

util.ReallyCrash = s.ReallyCrashForTesting
Expand Down Expand Up @@ -670,7 +679,7 @@ func SimpleKubelet(client *client.Client,
osInterface kubecontainer.OSInterface,
fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency time.Duration,
maxPods int,
) *KubeletConfig {
containerManager cm.ContainerManager) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: 90,
LowThresholdPercent: 80,
Expand All @@ -686,6 +695,7 @@ func SimpleKubelet(client *client.Client,
CgroupRoot: "",
Cloud: cloud,
ConfigFile: configFilePath,
ContainerManager: containerManager,
ContainerRuntime: "docker",
CPUCFSQuota: false,
DiskSpacePolicy: diskSpacePolicy,
Expand Down Expand Up @@ -724,8 +734,8 @@ func SimpleKubelet(client *client.Client,
SyncFrequency: syncFrequency,
SystemContainer: "",
TLSOptions: tlsOptions,
Writer: &io.StdWriter{},
VolumePlugins: volumePlugins,
Writer: &io.StdWriter{},
}
return &kcfg
}
Expand Down Expand Up @@ -864,6 +874,7 @@ type KubeletConfig struct {
ClusterDomain string
ConfigFile string
ConfigureCBR0 bool
ContainerManager cm.ContainerManager
ContainerRuntime string
CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy
Expand Down Expand Up @@ -1004,6 +1015,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
daemonEndpoints,
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/kubemark/hollow-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubemark"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
Expand Down Expand Up @@ -93,6 +94,7 @@ func main() {

if config.Morph == "kubelet" {
cadvisorInterface := new(cadvisor.Fake)
containerManager := cm.NewStubContainerManager()

fakeDockerClient := &dockertools.FakeDockerClient{}
fakeDockerClient.VersionInfo = docker.Env{"ApiVersion=1.18"}
Expand All @@ -106,6 +108,7 @@ func main() {
fakeDockerClient,
config.KubeletPort,
config.KubeletReadOnlyPort,
containerManager,
)
hollowKubelet.Run()
}
Expand Down
7 changes: 6 additions & 1 deletion contrib/mesos/pkg/executor/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cm"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -151,7 +152,6 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
if err != nil {
return k, pc, err
}

klet := k.(*kubelet.Kubelet)

s.kletLock.Lock()
Expand Down Expand Up @@ -187,6 +187,11 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
if err != nil {
return err
}

go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cadvisor/cadvisor_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ Interface = new(Mock)

func (c *Mock) Start() error {
args := c.Called()
return args.Error(1)
return args.Error(0)
}

// ContainerInfo is a mock implementation of Interface.ContainerInfo.
Expand Down
35 changes: 35 additions & 0 deletions pkg/kubelet/cadvisor/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cadvisor

import (
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)

func CapacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet
package cm

import (
"k8s.io/kubernetes/pkg/api"
)

// Manages the containers running on a machine.
type containerManager interface {
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start() error
Start(NodeConfig) error

// Returns resources allocated to system containers in the machine.
// These containers include the system and Kubernetes services.
SystemContainersLimit() api.ResourceList
}

type NodeConfig struct {
DockerDaemonContainerName string
SystemContainerName string
KubeletContainerName string
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet
package cm

import (
"fmt"
Expand Down Expand Up @@ -73,21 +73,15 @@ func newSystemContainer(containerName string) *systemContainer {
}
}

type nodeConfig struct {
dockerDaemonContainerName string
systemContainerName string
kubeletContainerName string
}

type containerManagerImpl struct {
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
nodeConfig
NodeConfig
// External containers being managed.
systemContainers []*systemContainer
}

var _ containerManager = &containerManagerImpl{}
var _ ContainerManager = &containerManagerImpl{}

// checks if the required cgroups subsystems are mounted.
// As of now, only 'cpu' and 'memory' are required.
Expand Down Expand Up @@ -120,15 +114,11 @@ func validateSystemRequirements(mountUtil mount.Interface) error {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func newContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface) (ContainerManager, error) {
return &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
nodeConfig: nodeConfig{
dockerDaemonContainerName: dockerDaemonContainerName,
systemContainerName: systemContainerName,
kubeletContainerName: kubeletContainerName,
},
NodeConfig: NodeConfig{},
}, nil
}

Expand Down Expand Up @@ -197,26 +187,26 @@ func (cm *containerManagerImpl) setupNode() error {
}

systemContainers := []*systemContainer{}
if cm.dockerDaemonContainerName != "" {
cont := newSystemContainer(cm.dockerDaemonContainerName)
if cm.DockerDaemonContainerName != "" {
cont := newSystemContainer(cm.DockerDaemonContainerName)

info, err := cm.cadvisorInterface.MachineInfo()
var capacity = api.ResourceList{}
if err != nil {
} else {
capacity = CapacityFromMachineInfo(info)
capacity = cadvisor.CapacityFromMachineInfo(info)
}
memoryLimit := (int64(capacity.Memory().Value() * DockerMemoryLimitThresholdPercent / 100))
if memoryLimit < MinDockerMemoryLimit {
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.dockerDaemonContainerName, MinDockerMemoryLimit)
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.DockerDaemonContainerName, MinDockerMemoryLimit)
memoryLimit = MinDockerMemoryLimit
}

glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.dockerDaemonContainerName, memoryLimit)
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.DockerDaemonContainerName, memoryLimit)

dockerContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: cm.dockerDaemonContainerName,
Name: cm.DockerDaemonContainerName,
Memory: memoryLimit,
MemorySwap: -1,
AllowAllDevices: true,
Expand All @@ -228,8 +218,8 @@ func (cm *containerManagerImpl) setupNode() error {
systemContainers = append(systemContainers, cont)
}

if cm.systemContainerName != "" {
if cm.systemContainerName == "/" {
if cm.SystemContainerName != "" {
if cm.SystemContainerName == "/" {
return fmt.Errorf("system container cannot be root (\"/\")")
}

Expand All @@ -238,23 +228,25 @@ func (cm *containerManagerImpl) setupNode() error {
Name: "/",
},
}
manager := createManager(cm.systemContainerName)
manager := createManager(cm.SystemContainerName)

err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return err
}
systemContainers = append(systemContainers, newSystemContainer(cm.systemContainerName))
systemContainers = append(systemContainers, newSystemContainer(cm.SystemContainerName))
}

if cm.kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.kubeletContainerName))
if cm.KubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.KubeletContainerName))
}
cm.systemContainers = systemContainers
return nil
}

func (cm *containerManagerImpl) Start() error {
func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error {
cm.NodeConfig = nodeConfig

// Setup the node
if err := cm.setupNode(); err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet
package cm

import (
"fmt"
Expand Down
Loading

0 comments on commit e88593d

Please sign in to comment.