Skip to content

Commit

Permalink
service: initialize EventClient
Browse files Browse the repository at this point in the history
Kubelet events are not being tracked. This commit fixes it by
initializing the event client.

fixes d2iq-archive/kubernetes-mesos#680
  • Loading branch information
Sergiusz Urbaniak committed Dec 9, 2015
1 parent 78c31b6 commit 9176e93
Showing 1 changed file with 87 additions and 66 deletions.
153 changes: 87 additions & 66 deletions contrib/mesos/pkg/executor/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,83 +147,104 @@ func (s *KubeletExecutorServer) runKubelet(
staticPodsConfigPath string,
apiclient *client.Client,
podLW *cache.ListWatch,
) error {
) (err error) {
defer func() {
if err != nil {
// close the channel here. When Run returns without error, the executorKubelet is
// responsible to do this. If it returns with an error, we are responsible here.
close(kubeletDone)
}
}()

kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
// apply Mesos specific settings
executorDone := make(chan struct{})
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
if err != nil {
return k, pc, err
}
if err != nil {
return err
}

s.klet = k.(*kubelet.Kubelet)
close(s.kletReady) // intentionally crash if this is called more than once
// apply Mesos specific settings
executorDone := make(chan struct{})
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
if err != nil {
return k, pc, err
}

// decorate kubelet such that it shuts down when the executor is
decorated := &executorKubelet{
Kubelet: s.klet,
kubeletDone: kubeletDone,
executorDone: executorDone,
}
s.klet = k.(*kubelet.Kubelet)
close(s.kletReady) // intentionally crash if this is called more than once

return decorated, pc, nil
}
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient
kcfg.NodeName = kcfg.HostnameOverride
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
kcfg.StandaloneMode = false
kcfg.SystemContainer = "" // don't take control over other system processes.
if kcfg.Cloud != nil {
// fail early and hard because having the cloud provider loaded would go unnoticed,
// but break bigger cluster because accessing the state.json from every slave kills the master.
panic("cloud provider must not be set")
// decorate kubelet such that it shuts down when the executor is
decorated := &executorKubelet{
Kubelet: s.klet,
kubeletDone: kubeletDone,
executorDone: executorDone,
}

// create custom cAdvisor interface which return the resource values that Mesos reports
ni := <-nodeInfos
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
if err != nil {
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
if err != nil {
return err
}
return decorated, pc, nil
}
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient

go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
log.V(3).Infof("ignoring updated node resources: %v", ni)
}
}()

// create main pod source, it will close executorDone when the executor updates stop flowing
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)

// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)

// run the kubelet, until execUpdates is closed
// NOTE: because kcfg != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly wouldn't want
// that because then every kubelet instance would query the master
// state.json which does not scale.
err = s.KubeletServer.Run(kcfg)
// taken from KubeletServer#Run(*KubeletConfig)
eventClientConfig, err := s.CreateAPIServerClientConfig()
if err != nil {
return err
}

// make a separate client for events
eventClientConfig.QPS = s.EventRecordQPS
eventClientConfig.Burst = s.EventBurst
kcfg.EventClient, err = client.New(eventClientConfig)
if err != nil {
// close the channel here. When Run returns without error, the executorKubelet is
// responsible to do this. If it returns with an error, we are responsible here.
close(kubeletDone)
return err
}
return err

kcfg.NodeName = kcfg.HostnameOverride
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
kcfg.StandaloneMode = false
kcfg.SystemContainer = "" // don't take control over other system processes.
if kcfg.Cloud != nil {
// fail early and hard because having the cloud provider loaded would go unnoticed,
// but break bigger cluster because accessing the state.json from every slave kills the master.
panic("cloud provider must not be set")
}

// create custom cAdvisor interface which return the resource values that Mesos reports
ni := <-nodeInfos
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
if err != nil {
return err
}

kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
if err != nil {
return err
}

go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
log.V(3).Infof("ignoring updated node resources: %v", ni)
}
}()

// create main pod source, it will close executorDone when the executor updates stop flowing
newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW)

// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)

// run the kubelet, until execUpdates is closed
// NOTE: because kcfg != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly wouldn't want
// that because then every kubelet instance would query the master
// state.json which does not scale.
err = s.KubeletServer.Run(kcfg)

return
}

// Run runs the specified KubeletExecutorServer.
Expand Down

0 comments on commit 9176e93

Please sign in to comment.