Skip to content

Commit

Permalink
Merge pull request kubernetes#5680 from GoogleCloudPlatform/revert-56…
Browse files Browse the repository at this point in the history
…78-revert-5069-network_hooks

Revert "Revert "[WIP] southbound networking hooks in kubelet""
  • Loading branch information
cjcullen committed Mar 19, 2015
2 parents 1dd4600 + 7ddcecf commit bec527f
Show file tree
Hide file tree
Showing 11 changed files with 606 additions and 0 deletions.
13 changes: 13 additions & 0 deletions cmd/kubelet/app/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package app
import (
// Credential providers
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider/gcp"
// Network plugins
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network/exec"
// Volume plugins
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
Expand All @@ -44,3 +47,13 @@ func ProbeVolumePlugins() []volume.Plugin {

return allPlugins
}

// ProbeNetworkPlugins collects all compiled-in plugins
func ProbeNetworkPlugins() []network.NetworkPlugin {
allPlugins := []network.NetworkPlugin{}

// for each existing plugin, add to the list
allPlugins = append(allPlugins, exec.ProbeNetworkPlugins()...)

return allPlugins
}
10 changes: 10 additions & 0 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand Down Expand Up @@ -79,6 +80,7 @@ type KubeletServer struct {
StreamingConnectionIdleTimeout time.Duration
ImageGCHighThresholdPercent int
ImageGCLowThresholdPercent int
NetworkPluginName string
}

// NewKubeletServer will create a new KubeletServer with default values.
Expand All @@ -104,6 +106,7 @@ func NewKubeletServer() *KubeletServer {
MasterServiceNamespace: api.NamespaceDefault,
ImageGCHighThresholdPercent: 90,
ImageGCLowThresholdPercent: 80,
NetworkPluginName: "",
}
}

Expand Down Expand Up @@ -142,6 +145,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
fs.IntVar(&s.ImageGCHighThresholdPercent, "image_gc_high_threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%%")
fs.IntVar(&s.ImageGCLowThresholdPercent, "image_gc_low_threshold", s.ImageGCLowThresholdPercent, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%%")
fs.StringVar(&s.NetworkPluginName, "network_plugin", s.NetworkPluginName, "<Warning: Alpha feature> The name of the network plugin to be invoked for various events in kubelet/pod lifecycle")
}

// Run runs the specified KubeletServer. This should never exit.
Expand Down Expand Up @@ -200,6 +204,8 @@ func (s *KubeletServer) Run(_ []string) error {
KubeClient: client,
MasterServiceNamespace: s.MasterServiceNamespace,
VolumePlugins: ProbeVolumePlugins(),
NetworkPlugins: ProbeNetworkPlugins(),
NetworkPluginName: s.NetworkPluginName,
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
ImageGCPolicy: imageGCPolicy,
}
Expand Down Expand Up @@ -397,6 +403,8 @@ type KubeletConfig struct {
Runonce bool
MasterServiceNamespace string
VolumePlugins []volume.Plugin
NetworkPlugins []network.NetworkPlugin
NetworkPluginName string
StreamingConnectionIdleTimeout time.Duration
Recorder record.EventRecorder
TLSOptions *kubelet.TLSOptions
Expand Down Expand Up @@ -437,6 +445,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.NetworkPlugins,
kc.NetworkPluginName,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
kc.CadvisorInterface,
Expand Down
25 changes: 25 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
Expand Down Expand Up @@ -118,6 +119,8 @@ func NewMainKubelet(
clusterDNS net.IP,
masterServiceNamespace string,
volumePlugins []volume.Plugin,
networkPlugins []network.NetworkPlugin,
networkPluginName string,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
Expand Down Expand Up @@ -220,6 +223,12 @@ func NewMainKubelet(
return nil, err
}

if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}

klet.podStatuses = make(map[string]api.PodStatus)

klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)
Expand Down Expand Up @@ -301,6 +310,9 @@ type Kubelet struct {
// Volume plugins.
volumePluginMgr volume.PluginMgr

// Network plugin
networkPlugin network.NetworkPlugin

// probe runner holder
prober probeHolder
// container readiness state holder
Expand Down Expand Up @@ -1357,6 +1369,11 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
}
glog.Infof("Creating pod infra container for %q", podFullName)
podInfraContainerID, err = kl.createPodInfraContainer(pod)

