From f19a1148db1b7584be6b6b60abaf8c0bd1503ed3 Mon Sep 17 00:00:00 2001
From: Jing Xu
Date: Thu, 23 Jun 2016 12:46:21 -0700
Subject: [PATCH] This change supports robust kubelet volume cleanup

Currently kubelet volume management works on the concept of desired and
actual worlds of state. The volume manager periodically compares the two
worlds and performs volume mount/unmount and/or attach/detach operations.
When the kubelet restarts, the caches of both worlds are lost. Although
the desired world can be recovered through the apiserver, the actual
world cannot, so some volumes can never be cleaned up if their
information has already been deleted from the apiserver. This change adds
reconstruction of the actual world by reading the pod directories from
disk. The reconstructed volume information is added to both the desired
world and the actual world if it cannot be found in either one. The rest
of the logic is the same as before: the desired world populator may clean
up a volume entry if it is no longer in the apiserver, and the volume
manager then invokes unmount to clean it up.
---
 cmd/kubelet/app/options/options.go                 |   7 +
 .../volume/persistentvolume/framework_test.go      |   4 +
 .../cm/container_manager_linux_test.go             |   3 +
 pkg/kubelet/kubelet.go                             |  17 +-
 pkg/kubelet/kubelet_getters.go                     |  42 +-
 pkg/kubelet/kubelet_test.go                        |  24 +-
 pkg/kubelet/runonce_test.go                        |   3 +-
 .../cache/actual_state_of_world.go                 |  20 +
 .../cache/desired_state_of_world.go                |  23 +-
 .../volumemanager/reconciler/reconciler.go         | 547 +++++++++++++-----
 .../reconciler/reconciler_test.go                  |  50 +-
 pkg/kubelet/volumemanager/volume_manager.go        |  27 +-
 .../volumemanager/volume_manager_test.go           |  25 +-
 pkg/util/goroutinemap/goroutinemap.go              |  16 +-
 pkg/util/mount/fake.go                             |   4 +
 pkg/util/mount/mount.go                            |  28 +
 pkg/util/mount/mount_linux.go                      |   5 +
 pkg/util/mount/nsenter_mount.go                    |   5 +
 pkg/util/mount/nsenter_mount_unsupported.go        |   4 +
 pkg/volume/aws_ebs/attacher.go                     |   5 +
 pkg/volume/aws_ebs/aws_ebs.go                      |  18 +
 pkg/volume/azure_file/azure_file.go                |  13 +
 pkg/volume/cephfs/cephfs.go                        |  13 +
 pkg/volume/cinder/attacher.go                      |   5 +
 pkg/volume/cinder/cinder.go                        |  19 +
 pkg/volume/configmap/configmap.go                  |  10 +
 pkg/volume/downwardapi/downwardapi.go              |  10 +
 pkg/volume/empty_dir/empty_dir.go                  |  10 +
 pkg/volume/fc/fc.go                                |  10 +
 pkg/volume/flexvolume/flexvolume.go                |  12 +
 pkg/volume/flocker/plugin.go                       |  12 +
 pkg/volume/gce_pd/attacher.go                      |   5 +
 pkg/volume/gce_pd/gce_pd.go                        |  18 +
 pkg/volume/git_repo/git_repo.go                    |  10 +
 pkg/volume/glusterfs/glusterfs.go                  |  13 +
 pkg/volume/host_path/host_path.go                  |  12 +
 pkg/volume/iscsi/iscsi.go                          |  13 +
 pkg/volume/nfs/nfs.go                              |  12 +
 pkg/volume/plugins.go                              |   7 +
 pkg/volume/rbd/rbd.go                              |  12 +
 pkg/volume/secret/secret.go                        |  12 +
 pkg/volume/testing/testing.go                      |   8 +
 .../nestedpendingoperations.go                     |  67 ++-
 .../operationexecutor/operation_executor.go        |  28 +-
 pkg/volume/util/volumehelper/volumehelper.go       |   6 +-
 pkg/volume/vsphere_volume/vsphere_volume.go        |  12 +
 46 files changed, 989 insertions(+), 237 deletions(-)

diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go
index f4ec9289212eb..1c6d194b3d494 100644
--- a/cmd/kubelet/app/options/options.go
+++ b/cmd/kubelet/app/options/options.go
@@ -29,6 +29,13 @@ import (
 	"github.com/spf13/pflag"
 )
 
+const (
+	DefaultKubeletPodsDirName       = "pods"
+	DefaultKubeletVolumesDirName    = "volumes"
+	DefaultKubeletPluginsDirName    = "plugins"
+	DefaultKubeletContainersDirName = "containers"
+)
+
 // KubeletServer encapsulates all of the parameters necessary for starting up
 // a kubelet.
These can either be set via command line or directly. type KubeletServer struct { diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index 68354dd8d01f1..609548965e16d 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -1056,6 +1056,10 @@ func (plugin *mockVolumePlugin) RequiresRemount() bool { return false } +func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) { + return nil, nil +} + func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) { return nil, fmt.Errorf("Mounter is not supported by this plugin") } diff --git a/pkg/kubelet/cm/container_manager_linux_test.go b/pkg/kubelet/cm/container_manager_linux_test.go index 16273ab540042..37a4d64504471 100644 --- a/pkg/kubelet/cm/container_manager_linux_test.go +++ b/pkg/kubelet/cm/container_manager_linux_test.go @@ -50,6 +50,9 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) { func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) { return false, fmt.Errorf("unsupported") } +func (mi *fakeMountInterface) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) { + return "", nil +} func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) { for _, mp := range mi.mountPoints { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 67f12d7c57d03..38adc92cf4ea5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -514,6 +514,7 @@ func NewMainKubelet( return nil, err } + // setup volumeManager klet.volumeManager, err = volumemanager.NewVolumeManager( enableControllerAttachDetach, nodeName, @@ -521,7 +522,8 @@ func NewMainKubelet( klet.kubeClient, klet.volumePluginMgr, klet.containerRuntime, - mounter) + mounter, + klet.getPodsDir()) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { @@ -957,7 +959,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // Start volume manager - go kl.volumeManager.Run(wait.NeverStop) + go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) if kl.kubeClient != nil { // Start syncing node status immediately, this may set up things the runtime needs to run. @@ -1731,16 +1733,21 @@ func (kl *Kubelet) cleanupOrphanedPodDirs( if allPods.Has(string(uid)) { continue } + // If volumes have not been unmounted/detached, do not delete directory. + // Doing so may result in corruption of data. if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { - // If volumes have not been unmounted/detached, do not delete directory. - // Doing so may result in corruption of data. glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err) continue } + // Check whether volume is still mounted on disk. 
If so, do not delete directory + if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 { + glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames) + continue + } glog.V(3).Infof("Orphaned pod %q found, removing", uid) if err := os.RemoveAll(kl.getPodDir(uid)); err != nil { - glog.Infof("Failed to remove orphaned pod %q dir; err: %v", uid, err) + glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err) errlist = append(errlist, err) } } diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index 8d8b6b8706552..8f9194c07e64d 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -18,14 +18,17 @@ package kubelet import ( "fmt" + "io/ioutil" "net" "path" "github.com/golang/glog" + "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" nodeutil "k8s.io/kubernetes/pkg/util/node" ) @@ -40,7 +43,7 @@ func (kl *Kubelet) getRootDir() string { // getPodsDir returns the full path to the directory under which pod // directories are created. func (kl *Kubelet) getPodsDir() string { - return path.Join(kl.getRootDir(), "pods") + return path.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName) } // getPluginsDir returns the full path to the directory under which plugin @@ -48,7 +51,7 @@ func (kl *Kubelet) getPodsDir() string { // they need to persist. Plugins should create subdirectories under this named // after their own names. func (kl *Kubelet) getPluginsDir() string { - return path.Join(kl.getRootDir(), "plugins") + return path.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName) } // getPluginDir returns a data directory name for a given plugin name. @@ -90,7 +93,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string { // which volumes are created for the specified pod. This directory may not // exist if the pod does not exist. func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string { - return path.Join(kl.getPodDir(podUID), "volumes") + return path.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName) } // getPodVolumeDir returns the full path to the directory which represents the @@ -104,7 +107,7 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa // which plugins may store data for the specified pod. This directory may not // exist if the pod does not exist. 
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string { - return path.Join(kl.getPodDir(podUID), "plugins") + return path.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName) } // getPodPluginDir returns a data directory name for a given plugin name for a @@ -126,7 +129,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string { // old && new = use new (but warn) oldPath := path.Join(kl.getPodDir(podUID), ctrName) oldExists := dirExists(oldPath) - newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName) + newPath := path.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName) newExists := dirExists(newPath) if oldExists && !newExists { return oldPath @@ -234,3 +237,32 @@ func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) { func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod) } + +// getPodVolumeNameListFromDisk returns a list of the volume names by reading the +// volume directories for the given pod from the disk. +func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) { + volumes := []string{} + podVolDir := kl.getPodVolumesDir(podUID) + volumePluginDirs, err := ioutil.ReadDir(podVolDir) + if err != nil { + glog.Errorf("Could not read directory %s: %v", podVolDir, err) + return volumes, err + } + for _, volumePluginDir := range volumePluginDirs { + volumePluginName := volumePluginDir.Name() + volumePluginPath := path.Join(podVolDir, volumePluginName) + volumeDirs, volumeDirsStatErrs, err := util.ReadDirNoExit(volumePluginPath) + if err != nil { + return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err) + } + for i, volumeDir := range volumeDirs { + if volumeDir != nil { + volumes = append(volumes, volumeDir.Name()) + continue + } + glog.Errorf("Could not read directory %s: %v", podVolDir, volumeDirsStatErrs[i]) + + } + } + return volumes, nil +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5977fc44d87f3..dba23ac80ee34 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -248,7 +248,8 @@ func newTestKubeletWithImageList( fakeKubeClient, kubelet.volumePluginMgr, fakeRuntime, - kubelet.mounter) + kubelet.mounter, + kubelet.getPodsDir()) if err != nil { t.Fatalf("failed to initialize volume manager: %v", err) } @@ -404,8 +405,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) { }, }) - stopCh := make(chan struct{}) - go kubelet.volumeManager.Run(stopCh) + stopCh := runVolumeManager(kubelet) defer func() { close(stopCh) }() @@ -474,8 +474,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) { }, }) - stopCh := make(chan struct{}) - go kubelet.volumeManager.Run(stopCh) + stopCh := runVolumeManager(kubelet) defer func() { close(stopCh) }() @@ -603,8 +602,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) { }, }) - stopCh := make(chan struct{}) - go kubelet.volumeManager.Run(stopCh) + stopCh := runVolumeManager(kubelet) defer func() { close(stopCh) }() @@ -697,8 +695,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) { }, }) - stopCh := make(chan struct{}) - go kubelet.volumeManager.Run(stopCh) + stopCh := runVolumeManager(kubelet) defer func() { close(stopCh) }() @@ -856,8 +853,7 @@ func TestPodVolumesExist(t *testing.T) { }, } - stopCh := make(chan struct{}) - go kubelet.volumeManager.Run(stopCh) + stopCh := runVolumeManager(kubelet) defer func() { 
close(stopCh)
	}()
@@ -3939,3 +3935,9 @@ func simulateVolumeInUseUpdate(
 		}
 	}
 }
+
+func runVolumeManager(kubelet *Kubelet) chan struct{} {
+	stopCh := make(chan struct{})
+	go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
+	return stopCh
+}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 7c922a4feec15..e368056cfde5a 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -99,7 +99,8 @@ func TestRunOnce(t *testing.T) {
 		kb.kubeClient,
 		kb.volumePluginMgr,
 		fakeRuntime,
-		kb.mounter)
+		kb.mounter,
+		kb.getPodsDir())
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
index d3906a52e3f02..7e8756a779467 100644
--- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
@@ -136,6 +136,11 @@ type ActualStateOfWorld interface {
 	// have no mountedPods. This list can be used to determine which volumes are
 	// no longer referenced and may be globally unmounted and detached.
 	GetUnmountedVolumes() []AttachedVolume
+
+	// GetPods generates and returns a map of pods, indexed by each pod's
+	// unique name. This map can be used to determine which pods currently
+	// appear in the actual state of the world.
+	GetPods() map[volumetypes.UniquePodName]bool
 }
 
 // MountedVolume represents a volume that has successfully been mounted to a pod.
@@ -573,6 +578,21 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
 	return unmountedVolumes
 }
 
+func (asw *actualStateOfWorld) GetPods() map[volumetypes.UniquePodName]bool {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	podList := make(map[volumetypes.UniquePodName]bool)
+	for _, volumeObj := range asw.attachedVolumes {
+		for podName := range volumeObj.mountedPods {
+			if !podList[podName] {
+				podList[podName] = true
+			}
+		}
+	}
+	return podList
+}
+
 func (asw *actualStateOfWorld) newAttachedVolume(
 	attachedVolume *attachedVolume) AttachedVolume {
 	return AttachedVolume{
diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
index 24edd5906c0b5..6bcbef8e27e05 100644
--- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
@@ -92,6 +92,11 @@ type DesiredStateOfWorld interface {
 	// attached to this node and the pods they should be mounted to based on the
 	// current desired state of the world.
 	GetVolumesToMount() []VolumeToMount
+
+	// GetPods generates and returns a map of pods, indexed by each pod's
+	// unique name. This map can be used to determine which pods currently
+	// appear in the desired state of the world.
+	GetPods() map[types.UniquePodName]bool
 }
 
 // VolumeToMount represents a volume that is attached to this node and needs to
@@ -117,6 +122,7 @@ type desiredStateOfWorld struct {
 	// volumePluginMgr is the volume plugin manager used to create volume
 	// plugin objects.
 	volumePluginMgr *volume.VolumePluginMgr
+
 	sync.RWMutex
 }
 
@@ -203,7 +209,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
 	} else {
 		// For non-attachable volumes, generate a unique name based on the pod
 		// namespace and name and the name of the volume within the pod.
- volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName) + volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec) } volumeObj, volumeExists := dsw.volumesToMount[volumeName] @@ -296,6 +302,21 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume( return podExists } +func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool { + dsw.RLock() + defer dsw.RUnlock() + + podList := make(map[types.UniquePodName]bool) + for _, volumeObj := range dsw.volumesToMount { + for podName := range volumeObj.podsToMount { + if !podList[podName] { + podList[podName] = true + } + } + } + return podList +} + func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount { dsw.RLock() defer dsw.RUnlock() diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 38e11e231aa7c..6603e38d6202b 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -20,16 +20,27 @@ limitations under the License. package reconciler import ( + "fmt" + "io/ioutil" + "path" "time" "github.com/golang/glog" + "k8s.io/kubernetes/cmd/kubelet/app/options" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) // Reconciler runs a periodic loop to reconcile the desired state of the world @@ -46,7 +57,7 @@ type Reconciler interface { // If attach/detach management is enabled, the manager will also check if // volumes that should be attached are attached and volumes that should // be detached are detached and trigger attach/detach operations as needed. - Run(stopCh <-chan struct{}) + Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) } // NewReconciler returns a new instance of Reconciler. 
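The switch above from outerVolumeSpecName to volumeSpec matters for reconstruction: the unique name must be derivable from what can be recovered off disk, and reconstruction only has a rebuilt spec, not the pod's outer name for the volume. Below is a minimal sketch of the two naming schemes; the exact format strings and separators are assumptions for illustration, not the canonical volumehelper code.

```go
package main

import "fmt"

// uniqueVolumeNameAttachable mimics GetUniqueVolumeName(pluginName, volumeName):
// attachable volumes are keyed per plugin+volume and shared across pods.
func uniqueVolumeNameAttachable(pluginName, volumeName string) string {
	return fmt.Sprintf("%s/%s", pluginName, volumeName)
}

// uniqueVolumeNameNonAttachable mimics GetUniqueVolumeNameForNonAttachableVolume:
// non-attachable volumes are scoped to a pod, so the pod name is part of the key.
// Deriving the last component from volumeSpec.Name() (rather than from
// outerVolumeSpecName) lets the reconciler rebuild the same key from a spec
// reconstructed off disk.
func uniqueVolumeNameNonAttachable(podName, pluginName, specName string) string {
	return fmt.Sprintf("%s/%v-%s", pluginName, podName, specName)
}

func main() {
	fmt.Println(uniqueVolumeNameAttachable("kubernetes.io/aws-ebs", "vol-0abc"))
	fmt.Println(uniqueVolumeNameNonAttachable("default/pod1", "kubernetes.io/empty-dir", "cache"))
}
```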
@@ -56,6 +67,8 @@ type Reconciler interface {
 // this node, and therefore the volume manager should not
 // loopSleepDuration - the amount of time the reconciler loop sleeps between
 // successive executions
+// reconstructDuration - the amount of time the reconstruct process sleeps between
+// successive executions
 // waitForAttachTimeout - the amount of time the Mount function will wait for
 // the volume to be attached
 // hostName - the hostname for this node, used by Attach and Detach methods
@@ -65,26 +78,34 @@ type Reconciler interface {
 // safely (prevents more than one operation from being triggered on the same
 // volume)
 // mounter - mounter passed in from kubelet, passed down unmount path
+// volumePluginMgr - volume plugin manager passed from kubelet
 func NewReconciler(
 	kubeClient internalclientset.Interface,
 	controllerAttachDetachEnabled bool,
 	loopSleepDuration time.Duration,
+	reconstructDuration time.Duration,
 	waitForAttachTimeout time.Duration,
 	hostName string,
 	desiredStateOfWorld cache.DesiredStateOfWorld,
 	actualStateOfWorld cache.ActualStateOfWorld,
 	operationExecutor operationexecutor.OperationExecutor,
-	mounter mount.Interface) Reconciler {
+	mounter mount.Interface,
+	volumePluginMgr *volume.VolumePluginMgr,
+	kubeletPodsDir string) Reconciler {
 	return &reconciler{
 		kubeClient:                    kubeClient,
 		controllerAttachDetachEnabled: controllerAttachDetachEnabled,
 		loopSleepDuration:             loopSleepDuration,
+		reconstructDuration:           reconstructDuration,
 		waitForAttachTimeout:          waitForAttachTimeout,
 		hostName:                      hostName,
 		desiredStateOfWorld:           desiredStateOfWorld,
 		actualStateOfWorld:            actualStateOfWorld,
 		operationExecutor:             operationExecutor,
 		mounter:                       mounter,
+		volumePluginMgr:               volumePluginMgr,
+		kubeletPodsDir:                kubeletPodsDir,
+		timeOfLastReconstruct:         time.Now(),
 	}
 }
 
@@ -92,157 +113,138 @@ type reconciler struct {
 	kubeClient                    internalclientset.Interface
 	controllerAttachDetachEnabled bool
 	loopSleepDuration             time.Duration
+	reconstructDuration           time.Duration
 	waitForAttachTimeout          time.Duration
 	hostName                      string
 	desiredStateOfWorld           cache.DesiredStateOfWorld
 	actualStateOfWorld            cache.ActualStateOfWorld
 	operationExecutor             operationexecutor.OperationExecutor
 	mounter                       mount.Interface
+	volumePluginMgr               *volume.VolumePluginMgr
+	kubeletPodsDir                string
+	timeOfLastReconstruct         time.Time
 }
 
-func (rc *reconciler) Run(stopCh <-chan struct{}) {
-	wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
+func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
+	wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
 }
 
-func (rc *reconciler) reconciliationLoopFunc() func() {
+func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
 	return func() {
-		// Unmounts are triggered before mounts so that a volume that was
-		// referenced by a pod that was deleted and is now referenced by another
-		// pod is unmounted from the first pod before being mounted to the new
-		// pod.
+		rc.reconcile()
+
+		// Gate on the all-sources-ready check so that the reconciler's reconstruct process starts only after
+		// the desired state of the world has been populated with pod volume information from all sources. Otherwise,
+		// the reconstruct process may add incomplete volume information and cause confusion.
+		// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
+		// that are still in use, because the desired state could not get a complete list of pods.
+ if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration { + glog.V(5).Infof("Sources are all ready, starting reconstruct state function") + rc.reconstruct() + } + } +} - // Ensure volumes that should be unmounted are unmounted. - for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { - if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { - // Volume is mounted, unmount it - glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).", +func (rc *reconciler) reconcile() { + // Unmounts are triggered before mounts so that a volume that was + // referenced by a pod that was deleted and is now referenced by another + // pod is unmounted from the first pod before being mounted to the new + // pod. + + // Ensure volumes that should be unmounted are unmounted. + for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { + if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { + // Volume is mounted, unmount it + glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).", + mountedVolume.VolumeName, + mountedVolume.OuterVolumeSpecName, + mountedVolume.PodName, + mountedVolume.PodUID) + err := rc.operationExecutor.UnmountVolume( + mountedVolume.MountedVolume, rc.actualStateOfWorld) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + mountedVolume.VolumeName, + mountedVolume.OuterVolumeSpecName, + mountedVolume.PodName, + mountedVolume.PodUID, + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).", mountedVolume.VolumeName, mountedVolume.OuterVolumeSpecName, mountedVolume.PodName, mountedVolume.PodUID) - err := rc.operationExecutor.UnmountVolume( - mountedVolume.MountedVolume, rc.actualStateOfWorld) + } + } + } + + // Ensure volumes that should be attached/mounted are attached/mounted. + for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { + volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + volumeToMount.DevicePath = devicePath + if cache.IsVolumeNotAttachedError(err) { + if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { + // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait + // for controller to finish attaching volume. + glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID) + err := rc.operationExecutor.VerifyControllerAttachedVolume( + volumeToMount.VolumeToMount, + rc.hostName, + rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
// Log all other errors. glog.Errorf( - "operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - mountedVolume.VolumeName, - mountedVolume.OuterVolumeSpecName, - mountedVolume.PodName, - mountedVolume.PodUID, + "operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, rc.controllerAttachDetachEnabled, err) } if err == nil { - glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).", - mountedVolume.VolumeName, - mountedVolume.OuterVolumeSpecName, - mountedVolume.PodName, - mountedVolume.PodUID) - } - } - } - - // Ensure volumes that should be attached/mounted are attached/mounted. - for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { - volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) - volumeToMount.DevicePath = devicePath - if cache.IsVolumeNotAttachedError(err) { - if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { - // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait - // for controller to finish attaching volume. - glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", + glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, volumeToMount.Pod.UID) - err := rc.operationExecutor.VerifyControllerAttachedVolume( - volumeToMount.VolumeToMount, - rc.hostName, - rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
- glog.Errorf( - "operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID, - rc.controllerAttachDetachEnabled, - err) - } - if err == nil { - glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - } - } else { - // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, - // so attach it - volumeToAttach := operationexecutor.VolumeToAttach{ - VolumeName: volumeToMount.VolumeName, - VolumeSpec: volumeToMount.VolumeSpec, - NodeName: rc.hostName, - } - glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. - glog.Errorf( - "operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID, - rc.controllerAttachDetachEnabled, - err) - } - if err == nil { - glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - } } - } else if !volMounted || cache.IsRemountRequiredError(err) { - // Volume is not mounted, or is already mounted, but requires remounting - remountingLogStr := "" - if cache.IsRemountRequiredError(err) { - remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } else { + // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, + // so attach it + volumeToAttach := operationexecutor.VolumeToAttach{ + VolumeName: volumeToMount.VolumeName, + VolumeSpec: volumeToMount.VolumeSpec, + NodeName: rc.hostName, } - glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, - volumeToMount.Pod.UID, - remountingLogStr) - err := rc.operationExecutor.MountVolume( - rc.waitForAttachTimeout, - volumeToMount.VolumeToMount, - rc.actualStateOfWorld) + volumeToMount.Pod.UID) + err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. 
glog.Errorf( - "operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + "operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, @@ -251,77 +253,306 @@ func (rc *reconciler) reconciliationLoopFunc() func() { err) } if err == nil { - glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, - volumeToMount.Pod.UID, - remountingLogStr) + volumeToMount.Pod.UID) } } + } else if !volMounted || cache.IsRemountRequiredError(err) { + // Volume is not mounted, or is already mounted, but requires remounting + remountingLogStr := "" + if cache.IsRemountRequiredError(err) { + remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } + glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + remountingLogStr) + err := rc.operationExecutor.MountVolume( + rc.waitForAttachTimeout, + volumeToMount.VolumeToMount, + rc.actualStateOfWorld) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + remountingLogStr) + } } + } - // Ensure devices that should be detached/unmounted are detached/unmounted. - for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { - if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) { - if attachedVolume.GloballyMounted { - // Volume is globally mounted to device, unmount it - glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)", + // Ensure devices that should be detached/unmounted are detached/unmounted. + for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { + if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) { + if attachedVolume.GloballyMounted { + // Volume is globally mounted to device, unmount it + glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name()) + err := rc.operationExecutor.UnmountDevice( + attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
+ // Log all other errors. + glog.Errorf( + "operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name(), + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name()) + } + } else { + // Volume is attached to node, detach it + if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { + // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin, + // so just remove it to actualStateOfWorld without attach. + rc.actualStateOfWorld.MarkVolumeAsDetached( + attachedVolume.VolumeName, rc.hostName) + } else { + // Only detach if kubelet detach is enabled + glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name()) - err := rc.operationExecutor.UnmountDevice( - attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) + err := rc.operationExecutor.DetachVolume( + attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( - "operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", + "operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name(), rc.controllerAttachDetachEnabled, err) } if err == nil { - glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)", - attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name()) - } - } else { - // Volume is attached to node, detach it - if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { - // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin, - // so just remove it to actualStateOfWorld without attach. - rc.actualStateOfWorld.MarkVolumeAsDetached( - attachedVolume.VolumeName, rc.hostName) - } else { - // Only detach if kubelet detach is enabled - glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)", + glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name()) - err := rc.operationExecutor.DetachVolume( - attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
-				glog.Errorf(
-					"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
-					attachedVolume.VolumeName,
-					attachedVolume.VolumeSpec.Name(),
-					rc.controllerAttachDetachEnabled,
-					err)
-			}
-			if err == nil {
-				glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
-					attachedVolume.VolumeName,
-					attachedVolume.VolumeSpec.Name())
-			}
 				}
 			}
 		}
 	}
 }
+
+// reconstruct tries to observe the real world by scanning all pods' volume directories on disk.
+// If the actual and desired states of the world are not consistent with the observed world, it means that some
+// mounted volumes were probably left behind during a kubelet restart. This process reconstructs
+// the volumes and updates the actual and desired states. In the following reconciler loop, those volumes will
+// be cleaned up.
+func (rc *reconciler) reconstruct() {
+	defer rc.updateReconstructTime()
+	rc.reconstructStates(rc.kubeletPodsDir)
+}
+
+func (rc *reconciler) updateReconstructTime() {
+	rc.timeOfLastReconstruct = time.Now()
+}
+
+type podVolume struct {
+	podName        volumetypes.UniquePodName
+	volumeSpecName string
+	mountPath      string
+	pluginName     string
+}
+
+// reconstructStates scans the volume directories under the given pod directory. If a volume is not
+// in either the actual or desired state of the world, or in a pending operation, this function reconstructs
+// the volume spec and puts it in both the actual and desired states of the world. If no running
+// container is mounting the volume, the volume will be removed by the desired state of world's populator and
+// cleaned up by the reconciler.
+func (rc *reconciler) reconstructStates(podsDir string) {
+	// Get volumes information by reading the pod's directory
+	podVolumes, err := getVolumesFromPodDir(podsDir)
+	if err != nil {
+		glog.Errorf("Cannot get volumes from disk %v", err)
+		return
+	}
+	for _, volume := range podVolumes {
+		volumeToMount, err := rc.reconstructVolume(volume)
+		if err != nil {
+			glog.Errorf("Could not construct volume information: %v", err)
+			continue
+		}
+
+		// Check if there is a pending operation for the given pod and volume.
+		// The pending operation must be checked before checking the actual and desired
+		// states to avoid a race condition. For example, the following
+		// might happen if the pending operation were checked after checking the actual and desired states.
+		// 1. Checking the pod and it does not exist in either actual or desired state.
+		// 2. An operation for the given pod finishes and the actual state is updated.
+		// 3. Checking and there is no pending operation for the given pod.
+		if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) {
+			continue
+		}
+		desiredPods := rc.desiredStateOfWorld.GetPods()
+		actualPods := rc.actualStateOfWorld.GetPods()
+		if desiredPods[volume.podName] || actualPods[volume.podName] {
+			continue
+		}
+
+		glog.V(3).Infof(
+			"Could not find pod information in desired or actual states or in a pending operation; updating it in both states: %+v",
+			volumeToMount)
+		if err = rc.updateStates(volumeToMount); err != nil {
+			glog.Errorf("Error occurred while reconstructing volume from disk: %v", err)
+		}
+	}
+}
+
+// reconstructVolume rebuilds the volume object and volumeToMount data structure by reading the pod's volume directories
+func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) {
+	plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
+	if err != nil {
+		return nil, err
+	}
+	volumeSpec, err := plugin.ConstructVolumeSpec(volume.volumeSpecName, volume.mountPath)
+	if err != nil {
+		return nil, err
+	}
+	volumeName, err := plugin.GetVolumeName(volumeSpec)
+	if err != nil {
+		return nil, err
+	}
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			UID: types.UID(volume.podName),
+		},
+	}
+	attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
+	if err != nil {
+		return nil, err
+	}
+	var uniqueVolumeName api.UniqueVolumeName
+	if attachablePlugin != nil {
+		uniqueVolumeName = volumehelper.GetUniqueVolumeName(volume.pluginName, volumeName)
+	} else {
+		uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
+	}
+
+	volumeToMount := &operationexecutor.VolumeToMount{
+		VolumeName:          uniqueVolumeName,
+		PodName:             volume.podName,
+		VolumeSpec:          volumeSpec,
+		OuterVolumeSpecName: volumeName, /* volumeName is the InnerVolumeSpecName, but this information is not used for cleanup */
+		Pod:                 pod,
+		PluginIsAttachable:  attachablePlugin != nil,
+		VolumeGidValue:      "",
+		DevicePath:          "",
+	}
+	return volumeToMount, nil
+}
+
+func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error {
+	err := rc.actualStateOfWorld.MarkVolumeAsAttached(
+		volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath)
+	if err != nil {
+		return fmt.Errorf("Could not add volume information to actual state of world: %v", err)
+	}
+	err = rc.actualStateOfWorld.AddPodToVolume(
+		volumeToMount.PodName,
+		types.UID(volumeToMount.PodName),
+		volumeToMount.VolumeName,
+		nil,
+		volumeToMount.OuterVolumeSpecName,
+		volumeToMount.DevicePath)
+	if err != nil {
+		return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err)
+	}
+	if volumeToMount.PluginIsAttachable {
+		err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName)
+		if err != nil {
+			return fmt.Errorf("Could not mark device as mounted in actual state of world: %v", err)
+		}
+	}
+	_, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName,
+		volumeToMount.Pod,
+		volumeToMount.VolumeSpec,
+		volumeToMount.OuterVolumeSpecName,
+		volumeToMount.VolumeGidValue)
+	if err != nil {
+		return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err)
+	}
+	return nil
+}
+
+// getVolumesFromPodDir scans through the volume directories under the given pod directory.
+// It returns a list of pod volume information including the pod's uid, the volume's plugin name, its mount path,
+// and its volume spec name.
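+// The on-disk layout being walked looks like this (hypothetical example path):
+//   <podsDir>/<pod uid>/volumes/<escaped plugin name>/<volume name>
+// e.g. /var/lib/kubelet/pods/<pod uid>/volumes/kubernetes.io~aws-ebs/myvolume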
+func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
+	podsDirInfo, err := ioutil.ReadDir(podDir)
+	if err != nil {
+		return nil, err
+	}
+	volumes := []podVolume{}
+	for i := range podsDirInfo {
+		if !podsDirInfo[i].IsDir() {
+			continue
+		}
+		podName := podsDirInfo[i].Name()
+		podDir := path.Join(podDir, podName)
+		volumesDir := path.Join(podDir, options.DefaultKubeletVolumesDirName)
+		volumesDirInfo, err := ioutil.ReadDir(volumesDir)
+		if err != nil {
+			glog.Errorf("Could not read volume directory %q: %v", volumesDir, err)
+			continue
+		}
+		for _, volumeDir := range volumesDirInfo {
+			pluginName := volumeDir.Name()
+			volumePluginPath := path.Join(volumesDir, pluginName)
+
+			volumePluginDirs, err := ioutil.ReadDir(volumePluginPath)
+			if err != nil {
+				glog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
+				continue
+			}
+
+			unescapedPluginName := strings.UnescapeQualifiedNameForDisk(pluginName)
+			for _, volumeNameDir := range volumePluginDirs {
+				if volumeNameDir != nil {
+					volumeName := volumeNameDir.Name()
+					mountPath := path.Join(volumePluginPath, volumeName)
+					volumes = append(volumes, podVolume{
+						podName:        volumetypes.UniquePodName(podName),
+						volumeSpecName: volumeName,
+						mountPath:      mountPath,
+						pluginName:     unescapedPluginName,
+					})
+				}
+			}
+		}
+	}
+	glog.V(10).Infof("Got volumes from pod directory %q: %+v", podDir, volumes)
+	return volumes, nil
+}
diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
index 58ab07ad525d6..aa9e61b7a1401 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
@@ -25,9 +25,11 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/client/testing/core"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@@ -38,12 +40,13 @@ import (
 const (
 	// reconcilerLoopSleepDuration is the amount of time the reconciler loop
 	// waits between successive executions
-	reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
-
+	reconcilerLoopSleepDuration      time.Duration = 0 * time.Millisecond
+	reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute
 	// waitForAttachTimeout is the maximum amount of time an
 	// operationexecutor.Mount call will wait for a volume to be attached.
waitForAttachTimeout time.Duration = 1 * time.Second nodeName string = "myhostname" + kubeletPodsDir string = "fake-dir" ) // Calls Run() @@ -59,15 +62,18 @@ func Test_Run_Positive_DoNothing(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) // Assert assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin)) @@ -92,12 +98,15 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -128,9 +137,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) - // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( 1 /* expectedAttachCallCount */, fakePlugin)) @@ -160,12 +168,15 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { kubeClient, true, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -197,7 +208,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert @@ -228,12 +239,15 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -264,9 +278,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) - // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( 1 /* expectedAttachCallCount */, fakePlugin)) @@ -308,12 +321,15 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { kubeClient, true, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -344,7 +360,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) + dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName}) waitForMount(t, fakePlugin, generatedVolumeName, asw) @@ -445,3 +462,8 @@ func createTestClient() *fake.Clientset { }) return fakeClient } + +func runReconciler(reconciler Reconciler) { + sourcesReady := 
config.NewSourcesReady(func(_ sets.String) bool { return false }) + go reconciler.Run(sourcesReady, wait.NeverStop) +} diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index e4c6b2aa7dcc0..353887af68b2d 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/pod" @@ -46,6 +47,10 @@ const ( // between successive executions reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond + // reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process + // waits between successive executions + reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute + // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond @@ -76,6 +81,10 @@ const ( // operation is waiting it only blocks other operations on the same device, // other devices are not affected. waitForAttachTimeout time.Duration = 10 * time.Minute + + // reconcilerStartGracePeriod is the maximum amount of time volume manager + // can wait to start reconciler + reconcilerStartGracePeriod time.Duration = 60 * time.Second ) // VolumeManager runs a set of asynchronous loops that figure out which volumes @@ -83,7 +92,7 @@ const ( // this node and makes it so. type VolumeManager interface { // Starts the volume manager and all the asynchronous loops that it controls - Run(stopCh <-chan struct{}) + Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) // WaitForAttachAndMount processes the volumes referenced in the specified // pod and blocks until they are all attached and mounted (reflected in @@ -138,7 +147,8 @@ func NewVolumeManager( kubeClient internalclientset.Interface, volumePluginMgr *volume.VolumePluginMgr, kubeContainerRuntime kubecontainer.Runtime, - mounter mount.Interface) (VolumeManager, error) { + mounter mount.Interface, + kubeletPodsDir string) (VolumeManager, error) { vm := &volumeManager{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, @@ -153,12 +163,15 @@ func NewVolumeManager( kubeClient, controllerAttachDetachEnabled, reconcilerLoopSleepPeriod, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, hostName, vm.desiredStateOfWorld, vm.actualStateOfWorld, vm.operationExecutor, - mounter) + mounter, + volumePluginMgr, + kubeletPodsDir) vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( kubeClient, desiredStateOfWorldPopulatorLoopSleepPeriod, @@ -208,12 +221,14 @@ type volumeManager struct { desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator } -func (vm *volumeManager) Run(stopCh <-chan struct{}) { +func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { defer runtime.HandleCrash() - glog.Infof("Starting Kubelet Volume Manager") - go vm.reconciler.Run(stopCh) go vm.desiredStateOfWorldPopulator.Run(stopCh) + glog.V(2).Infof("The desired_state_of_world populator starts") + + glog.Infof("Starting Kubelet Volume Manager") + go vm.reconciler.Run(sourcesReady, stopCh) <-stopCh glog.Infof("Shutting down Kubelet Volume 
Manager") diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 105c914271090..db9a40c03c2c1 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -26,11 +26,13 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/sets" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" @@ -58,8 +60,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("Failed to initialize volume manager: %v", err) } - stopCh := make(chan struct{}) - go manager.Run(stopCh) + stopCh := runVolumeManager(manager) defer close(stopCh) podManager.SetPods([]*api.Pod{pod}) @@ -149,8 +150,10 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { continue } - stopCh := make(chan struct{}) - go manager.Run(stopCh) + stopCh := runVolumeManager(manager) + defer func() { + close(stopCh) + }() podManager.SetPods([]*api.Pod{pod}) @@ -170,8 +173,6 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { if !reflect.DeepEqual(tc.expected, actual) { t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual) } - - close(stopCh) } } @@ -190,7 +191,8 @@ func newTestVolumeManager( kubeClient, plugMgr, &containertest.FakeRuntime{}, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + "") return vm, err } @@ -276,3 +278,12 @@ func simulateVolumeInUseUpdate( } } } + +func runVolumeManager(manager VolumeManager) chan struct{} { + stopCh := make(chan struct{}) + //readyCh := make(chan bool, 1) + //readyCh <- true + sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true }) + go manager.Run(sourcesReady, stopCh) + return stopCh +} diff --git a/pkg/util/goroutinemap/goroutinemap.go b/pkg/util/goroutinemap/goroutinemap.go index c5f6884a0c9dc..913d066e59e88 100644 --- a/pkg/util/goroutinemap/goroutinemap.go +++ b/pkg/util/goroutinemap/goroutinemap.go @@ -58,6 +58,10 @@ type GoRoutineMap interface { // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() + + // IsOperationPending returns true if the operation is pending, otherwise + // returns false + IsOperationPending(operationName string) bool } // NewGoRoutineMap returns a new instance of GoRoutineMap. 
@@ -75,7 +79,7 @@ type goRoutineMap struct {
 	operations                map[string]operation
 	exponentialBackOffOnError bool
 	cond                      *sync.Cond
-	lock                      sync.Mutex
+	lock                      sync.RWMutex
 }
 
 type operation struct {
@@ -150,6 +154,16 @@ func (grm *goRoutineMap) operationComplete(
 	}
 }
 
+func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
+	grm.lock.RLock()
+	defer grm.lock.RUnlock()
+	existingOp, exists := grm.operations[operationName]
+	if exists && existingOp.operationPending {
+		return true
+	}
+	return false
+}
+
 func (grm *goRoutineMap) Wait() {
 	grm.lock.Lock()
 	defer grm.lock.Unlock()
diff --git a/pkg/util/mount/fake.go b/pkg/util/mount/fake.go
index 3a9706cf3cedd..60b001ba6c538 100644
--- a/pkg/util/mount/fake.go
+++ b/pkg/util/mount/fake.go
@@ -140,3 +140,7 @@ func (f *FakeMounter) DeviceOpened(pathname string) (bool, error) {
 func (f *FakeMounter) PathIsDevice(pathname string) (bool, error) {
 	return true, nil
 }
+
+func (f *FakeMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(f, mountPath, pluginDir)
+}
diff --git a/pkg/util/mount/mount.go b/pkg/util/mount/mount.go
index 69f39cf6943f2..25f55df9ea928 100644
--- a/pkg/util/mount/mount.go
+++ b/pkg/util/mount/mount.go
@@ -19,7 +19,10 @@ limitations under the License.
 package mount
 
 import (
+	"fmt"
+	"path"
 	"path/filepath"
+	"strings"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/util/exec"
@@ -43,6 +46,9 @@ type Interface interface {
 	DeviceOpened(pathname string) (bool, error)
 	// PathIsDevice determines if a path is a device.
 	PathIsDevice(pathname string) (bool, error)
+	// GetDeviceNameFromMount finds the device name by checking the mount path
+	// for the global mount path that matches its plugin directory.
+	GetDeviceNameFromMount(mountPath, pluginDir string) (string, error)
 }
 
 // This represents a single line in /proc/mounts or /etc/fstab.
@@ -151,3 +157,25 @@ func GetDeviceNameFromMount(mounter Interface, mountPath string) (string, int, e
 	}
 	return device, refCount, nil
 }
+
+// getDeviceNameFromMount finds the device name from /proc/mounts, where the
+// mount path reference should match the given plugin directory. If no mount path reference
+// the mount path reference matches the given plugin directory. If no mount
+// reference matches, it returns the base name of the given mountPath.
+func getDeviceNameFromMount(mounter Interface, mountPath, pluginDir string) (string, error) {
+	refs, err := GetMountRefs(mounter, mountPath)
+	if err != nil {
+		glog.V(4).Infof("GetMountRefs failed for mount path %q: %v", mountPath, err)
+		return "", err
+	}
+	if len(refs) == 0 {
+		glog.V(4).Infof("Directory %s is not mounted", mountPath)
+		return "", fmt.Errorf("directory %s is not mounted", mountPath)
+	}
+	for _, ref := range refs {
+		if strings.HasPrefix(ref, pluginDir) {
+			return path.Base(ref), nil
+		}
+	}
+
+	return path.Base(mountPath), nil
+}
diff --git a/pkg/util/mount/mount_linux.go b/pkg/util/mount/mount_linux.go
index 03a5ccae75598..5e556c3444e43 100644
--- a/pkg/util/mount/mount_linux.go
+++ b/pkg/util/mount/mount_linux.go
@@ -222,6 +222,11 @@ func pathIsDevice(pathname string) (bool, error) {
 	return false, nil
 }
 
+// GetDeviceNameFromMount, given a mount point, finds the device name from its global mount point.
+func (mounter *Mounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(mounter, mountPath, pluginDir)
+}
+
 func listProcMounts(mountFilePath string) ([]MountPoint, error) {
 	hash1, err := readProcMounts(mountFilePath, nil)
 	if err != nil {
diff --git a/pkg/util/mount/nsenter_mount.go b/pkg/util/mount/nsenter_mount.go
index ea2b945a7f256..8004ed3f288de 100644
--- a/pkg/util/mount/nsenter_mount.go
+++ b/pkg/util/mount/nsenter_mount.go
@@ -217,6 +217,11 @@ func (n *NsenterMounter) PathIsDevice(pathname string) (bool, error) {
 	return pathIsDevice(pathname)
 }
 
+// GetDeviceNameFromMount, given a mount point, finds the volume ID by checking /proc/mounts.
+func (n *NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(n, mountPath, pluginDir)
+}
+
 func (n *NsenterMounter) absHostPath(command string) string {
 	path, ok := n.paths[command]
 	if !ok {
diff --git a/pkg/util/mount/nsenter_mount_unsupported.go b/pkg/util/mount/nsenter_mount_unsupported.go
index f40f73ab2d219..dcf19edefd264 100644
--- a/pkg/util/mount/nsenter_mount_unsupported.go
+++ b/pkg/util/mount/nsenter_mount_unsupported.go
@@ -49,3 +49,7 @@ func (*NsenterMounter) DeviceOpened(pathname string) (bool, error) {
 func (*NsenterMounter) PathIsDevice(pathname string) (bool, error) {
 	return true, nil
 }
+
+func (*NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return "", nil
+}
diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go
index e4a710e08f8b0..2a37527f4ed23 100644
--- a/pkg/volume/aws_ebs/attacher.go
+++ b/pkg/volume/aws_ebs/attacher.go
@@ -51,6 +51,11 @@ func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error)
 	}, nil
 }
 
+func (plugin *awsElasticBlockStorePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
 	volumeSource, readOnly, err := getVolumeSource(spec)
 	if err != nil {
diff --git a/pkg/volume/aws_ebs/aws_ebs.go b/pkg/volume/aws_ebs/aws_ebs.go
index b2831da3a577a..e60a7c867502b 100644
--- a/pkg/volume/aws_ebs/aws_ebs.go
+++ b/pkg/volume/aws_ebs/aws_ebs.go
@@ -188,6 +188,24 @@ func getVolumeSource(
 	return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type")
 }
 
+func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	awsVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
+				VolumeID: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(awsVolume), nil
+}
+
 // Abstract interface to PD operations.
 type ebsManager interface {
 	CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)
diff --git a/pkg/volume/azure_file/azure_file.go b/pkg/volume/azure_file/azure_file.go
index 9201abb16ac8f..7511dcbe35704 100644
--- a/pkg/volume/azure_file/azure_file.go
+++ b/pkg/volume/azure_file/azure_file.go
@@ -124,6 +124,19 @@ func (plugin *azureFilePlugin) newUnmounterInternal(volName string, podUID types
 	}}, nil
 }
 
+func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	azureVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			AzureFile: &api.AzureFileVolumeSource{
+				SecretName: volName,
+				ShareName:  volName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(azureVolume), nil
+}
+
 // azureFile volumes represent mount of an AzureFile share.
 type azureFile struct {
 	volName string
diff --git a/pkg/volume/cephfs/cephfs.go b/pkg/volume/cephfs/cephfs.go
index 18574ee9aeece..b42b3a07eaf3e 100644
--- a/pkg/volume/cephfs/cephfs.go
+++ b/pkg/volume/cephfs/cephfs.go
@@ -154,6 +154,19 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI
 	}, nil
 }
 
+func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	cephfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			CephFS: &api.CephFSVolumeSource{
+				Monitors: []string{},
+				Path:     volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(cephfsVolume), nil
+}
+
 // CephFS volumes represent a bare host file or directory mount of an CephFS export.
 type cephfs struct {
 	volName string
diff --git a/pkg/volume/cinder/attacher.go b/pkg/volume/cinder/attacher.go
index ecf11e1c74920..a35894c12a4f6 100644
--- a/pkg/volume/cinder/attacher.go
+++ b/pkg/volume/cinder/attacher.go
@@ -53,6 +53,11 @@ func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
 	}, nil
 }
 
+func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
 	volumeSource, _, err := getVolumeSource(spec)
 	if err != nil {
diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go
index 75e8a6d7352ff..24b9d073a1984 100644
--- a/pkg/volume/cinder/cinder.go
+++ b/pkg/volume/cinder/cinder.go
@@ -204,6 +204,25 @@ func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
 	}
 }
 
+func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	glog.V(4).Infof("Found volume %s mounted to %s", sourceName, mountPath)
+	cinderVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Cinder: &api.CinderVolumeSource{
+				VolumeID: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(cinderVolume), nil
+}
+
 // Abstract interface to PD operations.
 type cdManager interface {
 	// Attaches the disk to the kubelet's host machine.
diff --git a/pkg/volume/configmap/configmap.go b/pkg/volume/configmap/configmap.go
index 98c00c44ed205..6451951f765a3 100644
--- a/pkg/volume/configmap/configmap.go
+++ b/pkg/volume/configmap/configmap.go
@@ -86,6 +86,16 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v
 	return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil
 }
 
+func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	configMapVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			ConfigMap: &api.ConfigMapVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(configMapVolume), nil
+}
+
 type configMapVolume struct {
 	volName string
 	podUID  types.UID
diff --git a/pkg/volume/downwardapi/downwardapi.go b/pkg/volume/downwardapi/downwardapi.go
index 184b2c5859d74..970dee86ee441 100644
--- a/pkg/volume/downwardapi/downwardapi.go
+++ b/pkg/volume/downwardapi/downwardapi.go
@@ -106,6 +106,16 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID)
 	}, nil
 }
 
+func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	downwardAPIVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			DownwardAPI: &api.DownwardAPIVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(downwardAPIVolume), nil
+}
+
 // downwardAPIVolume retrieves downward API data and placing them into the volume on the host.
 type downwardAPIVolume struct {
 	volName string
diff --git a/pkg/volume/empty_dir/empty_dir.go b/pkg/volume/empty_dir/empty_dir.go
index 0653b2556ffd1..2a545945d0373 100644
--- a/pkg/volume/empty_dir/empty_dir.go
+++ b/pkg/volume/empty_dir/empty_dir.go
@@ -128,6 +128,16 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types.
 	return ed, nil
 }
 
+func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	emptyDirVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			EmptyDir: &api.EmptyDirVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(emptyDirVolume), nil
+}
+
 // mountDetector abstracts how to find what kind of mount a path is backed by.
 type mountDetector interface {
 	// GetMountMedium determines what type of medium a given path is backed
diff --git a/pkg/volume/fc/fc.go b/pkg/volume/fc/fc.go
index 96e8518794920..98abdc8dd038b 100644
--- a/pkg/volume/fc/fc.go
+++ b/pkg/volume/fc/fc.go
@@ -141,6 +141,16 @@ func (plugin *fcPlugin) execCommand(command string, args []string) ([]byte, erro
 	return cmd.CombinedOutput()
 }
 
+func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	fcVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			FC: &api.FCVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(fcVolume), nil
+}
+
 type fcDisk struct {
 	volName string
 	podUID  types.UID
diff --git a/pkg/volume/flexvolume/flexvolume.go b/pkg/volume/flexvolume/flexvolume.go
index eb4642ca7c00c..30fd59681d6b0 100644
--- a/pkg/volume/flexvolume/flexvolume.go
+++ b/pkg/volume/flexvolume/flexvolume.go
@@ -179,6 +179,18 @@ func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID type
 	}, nil
 }
 
+func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) {
+	flexVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			FlexVolume: &api.FlexVolumeSource{
+				Driver: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(flexVolume), nil
+}
+
 // flexVolume is the disk resource provided by this plugin.
 type flexVolumeDisk struct {
 	// podUID is the UID of the pod.
diff --git a/pkg/volume/flocker/plugin.go b/pkg/volume/flocker/plugin.go
index 9bdf9bc3e689f..8cf3193723532 100644
--- a/pkg/volume/flocker/plugin.go
+++ b/pkg/volume/flocker/plugin.go
@@ -117,6 +117,18 @@ func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volu
 	return nil, nil
 }
 
+func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	flockerVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Flocker: &api.FlockerVolumeSource{
+				DatasetName: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(flockerVolume), nil
+}
+
 type flockerMounter struct {
 	*flocker
 	client flockerclient.Clientable
diff --git a/pkg/volume/gce_pd/attacher.go b/pkg/volume/gce_pd/attacher.go
index c60a02aeaeeb4..d3e78cd978d9b 100644
--- a/pkg/volume/gce_pd/attacher.go
+++ b/pkg/volume/gce_pd/attacher.go
@@ -53,6 +53,11 @@ func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
 	}, nil
 }
 
+func (plugin *gcePersistentDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 // Attach checks with the GCE cloud provider if the specified volume is already
 // attached to the specified node.
 // If the volume is attached, it succeeds (returns nil). If it is not, Attach
 // issues a call to the GCE cloud provider
diff --git a/pkg/volume/gce_pd/gce_pd.go b/pkg/volume/gce_pd/gce_pd.go
index a3d57bd0e0c4a..cc0431e1c7e14 100644
--- a/pkg/volume/gce_pd/gce_pd.go
+++ b/pkg/volume/gce_pd/gce_pd.go
@@ -182,6 +182,24 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol
 	}, nil
 }
 
+func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	gceVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+				PDName: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(gceVolume), nil
+}
+
 // Abstract interface to PD operations.
 type pdManager interface {
 	// Creates a volume
diff --git a/pkg/volume/git_repo/git_repo.go b/pkg/volume/git_repo/git_repo.go
index 9d2c2ca3a83e4..83aa0d95ac63d 100644
--- a/pkg/volume/git_repo/git_repo.go
+++ b/pkg/volume/git_repo/git_repo.go
@@ -107,6 +107,16 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol
 	}, nil
 }
 
+func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	gitVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			GitRepo: &api.GitRepoVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(gitVolume), nil
+}
+
 // gitRepo volumes are directories which are pre-filled from a git repository.
 // These do not persist beyond the lifetime of a pod.
 type gitRepoVolume struct {
diff --git a/pkg/volume/glusterfs/glusterfs.go b/pkg/volume/glusterfs/glusterfs.go
index bbaea800aff3e..f4ccce0bf9797 100644
--- a/pkg/volume/glusterfs/glusterfs.go
+++ b/pkg/volume/glusterfs/glusterfs.go
@@ -145,6 +145,19 @@ func (plugin *glusterfsPlugin) execCommand(command string, args []string) ([]byt
 	return cmd.CombinedOutput()
 }
 
+func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	glusterfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Glusterfs: &api.GlusterfsVolumeSource{
+				EndpointsName: volumeName,
+				Path:          volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(glusterfsVolume), nil
+}
+
 // Glusterfs volumes represent a bare host file or directory mount of an Glusterfs export.
 type glusterfs struct {
 	volName string
diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go
index 466f92f7a55b5..a29d4e6e61f69 100644
--- a/pkg/volume/host_path/host_path.go
+++ b/pkg/volume/host_path/host_path.go
@@ -138,6 +138,18 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
 	return plugin.newProvisionerFunc(options, plugin.host)
 }
 
+func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	hostPathVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			HostPath: &api.HostPathVolumeSource{
+				Path: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(hostPathVolume), nil
+}
+
 func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
 	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
 		return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")
diff --git a/pkg/volume/iscsi/iscsi.go b/pkg/volume/iscsi/iscsi.go
index 9ca787c183c7d..248605dc8752f 100644
--- a/pkg/volume/iscsi/iscsi.go
+++ b/pkg/volume/iscsi/iscsi.go
@@ -146,6 +146,19 @@ func (plugin *iscsiPlugin) execCommand(command string, args []string) ([]byte, e
 	return cmd.CombinedOutput()
 }
 
+func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	iscsiVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			ISCSI: &api.ISCSIVolumeSource{
+				TargetPortal: volumeName,
+				IQN:          volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(iscsiVolume), nil
+}
+
 type iscsiDisk struct {
 	volName string
 	podUID  types.UID
diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go
index 463addc6b75d8..4ca95781b6e84 100644
--- a/pkg/volume/nfs/nfs.go
+++ b/pkg/volume/nfs/nfs.go
@@ -136,6 +136,18 @@ func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.R
 	return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
 }
 
+func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	nfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			NFS: &api.NFSVolumeSource{
+				Path: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(nfsVolume), nil
+}
+
 // NFS volumes represent a bare host file or directory mount of an NFS export.
 type nfs struct {
 	volName string
diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go
index 0ab0a73d983ab..0ff8939399469 100644
--- a/pkg/volume/plugins.go
+++ b/pkg/volume/plugins.go
@@ -98,6 +98,12 @@ type VolumePlugin interface {
 	// - name: The volume name, as per the api.Volume spec.
 	// - podUID: The UID of the enclosing pod
 	NewUnmounter(name string, podUID types.UID) (Unmounter, error)
+
+	// ConstructVolumeSpec constructs a volume spec based on the given volume
+	// name and mountPath. The spec may be incomplete because only limited
+	// information can be recovered from those inputs. The volume manager uses
+	// it to reconstruct volume specs by reading volume directories from disk.
+	ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)
 }
 
 // PersistentVolumePlugin is an extended interface of VolumePlugin and is used
@@ -151,6 +157,7 @@ type AttachableVolumePlugin interface {
 	VolumePlugin
 	NewAttacher() (Attacher, error)
 	NewDetacher() (Detacher, error)
+	GetDeviceMountRefs(deviceMountPath string) ([]string, error)
 }
 
 // VolumeHost is an interface that plugins can use to access the kubelet.
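To make the intent of the new interface method concrete, here is a rough sketch (assumed code, not part of this patch) of how the kubelet's volume manager can rebuild a spec for a volume it discovers on disk after a restart; the helper name and its arguments are illustrative:

package example

import (
    "k8s.io/kubernetes/pkg/volume"
)

// reconstructVolumeSpec rebuilds a (possibly partial) spec for a volume found
// by walking the pod volume directories under the kubelet's pods directory.
func reconstructVolumeSpec(
    plugMgr *volume.VolumePluginMgr,
    pluginName, volumeName, mountPath string) (*volume.Spec, error) {
    plugin, err := plugMgr.FindPluginByName(pluginName)
    if err != nil {
        return nil, err
    }
    // The returned spec may be incomplete; each plugin recovers only what the
    // volume name and on-disk mount path can tell it.
    return plugin.ConstructVolumeSpec(volumeName, mountPath)
}

The reconstructed spec is then added to both the desired and actual worlds of state, and the normal reconciliation loop unmounts the volume once the desired world drops it.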
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go
index 94893cd7802bf..bfc852b17ef79 100644
--- a/pkg/volume/rbd/rbd.go
+++ b/pkg/volume/rbd/rbd.go
@@ -165,6 +165,18 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID,
 	}, nil
 }
 
+func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	rbdVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			RBD: &api.RBDVolumeSource{
+				CephMonitors: []string{},
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(rbdVolume), nil
+}
+
 type rbd struct {
 	volName string
 	podUID  types.UID
diff --git a/pkg/volume/secret/secret.go b/pkg/volume/secret/secret.go
index 0a397fa552ac7..9ca6911690ab8 100644
--- a/pkg/volume/secret/secret.go
+++ b/pkg/volume/secret/secret.go
@@ -110,6 +110,18 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu
 	}, nil
 }
 
+func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	secretVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			Secret: &api.SecretVolumeSource{
+				SecretName: volName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(secretVolume), nil
+}
+
 type secretVolume struct {
 	volName string
 	podUID  types.UID
diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index 8e7626095394d..26b56ae8e0dcd 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -288,6 +288,14 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMod
 	return []api.PersistentVolumeAccessMode{}
 }
 
+func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
+	return nil, nil
+}
+
+func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	return []string{}, nil
+}
+
 type FakeVolume struct {
 	sync.RWMutex
 	PodUID types.UID
diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
index 9b3743f7bf58e..266b6cdf054bf 100644
--- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
+++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
@@ -58,6 +58,10 @@ type NestedPendingOperations interface {
 	// necessary during tests - the test should wait until all operations finish
 	// and evaluate results after that.
 	Wait()
+
+	// IsOperationPending returns true if an operation for the given volumeName
+	// and podName is pending; otherwise it returns false.
+	IsOperationPending(volumeName api.UniqueVolumeName, podName types.UniquePodName) bool
 }
 
 // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
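The pod-name matching rule that IsOperationPending shares with Run can be summarized by the following illustrative snippet (not patch code): an empty pod name acts as a wildcard, matching an operation recorded under any pod for the same volume.

package example

import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
    "k8s.io/kubernetes/pkg/volume/util/types"
)

// volumeBusy reports whether any operation is pending on the volume,
// regardless of which pod it was recorded under; see isOperationExists below
// for the matching rule this relies on.
func volumeBusy(npo nestedpendingoperations.NestedPendingOperations, vol api.UniqueVolumeName) bool {
    return npo.IsOperationPending(vol, types.UniquePodName(""))
}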
@@ -74,7 +78,7 @@ type nestedPendingOperations struct {
 	operations                []operation
 	exponentialBackOffOnError bool
 	cond                      *sync.Cond
-	lock                      sync.Mutex
+	lock                      sync.RWMutex
 }
 
 type operation struct {
@@ -90,29 +94,9 @@ func (grm *nestedPendingOperations) Run(
 	operationFunc func() error) error {
 	grm.lock.Lock()
 	defer grm.lock.Unlock()
-
-	var previousOp operation
-	opExists := false
-	previousOpIndex := -1
-	for previousOpIndex, previousOp = range grm.operations {
-		if previousOp.volumeName != volumeName {
-			// No match, keep searching
-			continue
-		}
-
-		if previousOp.podName != emptyUniquePodName &&
-			podName != emptyUniquePodName &&
-			previousOp.podName != podName {
-			// No match, keep searching
-			continue
-		}
-
-		// Match
-		opExists = true
-		break
-	}
-
+	opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
 	if opExists {
+		previousOp := grm.operations[previousOpIndex]
 		// Operation already exists
 		if previousOp.operationPending {
 			// Operation is pending
@@ -153,6 +137,43 @@ func (grm *nestedPendingOperations) Run(
 	return nil
 }
 
+func (grm *nestedPendingOperations) IsOperationPending(
+	volumeName api.UniqueVolumeName,
+	podName types.UniquePodName) bool {
+
+	grm.lock.RLock()
+	defer grm.lock.RUnlock()
+
+	exists, previousOpIndex := grm.isOperationExists(volumeName, podName)
+	if exists && grm.operations[previousOpIndex].operationPending {
+		return true
+	}
+	return false
+}
+
+func (grm *nestedPendingOperations) isOperationExists(
+	volumeName api.UniqueVolumeName,
+	podName types.UniquePodName) (bool, int) {
+
+	for previousOpIndex, previousOp := range grm.operations {
+		if previousOp.volumeName != volumeName {
+			// No match, keep searching
+			continue
+		}
+
+		if previousOp.podName != emptyUniquePodName &&
+			podName != emptyUniquePodName &&
+			previousOp.podName != podName {
+			// No match, keep searching
+			continue
+		}
+
+		// Match
+		return true, previousOpIndex
+	}
+	return false, -1
+}
+
 func (grm *nestedPendingOperations) getOperation(
 	volumeName api.UniqueVolumeName,
 	podName types.UniquePodName) (uint, error) {
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index 28c1e57d750a5..0f055f33b9eee 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -99,6 +99,10 @@
 	// object, for example) then an error is returned which triggers exponential
 	// back off on retries.
 	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+
+	// IsOperationPending returns true if an operation for the given volumeName
+	// and podName is pending; otherwise it returns false.
+	IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool
}
 
 // NewOperationExecutor returns a new instance of OperationExecutor.
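On the reconciler side, the new OperationExecutor method enables a gating pattern along these lines (a sketch with assumed surroundings, not the patch's exact code):

package example

import (
    "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)

// skipPending filters out volume/pod pairs whose previous operation is still
// in flight, so the reconciler does not queue duplicates and trip the
// already-exists backoff error.
func skipPending(
    oe operationexecutor.OperationExecutor,
    volumes []operationexecutor.VolumeToMount) []operationexecutor.VolumeToMount {
    var ready []operationexecutor.VolumeToMount
    for _, v := range volumes {
        if oe.IsOperationPending(v.VolumeName, v.PodName) {
            continue // previous operation for this volume/pod pair still running
        }
        ready = append(ready, v)
    }
    return ready
}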
@@ -339,6 +343,10 @@ type operationExecutor struct {
 	pendingOperations nestedpendingoperations.NestedPendingOperations
 }
 
+func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
+	return oe.pendingOperations.IsOperationPending(volumeName, podName)
+}
+
 func (oe *operationExecutor) AttachVolume(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
@@ -391,6 +399,7 @@ func (oe *operationExecutor) MountVolume(
 func (oe *operationExecutor) UnmountVolume(
 	volumeToUnmount MountedVolume,
 	actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
+
 	unmountFunc, err :=
 		oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
 	if err != nil {
@@ -811,11 +820,14 @@ func (oe *operationExecutor) generateUnmountVolumeFunc(
 		}
 
 		glog.Infof(
-			"UnmountVolume.TearDown succeeded for volume %q (volume.spec.Name: %q) pod %q (UID: %q).",
+			"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
 			volumeToUnmount.VolumeName,
 			volumeToUnmount.OuterVolumeSpecName,
 			volumeToUnmount.PodName,
-			volumeToUnmount.PodUID)
+			volumeToUnmount.PodUID,
+			volumeToUnmount.InnerVolumeSpecName,
+			volumeToUnmount.PluginName,
+			volumeToUnmount.VolumeGidValue)
 
 		// Update actual state of world
 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
@@ -879,7 +891,17 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
 				deviceToDetach.VolumeSpec.Name(),
 				err)
 		}
-
+		refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
+		if err != nil || len(refs) > 0 {
+			if err == nil {
+				err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs)
+			}
+			return fmt.Errorf(
+				"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v",
+				deviceToDetach.VolumeName,
+				deviceToDetach.VolumeSpec.Name(),
+				err)
+		}
 		// Execute unmount
 		unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath)
 		if unmountDeviceErr != nil {
diff --git a/pkg/volume/util/volumehelper/volumehelper.go b/pkg/volume/util/volumehelper/volumehelper.go
index 2ab765e3cd950..386daa1f3f6a2 100644
--- a/pkg/volume/util/volumehelper/volumehelper.go
+++ b/pkg/volume/util/volumehelper/volumehelper.go
@@ -54,8 +54,10 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName {
 
 // GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
 // for a non-attachable volume.
-func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName {
-	return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName))
+func GetUniqueVolumeNameForNonAttachableVolume(
+	podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) api.UniqueVolumeName {
+	return api.UniqueVolumeName(
+		fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name()))
 }
 
 // GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique
diff --git a/pkg/volume/vsphere_volume/vsphere_volume.go b/pkg/volume/vsphere_volume/vsphere_volume.go
index 62f26c56a339b..31b337d047f6f 100644
--- a/pkg/volume/vsphere_volume/vsphere_volume.go
+++ b/pkg/volume/vsphere_volume/vsphere_volume.go
@@ -135,6 +135,18 @@ func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error) {
 	return vs, nil
 }
 
+func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	vsphereVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
+				VolumePath: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(vsphereVolume), nil
+}
+
 // Abstract interface to disk operations.
 type vdManager interface {
 	// Attaches the disk to the kubelet's host machine.
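Finally, the safety check added to generateUnmountDeviceFunc boils down to the following standalone sketch (the function and variable names are assumptions, not the patch's exact code): refuse to unmount a device whose global mount point is still referenced by other mounts.

package example

import (
    "fmt"

    "k8s.io/kubernetes/pkg/volume"
)

// unmountDeviceSafely only unmounts the device when no other mount still
// references its global mount point; refusing otherwise is what protects
// data when another pod still uses the same device.
func unmountDeviceSafely(
    plugin volume.AttachableVolumePlugin,
    detacher volume.Detacher,
    deviceMountPath string) error {
    refs, err := plugin.GetDeviceMountRefs(deviceMountPath)
    if err != nil {
        return fmt.Errorf("GetDeviceMountRefs check failed for %q: %v", deviceMountPath, err)
    }
    if len(refs) > 0 {
        return fmt.Errorf("device mount path %q is still mounted by %v", deviceMountPath, refs)
    }
    return detacher.UnmountDevice(deviceMountPath)
}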