Skip to content

Commit

Permalink
This change supports robust kubelet volume cleanup
Browse files Browse the repository at this point in the history
Currently kubelet volume management works on the concept of desired
and actual world of states. The volume manager periodically compares the
two worlds and perform volume mount/unmount and/or attach/detach
operations. When kubelet restarts, the caches of those two worlds are
gone. Although the desired world can be recovered through the apiserver, the actual
world cannot be recovered, which may cause some volumes to not be cleaned
up if their information is deleted by the apiserver. This change adds the
reconstruction of the actual world by reading the pod directories from
disk. The reconstructed volume information is added to both desired
world and actual world if it cannot be found in either world. The rest
of the logic is the same as before: the desired world populator may clean up
the volume entry if it is no longer in the apiserver, and then the volume
manager should invoke unmount to clean it up.
  • Loading branch information
jingxu97 committed Aug 15, 2016
1 parent 936c517 commit f19a114
Show file tree
Hide file tree
Showing 46 changed files with 989 additions and 237 deletions.
7 changes: 7 additions & 0 deletions cmd/kubelet/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ import (
"github.com/spf13/pflag"
)

// Names of the well-known directories the kubelet creates under its
// root directory to store per-pod and per-plugin state on disk.
const (
	// DefaultKubeletPodsDirName is the directory under which per-pod
	// directories are created.
	DefaultKubeletPodsDirName = "pods"
	// DefaultKubeletVolumesDirName is the per-pod subdirectory holding
	// mounted volumes.
	DefaultKubeletVolumesDirName = "volumes"
	// DefaultKubeletPluginsDirName is the directory volume plugins may
	// use to persist data.
	DefaultKubeletPluginsDirName = "plugins"
	// DefaultKubeletContainersDirName is the per-pod subdirectory holding
	// per-container data.
	DefaultKubeletContainersDirName = "containers"
)

// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/volume/persistentvolume/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,10 @@ func (plugin *mockVolumePlugin) RequiresRemount() bool {
return false
}

// ConstructVolumeSpec is a no-op on the mock plugin: volume
// reconstruction from a mount path is not exercised by these tests, so
// it returns a nil spec and no error (unlike NewMounter/NewUnmounter,
// which report "not supported").
func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
return nil, nil
}

func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) {
return nil, fmt.Errorf("Mounter is not supported by this plugin")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/container_manager_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
// GetDeviceNameFromMount satisfies the mount.Interface contract for the
// fake; device lookup is irrelevant to these tests, so it reports an
// empty device name and no error.
func (mi *fakeMountInterface) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return "", nil
}

func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) {
for _, mp := range mi.mountPoints {
Expand Down
17 changes: 12 additions & 5 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,14 +514,16 @@ func NewMainKubelet(
return nil, err
}

// setup volumeManager
klet.volumeManager, err = volumemanager.NewVolumeManager(
enableControllerAttachDetach,
nodeName,
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
mounter)
mounter,
klet.getPodsDir())

runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
Expand Down Expand Up @@ -957,7 +959,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}

// Start volume manager
go kl.volumeManager.Run(wait.NeverStop)
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
Expand Down Expand Up @@ -1731,16 +1733,21 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
if allPods.Has(string(uid)) {
continue
}
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err)
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames)
continue
}

glog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
glog.Infof("Failed to remove orphaned pod %q dir; err: %v", uid, err)
glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
errlist = append(errlist, err)
}
}
Expand Down
42 changes: 37 additions & 5 deletions pkg/kubelet/kubelet_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package kubelet

import (
"fmt"
"io/ioutil"
"net"
"path"

"github.com/golang/glog"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)

Expand All @@ -40,15 +43,15 @@ func (kl *Kubelet) getRootDir() string {
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
	// NOTE: the stale hard-coded "pods" return (an unreachable leftover
	// from replacing the literal with the shared constant) is removed.
	return path.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName)
}

// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names. The unreachable hard-coded "plugins" return left
// over from the constant refactor is removed.
func (kl *Kubelet) getPluginsDir() string {
	return path.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName)
}

// getPluginDir returns a data directory name for a given plugin name.
Expand Down Expand Up @@ -90,7 +93,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string {
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
// getPodVolumesDir returns the full path to the per-pod directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist. The unreachable hard-coded "volumes"
// return left over from the constant refactor is removed.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName)
}

// getPodVolumeDir returns the full path to the directory which represents the
Expand All @@ -104,7 +107,7 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
// getPodPluginsDir returns the full path to the per-pod directory under
// which plugins may store data for the specified pod. This directory may
// not exist if the pod does not exist. The unreachable hard-coded
// "plugins" return left over from the constant refactor is removed.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName)
}

// getPodPluginDir returns a data directory name for a given plugin name for a
Expand All @@ -126,7 +129,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
// old && new = use new (but warn)
oldPath := path.Join(kl.getPodDir(podUID), ctrName)
oldExists := dirExists(oldPath)
newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
newPath := path.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName)
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
Expand Down Expand Up @@ -234,3 +237,32 @@ func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) {
func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
}