// Call the networking plugin
if err == nil {
err = kl.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
}
if err != nil {
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
return err
Expand Down Expand Up @@ -1548,6 +1565,14 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
pc := podContainer{podFullName, uid, containerName}
_, ok := desiredContainers[pc]
if err != nil || !ok {
// call the networking plugin for teardown
if containerName == dockertools.PodInfraContainerName {
name, namespace, _ := ParsePodFullName(podFullName)
err := kl.networkPlugin.TearDownPod(namespace, name, dockertools.DockerID(dockerContainers[ix].ID))
if err != nil {
glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
}
}
glog.V(1).Infof("Killing unwanted container %+v", pc)
err = kl.killContainer(dockerContainers[ix])
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
Expand Down Expand Up @@ -73,6 +74,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.kubeClient = fakeKubeClient
kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
kubelet.hostname = "testnode"
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
} else {
Expand Down
140 changes: 140 additions & 0 deletions pkg/kubelet/network/exec/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package exec scans and loads networking plugins that are installed
// under /usr/libexec/kubernetes/kubelet-plugins/net/exec/
// The layout convention for a plugin is:
// plugin-name/ (plugins have to be directories first)
// plugin-name/plugin-name (executable that will be called out, see Vendoring Note for more nuances)
// plugin-name/<other-files>
// where, 'executable' has the following requirements:
// - should have exec permissions
// - should give non-zero exit code on failure, and zero on sucess
// - the arguments will be <action> <pod_namespace> <pod_name> <docker_id_of_infra_container>
// whereupon, <action> will be one of:
// - init, called when the kubelet loads the plugin
// - setup, called after the infra container of a pod is
// created, but before other containers of the pod are created
// - teardown, called before the pod infra container is killed
// As the executables are called, the file-descriptors stdin, stdout, stderr
// remain open. The combined output of stdout/stderr is captured and logged.
//
// Note: If the pod infra container self-terminates (e.g. crashes or is killed),
// the entire pod lifecycle will be restarted, but teardown will not be called.
//
// Vendoring Note:
// Plugin Names can be vendored also. Use '~' as the escaped name for plugin directories.
// And expect command line argument to call vendored plugins as 'vendor/pluginName'
// e.g. pluginName = mysdn
// vendorname = mycompany
// then, plugin layout should be
// mycompany~mysdn/
// mycompany~mysdn/mysdn (this becomes the executable)
// mycompany~mysdn/<other-files>
// and, call the kubelet with '--network-plugin=mycompany/mysdn'
package exec

import (
"errors"
"fmt"
"io/ioutil"
"path"
"strings"
"syscall"

"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
utilexec "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)

type execNetworkPlugin struct {
execName string
execPath string
host network.Host
}

const (
initCmd = "init"
setUpCmd = "setup"
tearDownCmd = "teardown"
execDir = "/usr/libexec/kubernetes/kubelet-plugins/net/exec/"
X_OK = 0x1
)

func ProbeNetworkPlugins() []network.NetworkPlugin {
return probeNetworkPluginsWithExecDir(execDir)
}

func probeNetworkPluginsWithExecDir(pluginDir string) []network.NetworkPlugin {
execPlugins := []network.NetworkPlugin{}

files, _ := ioutil.ReadDir(pluginDir)
for _, f := range files {
// only directories are counted as plugins
// and pluginDir/dirname/dirname should be an executable
// unless dirname contains '~' for escaping namespace
// e.g. dirname = vendor~ipvlan
// then, executable will be pluginDir/dirname/ipvlan
if f.IsDir() {
execPath := path.Join(pluginDir, f.Name())
execPlugins = append(execPlugins, &execNetworkPlugin{execName: network.UnescapePluginName(f.Name()), execPath: execPath})
}
}
return execPlugins
}

func (plugin *execNetworkPlugin) Init(host network.Host) error {
err := plugin.validate()
if err != nil {
return err
}
plugin.host = host
// call the init script
out, err := utilexec.New().Command(plugin.getExecutable(), initCmd).CombinedOutput()
glog.V(5).Infof("Init 'exec' network plugin output: %s, %v", string(out), err)
return err
}

func (plugin *execNetworkPlugin) getExecutable() string {
parts := strings.Split(plugin.execName, "/")
execName := parts[len(parts)-1]
return path.Join(plugin.execPath, execName)
}

func (plugin *execNetworkPlugin) Name() string {
return plugin.execName
}

func (plugin *execNetworkPlugin) validate() error {
if syscall.Access(plugin.getExecutable(), X_OK) != nil {
errStr := fmt.Sprintf("Invalid exec plugin. Executable '%s' does not have correct permissions.", plugin.execName)
return errors.New(errStr)
}
return nil
}

func (plugin *execNetworkPlugin) SetUpPod(namespace string, name string, id dockertools.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), setUpCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("SetUpPod 'exec' network plugin output: %s, %v", string(out), err)
return err
}

func (plugin *execNetworkPlugin) TearDownPod(namespace string, name string, id dockertools.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), tearDownCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("TearDownPod 'exec' network plugin output: %s, %v", string(out), err)
return err
}
Loading

0 comments on commit bec527f

Please sign in to comment.