// getPodVolumeNameListFromDisk returns a list of the volume names by reading the
// volume directories for the given pod from the disk.
//
// It scans <podDir>/volumes/<pluginName>/<volumeName> and collects every
// volume directory name across all plugin subdirectories. Individual
// entries that cannot be stat'ed are logged and skipped rather than
// aborting the scan, so a partially readable tree still yields the
// readable volume names.
func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) {
	volumes := []string{}
	podVolDir := kl.getPodVolumesDir(podUID)
	volumePluginDirs, err := ioutil.ReadDir(podVolDir)
	if err != nil {
		glog.Errorf("Could not read directory %s: %v", podVolDir, err)
		return volumes, err
	}
	for _, volumePluginDir := range volumePluginDirs {
		volumePluginName := volumePluginDir.Name()
		volumePluginPath := path.Join(podVolDir, volumePluginName)
		// ReadDirNoExit reports a nil entry (with a parallel stat error)
		// instead of failing the whole listing when one entry cannot be
		// stat'ed.
		volumeDirs, volumeDirsStatErrs, err := util.ReadDirNoExit(volumePluginPath)
		if err != nil {
			return volumes, fmt.Errorf("could not read directory %s: %v", volumePluginPath, err)
		}
		for i, volumeDir := range volumeDirs {
			if volumeDir == nil {
				// Log the plugin subdirectory that actually failed to
				// stat, not the parent pod volumes directory (the original
				// message pointed at the wrong path).
				glog.Errorf("Could not read directory %s: %v", volumePluginPath, volumeDirsStatErrs[i])
				continue
			}
			volumes = append(volumes, volumeDir.Name())
		}
	}
	return volumes, nil
}
24 changes: 13 additions & 11 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ func newTestKubeletWithImageList(
fakeKubeClient,
kubelet.volumePluginMgr,
fakeRuntime,
kubelet.mounter)
kubelet.mounter,
kubelet.getPodsDir())
if err != nil {
t.Fatalf("failed to initialize volume manager: %v", err)
}
Expand Down Expand Up @@ -404,8 +405,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
},
})

stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
Expand Down Expand Up @@ -474,8 +474,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
},
})

stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
Expand Down Expand Up @@ -603,8 +602,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
},
})

stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
Expand Down Expand Up @@ -697,8 +695,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
},
})

stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
Expand Down Expand Up @@ -856,8 +853,7 @@ func TestPodVolumesExist(t *testing.T) {
},
}

stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
Expand Down Expand Up @@ -3939,3 +3935,9 @@ func simulateVolumeInUseUpdate(
}
}
}

// runVolumeManager starts the kubelet's volume manager in a background
// goroutine (with the kubelet's sourcesReady as the readiness gate) and
// returns the stop channel; tests close the channel to stop the manager.
func runVolumeManager(kubelet *Kubelet) chan struct{} {
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
return stopCh
}
3 changes: 2 additions & 1 deletion pkg/kubelet/runonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func TestRunOnce(t *testing.T) {
kb.kubeClient,
kb.volumePluginMgr,
fakeRuntime,
kb.mounter)
kb.mounter,
kb.getPodsDir())

kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
Expand Down
20 changes: 20 additions & 0 deletions pkg/kubelet/volumemanager/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ type ActualStateOfWorld interface {
// have no mountedPods. This list can be used to determine which volumes are
// no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume

// GetPods generates and returns a map of pods in which map is indexed
// with pod's unique name. This map can be used to determine which pod is currently
// in actual state of world.
GetPods() map[volumetypes.UniquePodName]bool
}

// MountedVolume represents a volume that has successfully been mounted to a pod.
Expand Down Expand Up @@ -573,6 +578,21 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
return unmountedVolumes
}

// GetPods returns the set of unique pod names that currently have at
// least one volume mounted in the actual state of the world, keyed for
// O(1) membership checks.
func (asw *actualStateOfWorld) GetPods() map[volumetypes.UniquePodName]bool {
	asw.RLock()
	defer asw.RUnlock()

	podList := make(map[volumetypes.UniquePodName]bool)
	for _, volumeObj := range asw.attachedVolumes {
		for podName := range volumeObj.mountedPods {
			// Unconditional assignment is enough; re-setting an existing
			// key to true is a no-op (the original's !podList[podName]
			// guard was redundant).
			podList[podName] = true
		}
	}
	return podList
}

func (asw *actualStateOfWorld) newAttachedVolume(
attachedVolume *attachedVolume) AttachedVolume {
return AttachedVolume{
Expand Down
23 changes: 22 additions & 1 deletion pkg/kubelet/volumemanager/cache/desired_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ type DesiredStateOfWorld interface {
// attached to this node and the pods they should be mounted to based on the
// current desired state of the world.
GetVolumesToMount() []VolumeToMount

// GetPods generates and returns a map of pods in which map is indexed
// with pod's unique name. This map can be used to determine which pod is currently
// in desired state of world.
GetPods() map[types.UniquePodName]bool
}

// VolumeToMount represents a volume that is attached to this node and needs to
Expand All @@ -117,6 +122,7 @@ type desiredStateOfWorld struct {
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr

sync.RWMutex
}

Expand Down Expand Up @@ -203,7 +209,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
}

volumeObj, volumeExists := dsw.volumesToMount[volumeName]
Expand Down Expand Up @@ -296,6 +302,21 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume(
return podExists
}

// GetPods returns the set of unique pod names that currently have at
// least one volume to mount in the desired state of the world, keyed for
// O(1) membership checks.
func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
	dsw.RLock()
	defer dsw.RUnlock()

	podList := make(map[types.UniquePodName]bool)
	for _, volumeObj := range dsw.volumesToMount {
		for podName := range volumeObj.podsToMount {
			// Unconditional assignment is enough; re-setting an existing
			// key to true is a no-op (the original's !podList[podName]
			// guard was redundant).
			podList[podName] = true
		}
	}
	return podList
}

func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
dsw.RLock()
defer dsw.RUnlock()
Expand Down
Loading

0 comments on commit f19a114

Please sign in to comment.