diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 8eb1ee5788819..22b355bcdf09a 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -29,6 +29,13 @@ import ( "github.com/spf13/pflag" ) +const ( + DefaultKubeletPodsDirName = "pods" + DefaultKubeletVolumesDirName = "volumes" + DefaultKubeletPluginsDirName = "plugins" + DefaultKubeletContainersDirName = "containers" +) + // KubeletServer encapsulates all of the parameters necessary for starting up // a kubelet. These can either be set via command line or directly. type KubeletServer struct { diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index 68354dd8d01f1..609548965e16d 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -1056,6 +1056,10 @@ func (plugin *mockVolumePlugin) RequiresRemount() bool { return false } +func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) { + return nil, nil +} + func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) { return nil, fmt.Errorf("Mounter is not supported by this plugin") } diff --git a/pkg/kubelet/cm/container_manager_linux_test.go b/pkg/kubelet/cm/container_manager_linux_test.go index 16273ab540042..37a4d64504471 100644 --- a/pkg/kubelet/cm/container_manager_linux_test.go +++ b/pkg/kubelet/cm/container_manager_linux_test.go @@ -50,6 +50,9 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) { func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) { return false, fmt.Errorf("unsupported") } +func (mi *fakeMountInterface) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) { + return "", nil +} func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) { for _, mp := range mi.mountPoints { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 67f12d7c57d03..38adc92cf4ea5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -514,6 +514,7 @@ func NewMainKubelet( return nil, err } + // setup volumeManager klet.volumeManager, err = volumemanager.NewVolumeManager( enableControllerAttachDetach, nodeName, @@ -521,7 +522,8 @@ func NewMainKubelet( klet.kubeClient, klet.volumePluginMgr, klet.containerRuntime, - mounter) + mounter, + klet.getPodsDir()) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { @@ -957,7 +959,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // Start volume manager - go kl.volumeManager.Run(wait.NeverStop) + go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) if kl.kubeClient != nil { // Start syncing node status immediately, this may set up things the runtime needs to run. @@ -1731,16 +1733,21 @@ func (kl *Kubelet) cleanupOrphanedPodDirs( if allPods.Has(string(uid)) { continue } + // If volumes have not been unmounted/detached, do not delete directory. + // Doing so may result in corruption of data. if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { - // If volumes have not been unmounted/detached, do not delete directory. - // Doing so may result in corruption of data. glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err) continue } + // Check whether volume is still mounted on disk. 
If so, do not delete directory + if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 { + glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames) + continue + } glog.V(3).Infof("Orphaned pod %q found, removing", uid) if err := os.RemoveAll(kl.getPodDir(uid)); err != nil { - glog.Infof("Failed to remove orphaned pod %q dir; err: %v", uid, err) + glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err) errlist = append(errlist, err) } } diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index 8d8b6b8706552..8f9194c07e64d 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -18,14 +18,17 @@ package kubelet import ( "fmt" + "io/ioutil" "net" "path" "github.com/golang/glog" + "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" nodeutil "k8s.io/kubernetes/pkg/util/node" ) @@ -40,7 +43,7 @@ func (kl *Kubelet) getRootDir() string { // getPodsDir returns the full path to the directory under which pod // directories are created. func (kl *Kubelet) getPodsDir() string { - return path.Join(kl.getRootDir(), "pods") + return path.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName) } // getPluginsDir returns the full path to the directory under which plugin @@ -48,7 +51,7 @@ func (kl *Kubelet) getPodsDir() string { // they need to persist. Plugins should create subdirectories under this named // after their own names. func (kl *Kubelet) getPluginsDir() string { - return path.Join(kl.getRootDir(), "plugins") + return path.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName) } // getPluginDir returns a data directory name for a given plugin name. @@ -90,7 +93,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string { // which volumes are created for the specified pod. This directory may not // exist if the pod does not exist. func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string { - return path.Join(kl.getPodDir(podUID), "volumes") + return path.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName) } // getPodVolumeDir returns the full path to the directory which represents the @@ -104,7 +107,7 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa // which plugins may store data for the specified pod. This directory may not // exist if the pod does not exist. 
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
- return path.Join(kl.getPodDir(podUID), "plugins")
+ return path.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName)
}

// getPodPluginDir returns a data directory name for a given plugin name for a
@@ -126,7 +129,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
 // old && new = use new (but warn)
 oldPath := path.Join(kl.getPodDir(podUID), ctrName)
 oldExists := dirExists(oldPath)
- newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
+ newPath := path.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName)
 newExists := dirExists(newPath)
 if oldExists && !newExists {
 return oldPath
@@ -234,3 +237,32 @@ func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) {
 func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
 return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
 }
+
+// getPodVolumeNameListFromDisk returns a list of the volume names by reading the
+// volume directories for the given pod from the disk.
+func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) {
+ volumes := []string{}
+ podVolDir := kl.getPodVolumesDir(podUID)
+ volumePluginDirs, err := ioutil.ReadDir(podVolDir)
+ if err != nil {
+ glog.Errorf("Could not read directory %s: %v", podVolDir, err)
+ return volumes, err
+ }
+ for _, volumePluginDir := range volumePluginDirs {
+ volumePluginName := volumePluginDir.Name()
+ volumePluginPath := path.Join(podVolDir, volumePluginName)
+ volumeDirs, volumeDirsStatErrs, err := util.ReadDirNoExit(volumePluginPath)
+ if err != nil {
+ return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err)
+ }
+ for i, volumeDir := range volumeDirs {
+ if volumeDir != nil {
+ volumes = append(volumes, volumeDir.Name())
+ continue
+ }
+ glog.Errorf("Could not read directory %s: %v", volumePluginPath, volumeDirsStatErrs[i])
+ }
+ }
+ return volumes, nil
+}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index e9cc8db457f11..8412ad2efce2e 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -248,7 +248,8 @@ func newTestKubeletWithImageList(
 fakeKubeClient,
 kubelet.volumePluginMgr,
 fakeRuntime,
- kubelet.mounter)
+ kubelet.mounter,
+ kubelet.getPodsDir())
 if err != nil {
 t.Fatalf("failed to initialize volume manager: %v", err)
 }
@@ -404,8 +405,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 },
 })
- stopCh := make(chan struct{})
- go kubelet.volumeManager.Run(stopCh)
+ stopCh := runVolumeManager(kubelet)
 defer func() {
 close(stopCh)
 }()
@@ -474,8 +474,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 },
 })
- stopCh := make(chan struct{})
- go kubelet.volumeManager.Run(stopCh)
+ stopCh := runVolumeManager(kubelet)
 defer func() {
 close(stopCh)
 }()
@@ -603,8 +602,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
 },
 })
- stopCh := make(chan struct{})
- go kubelet.volumeManager.Run(stopCh)
+ stopCh := runVolumeManager(kubelet)
 defer func() {
 close(stopCh)
 }()
@@ -697,8 +695,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
 },
 })
- stopCh := make(chan struct{})
- go kubelet.volumeManager.Run(stopCh)
+ stopCh := runVolumeManager(kubelet)
 defer func() {
 close(stopCh)
 }()
@@ -856,8 +853,7 @@ func TestPodVolumesExist(t *testing.T) {
 },
 }
- stopCh := make(chan struct{})
- go kubelet.volumeManager.Run(stopCh)
+ stopCh := runVolumeManager(kubelet)
 defer func() {
 close(stopCh)
 }()
@@ -3939,3 +3935,9 @@ func simulateVolumeInUseUpdate(
 }
 }
 }
+
+func runVolumeManager(kubelet *Kubelet) chan struct{} {
+ stopCh := make(chan struct{})
+ go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
+ return stopCh
+}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 7c922a4feec15..e368056cfde5a 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -99,7 +99,8 @@ func TestRunOnce(t *testing.T) {
 kb.kubeClient,
 kb.volumePluginMgr,
 fakeRuntime,
- kb.mounter)
+ kb.mounter,
+ kb.getPodsDir())
 kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
 // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
index d3906a52e3f02..7e8756a779467 100644
--- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
@@ -136,6 +136,11 @@
 // have no mountedPods. This list can be used to determine which volumes are
 // no longer referenced and may be globally unmounted and detached.
 GetUnmountedVolumes() []AttachedVolume
+
+ // GetPods generates and returns a map of pods, indexed by the pod's unique
+ // name. This map can be used to determine which pods are currently
+ // in the actual state of the world.
+ GetPods() map[volumetypes.UniquePodName]bool
 }

// MountedVolume represents a volume that has successfully been mounted to a pod.
@@ -573,6 +578,21 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
 return unmountedVolumes
 }

+func (asw *actualStateOfWorld) GetPods() map[volumetypes.UniquePodName]bool {
+ asw.RLock()
+ defer asw.RUnlock()
+
+ podList := make(map[volumetypes.UniquePodName]bool)
+ for _, volumeObj := range asw.attachedVolumes {
+ for podName := range volumeObj.mountedPods {
+ if !podList[podName] {
+ podList[podName] = true
+ }
+ }
+ }
+ return podList
+}
+
 func (asw *actualStateOfWorld) newAttachedVolume(
 attachedVolume *attachedVolume) AttachedVolume {
 return AttachedVolume{
diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
index 24edd5906c0b5..6bcbef8e27e05 100644
--- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go
@@ -92,6 +92,11 @@
 // attached to this node and the pods they should be mounted to based on the
 // current desired state of the world.
 GetVolumesToMount() []VolumeToMount
+
+ // GetPods generates and returns a map of pods, indexed by the pod's unique
+ // name. This map can be used to determine which pods are currently
+ // in the desired state of the world.
+ GetPods() map[types.UniquePodName]bool
 }

// VolumeToMount represents a volume that is attached to this node and needs to
@@ -117,6 +122,7 @@ type desiredStateOfWorld struct {
 // volumePluginMgr is the volume plugin manager used to create volume
 // plugin objects.
 volumePluginMgr *volume.VolumePluginMgr
+
 sync.RWMutex
}

@@ -203,7 +209,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
 } else {
 // For non-attachable volumes, generate a unique name based on the pod
 // namespace and name and the name of the volume within the pod.
- volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName) + volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec) } volumeObj, volumeExists := dsw.volumesToMount[volumeName] @@ -296,6 +302,21 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume( return podExists } +func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool { + dsw.RLock() + defer dsw.RUnlock() + + podList := make(map[types.UniquePodName]bool) + for _, volumeObj := range dsw.volumesToMount { + for podName := range volumeObj.podsToMount { + if !podList[podName] { + podList[podName] = true + } + } + } + return podList +} + func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount { dsw.RLock() defer dsw.RUnlock() diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 38e11e231aa7c..6603e38d6202b 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -20,16 +20,27 @@ limitations under the License. package reconciler import ( + "fmt" + "io/ioutil" + "path" "time" "github.com/golang/glog" + "k8s.io/kubernetes/cmd/kubelet/app/options" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) // Reconciler runs a periodic loop to reconcile the desired state of the world @@ -46,7 +57,7 @@ type Reconciler interface { // If attach/detach management is enabled, the manager will also check if // volumes that should be attached are attached and volumes that should // be detached are detached and trigger attach/detach operations as needed. - Run(stopCh <-chan struct{}) + Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) } // NewReconciler returns a new instance of Reconciler. 
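For illustration, here is a minimal, self-contained sketch of the gating pattern the new Run(sourcesReady, stopCh) signature below enables: a cheap reconcile pass on every tick, and a heavier reconstruct-style pass only once all pod sources are ready and its own, longer period has elapsed. All names here (sourcesReady, loop, stubSources, the periods) are hypothetical stand-ins for config.SourcesReady, loopSleepDuration, and reconstructDuration; this is not code from the patch.

package main

import (
	"fmt"
	"time"
)

// sourcesReady stands in for config.SourcesReady; only AllReady is needed.
type sourcesReady interface {
	AllReady() bool
}

type stubSources struct{ ready bool }

func (s stubSources) AllReady() bool { return s.ready }

// loop runs a cheap reconcile pass on every tick, and a heavier
// reconstruct-style pass only once all sources are ready and its own,
// longer period has elapsed, mirroring reconciliationLoopFunc below.
func loop(sr sourcesReady, stopCh <-chan struct{}) {
	const (
		loopPeriod  = 50 * time.Millisecond  // stand-in for loopSleepDuration
		heavyPeriod = 200 * time.Millisecond // stand-in for reconstructDuration
	)
	lastHeavy := time.Now()
	ticker := time.NewTicker(loopPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			fmt.Println("reconcile pass")
			if sr.AllReady() && time.Since(lastHeavy) > heavyPeriod {
				fmt.Println("reconstruct pass")
				lastHeavy = time.Now()
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	go loop(stubSources{ready: true}, stop)
	time.Sleep(500 * time.Millisecond)
	close(stop)
}

In the actual patch the fast loop runs every 100ms (reconcilerLoopSleepPeriod) while reconstruct is gated on a 3-minute reconcilerReconstructSleepPeriod, so a slow or restarting pod source cannot trigger premature cleanup.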
@@ -56,6 +67,8 @@ type Reconciler interface {
 // this node, and therefore the volume manager should not
 // loopSleepDuration - the amount of time the reconciler loop sleeps between
 // successive executions
+// reconstructDuration - the amount of time the reconstruct process sleeps between
+// successive executions
 // waitForAttachTimeout - the amount of time the Mount function will wait for
 // the volume to be attached
 // hostName - the hostname for this node, used by Attach and Detach methods
@@ -65,26 +78,34 @@
 // safely (prevents more than one operation from being triggered on the same
 // volume)
 // mounter - mounter passed in from kubelet, passed down unmount path
+// volumePluginMgr - the volume plugin manager passed in from kubelet
 func NewReconciler(
 kubeClient internalclientset.Interface,
 controllerAttachDetachEnabled bool,
 loopSleepDuration time.Duration,
+ reconstructDuration time.Duration,
 waitForAttachTimeout time.Duration,
 hostName string,
 desiredStateOfWorld cache.DesiredStateOfWorld,
 actualStateOfWorld cache.ActualStateOfWorld,
 operationExecutor operationexecutor.OperationExecutor,
- mounter mount.Interface) Reconciler {
+ mounter mount.Interface,
+ volumePluginMgr *volume.VolumePluginMgr,
+ kubeletPodsDir string) Reconciler {
 return &reconciler{
 kubeClient: kubeClient,
 controllerAttachDetachEnabled: controllerAttachDetachEnabled,
 loopSleepDuration: loopSleepDuration,
+ reconstructDuration: reconstructDuration,
 waitForAttachTimeout: waitForAttachTimeout,
 hostName: hostName,
 desiredStateOfWorld: desiredStateOfWorld,
 actualStateOfWorld: actualStateOfWorld,
 operationExecutor: operationExecutor,
 mounter: mounter,
+ volumePluginMgr: volumePluginMgr,
+ kubeletPodsDir: kubeletPodsDir,
+ timeOfLastReconstruct: time.Now(),
 }
}

@@ -92,157 +113,138 @@ type reconciler struct {
 kubeClient internalclientset.Interface
 controllerAttachDetachEnabled bool
 loopSleepDuration time.Duration
+ reconstructDuration time.Duration
 waitForAttachTimeout time.Duration
 hostName string
 desiredStateOfWorld cache.DesiredStateOfWorld
 actualStateOfWorld cache.ActualStateOfWorld
 operationExecutor operationexecutor.OperationExecutor
 mounter mount.Interface
+ volumePluginMgr *volume.VolumePluginMgr
+ kubeletPodsDir string
+ timeOfLastReconstruct time.Time
}

-func (rc *reconciler) Run(stopCh <-chan struct{}) {
- wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
+func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
+ wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
}

-func (rc *reconciler) reconciliationLoopFunc() func() {
+func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
 return func() {
- // Unmounts are triggered before mounts so that a volume that was
- // referenced by a pod that was deleted and is now referenced by another
- // pod is unmounted from the first pod before being mounted to the new
- // pod.
+ rc.reconcile()
+
+ // Start the reconciler's reconstruct process only after all pod sources are ready, so that
+ // the desired state of the world has been populated with pod volume information from every
+ // source. Otherwise, the reconstruct process may add incomplete volume information and cause
+ // confusion. In addition, if some sources are not ready, the reconstruct process may clean up
+ // volumes that are still in use, because the desired state of the world could not obtain a
+ // complete list of pods.
+ if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration { + glog.V(5).Infof("Sources are all ready, starting reconstruct state function") + rc.reconstruct() + } + } +} - // Ensure volumes that should be unmounted are unmounted. - for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { - if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { - // Volume is mounted, unmount it - glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).", +func (rc *reconciler) reconcile() { + // Unmounts are triggered before mounts so that a volume that was + // referenced by a pod that was deleted and is now referenced by another + // pod is unmounted from the first pod before being mounted to the new + // pod. + + // Ensure volumes that should be unmounted are unmounted. + for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { + if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { + // Volume is mounted, unmount it + glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).", + mountedVolume.VolumeName, + mountedVolume.OuterVolumeSpecName, + mountedVolume.PodName, + mountedVolume.PodUID) + err := rc.operationExecutor.UnmountVolume( + mountedVolume.MountedVolume, rc.actualStateOfWorld) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + mountedVolume.VolumeName, + mountedVolume.OuterVolumeSpecName, + mountedVolume.PodName, + mountedVolume.PodUID, + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).", mountedVolume.VolumeName, mountedVolume.OuterVolumeSpecName, mountedVolume.PodName, mountedVolume.PodUID) - err := rc.operationExecutor.UnmountVolume( - mountedVolume.MountedVolume, rc.actualStateOfWorld) + } + } + } + + // Ensure volumes that should be attached/mounted are attached/mounted. + for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { + volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + volumeToMount.DevicePath = devicePath + if cache.IsVolumeNotAttachedError(err) { + if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { + // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait + // for controller to finish attaching volume. + glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID) + err := rc.operationExecutor.VerifyControllerAttachedVolume( + volumeToMount.VolumeToMount, + rc.hostName, + rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
// Log all other errors. glog.Errorf( - "operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - mountedVolume.VolumeName, - mountedVolume.OuterVolumeSpecName, - mountedVolume.PodName, - mountedVolume.PodUID, + "operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, rc.controllerAttachDetachEnabled, err) } if err == nil { - glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).", - mountedVolume.VolumeName, - mountedVolume.OuterVolumeSpecName, - mountedVolume.PodName, - mountedVolume.PodUID) - } - } - } - - // Ensure volumes that should be attached/mounted are attached/mounted. - for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { - volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) - volumeToMount.DevicePath = devicePath - if cache.IsVolumeNotAttachedError(err) { - if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { - // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait - // for controller to finish attaching volume. - glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", + glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, volumeToMount.Pod.UID) - err := rc.operationExecutor.VerifyControllerAttachedVolume( - volumeToMount.VolumeToMount, - rc.hostName, - rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
- glog.Errorf( - "operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID, - rc.controllerAttachDetachEnabled, - err) - } - if err == nil { - glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - } - } else { - // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, - // so attach it - volumeToAttach := operationexecutor.VolumeToAttach{ - VolumeName: volumeToMount.VolumeName, - VolumeSpec: volumeToMount.VolumeSpec, - NodeName: rc.hostName, - } - glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. - glog.Errorf( - "operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID, - rc.controllerAttachDetachEnabled, - err) - } - if err == nil { - glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName, - volumeToMount.Pod.UID) - } } - } else if !volMounted || cache.IsRemountRequiredError(err) { - // Volume is not mounted, or is already mounted, but requires remounting - remountingLogStr := "" - if cache.IsRemountRequiredError(err) { - remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } else { + // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, + // so attach it + volumeToAttach := operationexecutor.VolumeToAttach{ + VolumeName: volumeToMount.VolumeName, + VolumeSpec: volumeToMount.VolumeSpec, + NodeName: rc.hostName, } - glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, - volumeToMount.Pod.UID, - remountingLogStr) - err := rc.operationExecutor.MountVolume( - rc.waitForAttachTimeout, - volumeToMount.VolumeToMount, - rc.actualStateOfWorld) + volumeToMount.Pod.UID) + err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. 
glog.Errorf( - "operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + "operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, @@ -251,77 +253,306 @@ func (rc *reconciler) reconciliationLoopFunc() func() { err) } if err == nil { - glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, - volumeToMount.Pod.UID, - remountingLogStr) + volumeToMount.Pod.UID) } } + } else if !volMounted || cache.IsRemountRequiredError(err) { + // Volume is not mounted, or is already mounted, but requires remounting + remountingLogStr := "" + if cache.IsRemountRequiredError(err) { + remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } + glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + remountingLogStr) + err := rc.operationExecutor.MountVolume( + rc.waitForAttachTimeout, + volumeToMount.VolumeToMount, + rc.actualStateOfWorld) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + volumeToMount.PodName, + volumeToMount.Pod.UID, + remountingLogStr) + } } + } - // Ensure devices that should be detached/unmounted are detached/unmounted. - for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { - if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) { - if attachedVolume.GloballyMounted { - // Volume is globally mounted to device, unmount it - glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)", + // Ensure devices that should be detached/unmounted are detached/unmounted. + for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { + if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) { + if attachedVolume.GloballyMounted { + // Volume is globally mounted to device, unmount it + glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name()) + err := rc.operationExecutor.UnmountDevice( + attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
+ // Log all other errors. + glog.Errorf( + "operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name(), + rc.controllerAttachDetachEnabled, + err) + } + if err == nil { + glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name()) + } + } else { + // Volume is attached to node, detach it + if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { + // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin, + // so just remove it to actualStateOfWorld without attach. + rc.actualStateOfWorld.MarkVolumeAsDetached( + attachedVolume.VolumeName, rc.hostName) + } else { + // Only detach if kubelet detach is enabled + glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name()) - err := rc.operationExecutor.UnmountDevice( - attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) + err := rc.operationExecutor.DetachVolume( + attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( - "operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", + "operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name(), rc.controllerAttachDetachEnabled, err) } if err == nil { - glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)", - attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name()) - } - } else { - // Volume is attached to node, detach it - if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { - // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin, - // so just remove it to actualStateOfWorld without attach. - rc.actualStateOfWorld.MarkVolumeAsDetached( - attachedVolume.VolumeName, rc.hostName) - } else { - // Only detach if kubelet detach is enabled - glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)", + glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)", attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name()) - err := rc.operationExecutor.DetachVolume( - attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
- glog.Errorf(
- "operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
- attachedVolume.VolumeName,
- attachedVolume.VolumeSpec.Name(),
- rc.controllerAttachDetachEnabled,
- err)
- }
- if err == nil {
- glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
- attachedVolume.VolumeName,
- attachedVolume.VolumeSpec.Name())
- }
 }
 }
 }
 }
 }
}
+
+// The reconstruct process tries to observe the real world by scanning all pods' volume directories on disk.
+// If the actual and desired states of the world are not consistent with the observed world, it means that some
+// mounted volumes were left behind, probably during a kubelet restart. This process will reconstruct
+// the volumes and update the actual and desired states. In the following reconciler loop, those volumes will
+// be cleaned up.
+func (rc *reconciler) reconstruct() {
+ defer rc.updateReconstructTime()
+ rc.reconstructStates(rc.kubeletPodsDir)
+}
+
+func (rc *reconciler) updateReconstructTime() {
+ rc.timeOfLastReconstruct = time.Now()
+}
+
+type podVolume struct {
+ podName volumetypes.UniquePodName
+ volumeSpecName string
+ mountPath string
+ pluginName string
+}
+
+// reconstructStates scans the volume directories under the given pods directory. If a volume is not
+// in either the actual or desired state of the world and has no pending operation, this function will
+// reconstruct the volume spec and put it in both the actual and desired states of the world. If no running
+// container is mounting the volume, the volume will be removed by the desired state of the world's populator
+// and cleaned up by the reconciler.
+func (rc *reconciler) reconstructStates(podsDir string) {
+ // Get volume information by reading the pods directory
+ podVolumes, err := getVolumesFromPodDir(podsDir)
+ if err != nil {
+ glog.Errorf("Cannot get volumes from disk: %v", err)
+ return
+ }
+ for _, volume := range podVolumes {
+ volumeToMount, err := rc.reconstructVolume(volume)
+ if err != nil {
+ glog.Errorf("Could not construct volume information: %v", err)
+ continue
+ }
+
+ // Check whether there is a pending operation for the given pod and volume.
+ // The pending operation must be checked before the actual and desired
+ // states to avoid a race condition. For example, the following
+ // could happen if the pending operation were checked after the actual and desired states:
+ // 1. Check the pod: it does not exist in either the actual or desired state.
+ // 2. An operation for the given pod finishes and the actual state is updated.
+ // 3. Check again: there is no pending operation for the given pod.
+ if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) {
+ continue
+ }
+ desiredPods := rc.desiredStateOfWorld.GetPods()
+ actualPods := rc.actualStateOfWorld.GetPods()
+ if desiredPods[volume.podName] || actualPods[volume.podName] {
+ continue
+ }
+
+ glog.V(3).Infof(
+ "Could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
+ volumeToMount)
+ if err = rc.updateStates(volumeToMount); err != nil {
+ glog.Errorf("Error occurred while reconstructing volume from disk: %v", err)
+ }
+ }
+}
+
+// reconstructVolume rebuilds the volume spec and the VolumeToMount data structure by reading the pod's volume directories
+func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) {
+ plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
+ if err != nil {
+ return nil, err
+ }
+ volumeSpec, err := plugin.ConstructVolumeSpec(volume.volumeSpecName, volume.mountPath)
+ if err != nil {
+ return nil, err
+ }
+ volumeName, err := plugin.GetVolumeName(volumeSpec)
+ if err != nil {
+ return nil, err
+ }
+ pod := &api.Pod{
+ ObjectMeta: api.ObjectMeta{
+ UID: types.UID(volume.podName),
+ },
+ }
+ attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
+ if err != nil {
+ return nil, err
+ }
+ var uniqueVolumeName api.UniqueVolumeName
+ if attachablePlugin != nil {
+ uniqueVolumeName = volumehelper.GetUniqueVolumeName(volume.pluginName, volumeName)
+ } else {
+ uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
+ }
+
+ volumeToMount := &operationexecutor.VolumeToMount{
+ VolumeName: uniqueVolumeName,
+ PodName: volume.podName,
+ VolumeSpec: volumeSpec,
+ OuterVolumeSpecName: volumeName, /* volumeName is InnerVolumeSpecName, but this information will not be used for cleanup */
+ Pod: pod,
+ PluginIsAttachable: attachablePlugin != nil,
+ VolumeGidValue: "",
+ DevicePath: "",
+ }
+ return volumeToMount, nil
+}
+
+func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error {
+ err := rc.actualStateOfWorld.MarkVolumeAsAttached(
+ volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath)
+ if err != nil {
+ return fmt.Errorf("Could not add volume information to actual state of world: %v", err)
+ }
+ err = rc.actualStateOfWorld.AddPodToVolume(
+ volumeToMount.PodName,
+ types.UID(volumeToMount.PodName),
+ volumeToMount.VolumeName,
+ nil,
+ volumeToMount.OuterVolumeSpecName,
+ volumeToMount.DevicePath)
+ if err != nil {
+ return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err)
+ }
+ if volumeToMount.PluginIsAttachable {
+ err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName)
+ if err != nil {
+ return fmt.Errorf("Could not mark device as mounted in actual state of world: %v", err)
+ }
+ }
+ _, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName,
+ volumeToMount.Pod,
+ volumeToMount.VolumeSpec,
+ volumeToMount.OuterVolumeSpecName,
+ volumeToMount.VolumeGidValue)
+ if err != nil {
+ return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err)
+ }
+ return nil
+}
+
+// getVolumesFromPodDir scans through the volume directories under the given pods directory.
+// It returns a list of pod volume information including the pod's UID, the volume's plugin name,
+// mount path, and volume spec name.
+func getVolumesFromPodDir(podDir string) ([]podVolume, error) { + podsDirInfo, err := ioutil.ReadDir(podDir) + if err != nil { + return nil, err + } + volumes := []podVolume{} + for i := range podsDirInfo { + if !podsDirInfo[i].IsDir() { + continue + } + podName := podsDirInfo[i].Name() + podDir := path.Join(podDir, podName) + volumesDir := path.Join(podDir, options.DefaultKubeletVolumesDirName) + volumesDirInfo, err := ioutil.ReadDir(volumesDir) + if err != nil { + glog.Errorf("Could not read volume directory %q: %v", volumesDir, err) + continue + } + for _, volumeDir := range volumesDirInfo { + pluginName := volumeDir.Name() + volumePluginPath := path.Join(volumesDir, pluginName) + + volumePluginDirs, err := ioutil.ReadDir(volumePluginPath) + if err != nil { + glog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err) + continue + } + + unescapePluginName := strings.UnescapeQualifiedNameForDisk(pluginName) + for _, volumeNameDir := range volumePluginDirs { + if volumeNameDir != nil { + volumeName := volumeNameDir.Name() + mountPath := path.Join(volumePluginPath, volumeName) + volumes = append(volumes, podVolume{ + podName: volumetypes.UniquePodName(podName), + volumeSpecName: volumeName, + mountPath: mountPath, + pluginName: unescapePluginName, + }) + } + } + + } + } + glog.V(10).Infof("Get volumes from pod directory %q %+v", podDir, volumes) + return volumes, nil +} diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 58ab07ad525d6..aa9e61b7a1401 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -25,9 +25,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" @@ -38,12 +40,13 @@ import ( const ( // reconcilerLoopSleepDuration is the amount of time the reconciler loop // waits between successive executions - reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond - + reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond + reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute // waitForAttachTimeout is the maximum amount of time a // operationexecutor.Mount call will wait for a volume to be attached. 
waitForAttachTimeout time.Duration = 1 * time.Second nodeName string = "myhostname" + kubeletPodsDir string = "fake-dir" ) // Calls Run() @@ -59,15 +62,18 @@ func Test_Run_Positive_DoNothing(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) // Assert assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin)) @@ -92,12 +98,15 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -128,9 +137,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) - // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( 1 /* expectedAttachCallCount */, fakePlugin)) @@ -160,12 +168,15 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { kubeClient, true, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -197,7 +208,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert @@ -228,12 +239,15 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { kubeClient, false, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -264,9 +278,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) - // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( 1 /* expectedAttachCallCount */, fakePlugin)) @@ -308,12 +321,15 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { kubeClient, true, /* controllerAttachDetachEnabled */ reconcilerLoopSleepDuration, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, nodeName, dsw, asw, oex, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -344,7 +360,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { } // Act - go reconciler.Run(wait.NeverStop) + runReconciler(reconciler) + dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName}) waitForMount(t, fakePlugin, generatedVolumeName, asw) @@ -445,3 +462,8 @@ func createTestClient() *fake.Clientset { }) return fakeClient } + +func runReconciler(reconciler Reconciler) { + sourcesReady := 
config.NewSourcesReady(func(_ sets.String) bool { return false }) + go reconciler.Run(sourcesReady, wait.NeverStop) +} diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index e4c6b2aa7dcc0..353887af68b2d 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/pod" @@ -46,6 +47,10 @@ const ( // between successive executions reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond + // reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process + // waits between successive executions + reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute + // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond @@ -76,6 +81,10 @@ const ( // operation is waiting it only blocks other operations on the same device, // other devices are not affected. waitForAttachTimeout time.Duration = 10 * time.Minute + + // reconcilerStartGracePeriod is the maximum amount of time volume manager + // can wait to start reconciler + reconcilerStartGracePeriod time.Duration = 60 * time.Second ) // VolumeManager runs a set of asynchronous loops that figure out which volumes @@ -83,7 +92,7 @@ const ( // this node and makes it so. type VolumeManager interface { // Starts the volume manager and all the asynchronous loops that it controls - Run(stopCh <-chan struct{}) + Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) // WaitForAttachAndMount processes the volumes referenced in the specified // pod and blocks until they are all attached and mounted (reflected in @@ -138,7 +147,8 @@ func NewVolumeManager( kubeClient internalclientset.Interface, volumePluginMgr *volume.VolumePluginMgr, kubeContainerRuntime kubecontainer.Runtime, - mounter mount.Interface) (VolumeManager, error) { + mounter mount.Interface, + kubeletPodsDir string) (VolumeManager, error) { vm := &volumeManager{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, @@ -153,12 +163,15 @@ func NewVolumeManager( kubeClient, controllerAttachDetachEnabled, reconcilerLoopSleepPeriod, + reconcilerReconstructSleepPeriod, waitForAttachTimeout, hostName, vm.desiredStateOfWorld, vm.actualStateOfWorld, vm.operationExecutor, - mounter) + mounter, + volumePluginMgr, + kubeletPodsDir) vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( kubeClient, desiredStateOfWorldPopulatorLoopSleepPeriod, @@ -208,12 +221,14 @@ type volumeManager struct { desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator } -func (vm *volumeManager) Run(stopCh <-chan struct{}) { +func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { defer runtime.HandleCrash() - glog.Infof("Starting Kubelet Volume Manager") - go vm.reconciler.Run(stopCh) go vm.desiredStateOfWorldPopulator.Run(stopCh) + glog.V(2).Infof("The desired_state_of_world populator starts") + + glog.Infof("Starting Kubelet Volume Manager") + go vm.reconciler.Run(sourcesReady, stopCh) <-stopCh glog.Infof("Shutting down Kubelet Volume 
Manager") diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 105c914271090..db9a40c03c2c1 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -26,11 +26,13 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/pod" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/sets" utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" @@ -58,8 +60,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("Failed to initialize volume manager: %v", err) } - stopCh := make(chan struct{}) - go manager.Run(stopCh) + stopCh := runVolumeManager(manager) defer close(stopCh) podManager.SetPods([]*api.Pod{pod}) @@ -149,8 +150,10 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { continue } - stopCh := make(chan struct{}) - go manager.Run(stopCh) + stopCh := runVolumeManager(manager) + defer func() { + close(stopCh) + }() podManager.SetPods([]*api.Pod{pod}) @@ -170,8 +173,6 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { if !reflect.DeepEqual(tc.expected, actual) { t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual) } - - close(stopCh) } } @@ -190,7 +191,8 @@ func newTestVolumeManager( kubeClient, plugMgr, &containertest.FakeRuntime{}, - &mount.FakeMounter{}) + &mount.FakeMounter{}, + "") return vm, err } @@ -276,3 +278,12 @@ func simulateVolumeInUseUpdate( } } } + +func runVolumeManager(manager VolumeManager) chan struct{} { + stopCh := make(chan struct{}) + //readyCh := make(chan bool, 1) + //readyCh <- true + sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true }) + go manager.Run(sourcesReady, stopCh) + return stopCh +} diff --git a/pkg/util/goroutinemap/goroutinemap.go b/pkg/util/goroutinemap/goroutinemap.go index c5f6884a0c9dc..913d066e59e88 100644 --- a/pkg/util/goroutinemap/goroutinemap.go +++ b/pkg/util/goroutinemap/goroutinemap.go @@ -58,6 +58,10 @@ type GoRoutineMap interface { // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() + + // IsOperationPending returns true if the operation is pending, otherwise + // returns false + IsOperationPending(operationName string) bool } // NewGoRoutineMap returns a new instance of GoRoutineMap. 
@@ -75,7 +79,7 @@ type goRoutineMap struct {
 operations map[string]operation
 exponentialBackOffOnError bool
 cond *sync.Cond
- lock sync.Mutex
+ lock sync.RWMutex
}

type operation struct {
@@ -150,6 +154,16 @@ func (grm *goRoutineMap) operationComplete(
 }
}

+func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
+ grm.lock.RLock()
+ defer grm.lock.RUnlock()
+ existingOp, exists := grm.operations[operationName]
+ if exists && existingOp.operationPending {
+ return true
+ }
+ return false
+}
+
 func (grm *goRoutineMap) Wait() {
 grm.lock.Lock()
 defer grm.lock.Unlock()
diff --git a/pkg/util/mount/fake.go b/pkg/util/mount/fake.go
index 3a9706cf3cedd..60b001ba6c538 100644
--- a/pkg/util/mount/fake.go
+++ b/pkg/util/mount/fake.go
@@ -140,3 +140,7 @@ func (f *FakeMounter) DeviceOpened(pathname string) (bool, error) {
 func (f *FakeMounter) PathIsDevice(pathname string) (bool, error) {
 return true, nil
}
+
+func (f *FakeMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+ return getDeviceNameFromMount(f, mountPath, pluginDir)
+}
diff --git a/pkg/util/mount/mount.go b/pkg/util/mount/mount.go
index 69f39cf6943f2..25f55df9ea928 100644
--- a/pkg/util/mount/mount.go
+++ b/pkg/util/mount/mount.go
@@ -19,7 +19,10 @@ limitations under the License.
package mount

import (
+ "fmt"
+ "path"
 "path/filepath"
+ "strings"

 "github.com/golang/glog"
 "k8s.io/kubernetes/pkg/util/exec"
@@ -43,6 +46,9 @@ type Interface interface {
 DeviceOpened(pathname string) (bool, error)
 // PathIsDevice determines if a path is a device.
 PathIsDevice(pathname string) (bool, error)
+ // GetDeviceNameFromMount finds the device name by looking up the mount path's
+ // global mount point under the given plugin directory.
+ GetDeviceNameFromMount(mountPath, pluginDir string) (string, error)
}

// This represents a single line in /proc/mounts or /etc/fstab.
@@ -151,3 +157,25 @@ func GetDeviceNameFromMount(mounter Interface, mountPath string) (string, int, e
 }
 return device, refCount, nil
}
+
+// getDeviceNameFromMount finds the device name from /proc/mounts in which
+// the mount path reference should match the given plugin directory.
In case no mount path reference +// matches, returns the volume name taken from its given mountPath +func getDeviceNameFromMount(mounter Interface, mountPath, pluginDir string) (string, error) { + refs, err := GetMountRefs(mounter, mountPath) + if err != nil { + glog.V(4).Infof("GetMountRefs failed for mount path %q: %v", mountPath, err) + return "", err + } + if len(refs) == 0 { + glog.V(4).Infof("Directory %s is not mounted", mountPath) + return "", fmt.Errorf("directory %s is not mounted", mountPath) + } + for _, ref := range refs { + if strings.HasPrefix(ref, pluginDir) { + return path.Base(ref), nil + } + } + + return path.Base(mountPath), nil +} diff --git a/pkg/util/mount/mount_linux.go b/pkg/util/mount/mount_linux.go index 03a5ccae75598..5e556c3444e43 100644 --- a/pkg/util/mount/mount_linux.go +++ b/pkg/util/mount/mount_linux.go @@ -222,6 +222,11 @@ func pathIsDevice(pathname string) (bool, error) { return false, nil } +//GetDeviceNameFromMount: given a mount point, find the device name from its global mount point +func (mounter *Mounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) { + return getDeviceNameFromMount(mounter, mountPath, pluginDir) +} + func listProcMounts(mountFilePath string) ([]MountPoint, error) { hash1, err := readProcMounts(mountFilePath, nil) if err != nil { diff --git a/pkg/util/mount/nsenter_mount.go b/pkg/util/mount/nsenter_mount.go index ea2b945a7f256..8004ed3f288de 100644 --- a/pkg/util/mount/nsenter_mount.go +++ b/pkg/util/mount/nsenter_mount.go @@ -217,6 +217,11 @@ func (n *NsenterMounter) PathIsDevice(pathname string) (bool, error) { return pathIsDevice(pathname) } +//GetDeviceNameFromMount given a mount point, find the volume id from checking /proc/mounts +func (n *NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) { + return getDeviceNameFromMount(n, mountPath, pluginDir) +} + func (n *NsenterMounter) absHostPath(command string) string { path, ok := n.paths[command] if !ok { diff --git a/pkg/util/mount/nsenter_mount_unsupported.go b/pkg/util/mount/nsenter_mount_unsupported.go index f40f73ab2d219..dcf19edefd264 100644 --- a/pkg/util/mount/nsenter_mount_unsupported.go +++ b/pkg/util/mount/nsenter_mount_unsupported.go @@ -49,3 +49,7 @@ func (*NsenterMounter) DeviceOpened(pathname string) (bool, error) { func (*NsenterMounter) PathIsDevice(pathname string) (bool, error) { return true, nil } + +func (*NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) { + return "", nil +} diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go index e4a710e08f8b0..2a37527f4ed23 100644 --- a/pkg/volume/aws_ebs/attacher.go +++ b/pkg/volume/aws_ebs/attacher.go @@ -51,6 +51,11 @@ func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error) }, nil } +func (plugin *awsElasticBlockStorePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + mounter := plugin.host.GetMounter() + return mount.GetMountRefs(mounter, deviceMountPath) +} + func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName string) (string, error) { volumeSource, readOnly, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/aws_ebs/aws_ebs.go b/pkg/volume/aws_ebs/aws_ebs.go index b2831da3a577a..e60a7c867502b 100644 --- a/pkg/volume/aws_ebs/aws_ebs.go +++ b/pkg/volume/aws_ebs/aws_ebs.go @@ -188,6 +188,24 @@ func getVolumeSource( return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type") } +func (plugin 
*awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) { + mounter := plugin.host.GetMounter() + pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName()) + sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir) + if err != nil { + return nil, err + } + awsVolume := &api.Volume{ + Name: volName, + VolumeSource: api.VolumeSource{ + AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{ + VolumeID: sourceName, + }, + }, + } + return volume.NewSpecFromVolume(awsVolume), nil +} + // Abstract interface to PD operations. type ebsManager interface { CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error) diff --git a/pkg/volume/azure_file/azure_file.go b/pkg/volume/azure_file/azure_file.go index 9201abb16ac8f..7511dcbe35704 100644 --- a/pkg/volume/azure_file/azure_file.go +++ b/pkg/volume/azure_file/azure_file.go @@ -124,6 +124,19 @@ func (plugin *azureFilePlugin) newUnmounterInternal(volName string, podUID types }}, nil } +func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) { + azureVolume := &api.Volume{ + Name: volName, + VolumeSource: api.VolumeSource{ + AzureFile: &api.AzureFileVolumeSource{ + SecretName: volName, + ShareName: volName, + }, + }, + } + return volume.NewSpecFromVolume(azureVolume), nil +} + // azureFile volumes represent a mount of an AzureFile share. type azureFile struct { volName string diff --git a/pkg/volume/cephfs/cephfs.go b/pkg/volume/cephfs/cephfs.go index 18574ee9aeece..b42b3a07eaf3e 100644 --- a/pkg/volume/cephfs/cephfs.go +++ b/pkg/volume/cephfs/cephfs.go @@ -154,6 +154,19 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI }, nil } +func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + cephfsVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + CephFS: &api.CephFSVolumeSource{ + Monitors: []string{}, + Path: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(cephfsVolume), nil +} + // CephFS volumes represent a bare host file or directory mount of a CephFS export.
type cephfs struct { volName string diff --git a/pkg/volume/cinder/attacher.go b/pkg/volume/cinder/attacher.go index ecf11e1c74920..a35894c12a4f6 100644 --- a/pkg/volume/cinder/attacher.go +++ b/pkg/volume/cinder/attacher.go @@ -53,6 +53,11 @@ func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) { }, nil } +func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + mounter := plugin.host.GetMounter() + return mount.GetMountRefs(mounter, deviceMountPath) +} + func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go index 75e8a6d7352ff..24b9d073a1984 100644 --- a/pkg/volume/cinder/cinder.go +++ b/pkg/volume/cinder/cinder.go @@ -204,6 +204,25 @@ func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) { } } +func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + mounter := plugin.host.GetMounter() + pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName()) + sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir) + if err != nil { + return nil, err + } + glog.V(4).Infof("Found volume %s mounted to %s", sourceName, mountPath) + cinderVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + Cinder: &api.CinderVolumeSource{ + VolumeID: sourceName, + }, + }, + } + return volume.NewSpecFromVolume(cinderVolume), nil +} + // Abstract interface to PD operations. type cdManager interface { // Attaches the disk to the kubelet's host machine. diff --git a/pkg/volume/configmap/configmap.go b/pkg/volume/configmap/configmap.go index 98c00c44ed205..6451951f765a3 100644 --- a/pkg/volume/configmap/configmap.go +++ b/pkg/volume/configmap/configmap.go @@ -86,6 +86,16 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil } +func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + configMapVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + ConfigMap: &api.ConfigMapVolumeSource{}, + }, + } + return volume.NewSpecFromVolume(configMapVolume), nil +} + type configMapVolume struct { volName string podUID types.UID diff --git a/pkg/volume/downwardapi/downwardapi.go b/pkg/volume/downwardapi/downwardapi.go index 184b2c5859d74..970dee86ee441 100644 --- a/pkg/volume/downwardapi/downwardapi.go +++ b/pkg/volume/downwardapi/downwardapi.go @@ -106,6 +106,16 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID) }, nil } +func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + downwardAPIVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + DownwardAPI: &api.DownwardAPIVolumeSource{}, + }, + } + return volume.NewSpecFromVolume(downwardAPIVolume), nil +} + // downwardAPIVolume retrieves downward API data and places it into the volume on the host.
type downwardAPIVolume struct { volName string diff --git a/pkg/volume/empty_dir/empty_dir.go b/pkg/volume/empty_dir/empty_dir.go index 0653b2556ffd1..2a545945d0373 100644 --- a/pkg/volume/empty_dir/empty_dir.go +++ b/pkg/volume/empty_dir/empty_dir.go @@ -128,6 +128,16 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types. return ed, nil } +func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) { + emptyDirVolume := &api.Volume{ + Name: volName, + VolumeSource: api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }, + } + return volume.NewSpecFromVolume(emptyDirVolume), nil +} + // mountDetector abstracts how to find what kind of mount a path is backed by. type mountDetector interface { // GetMountMedium determines what type of medium a given path is backed diff --git a/pkg/volume/fc/fc.go b/pkg/volume/fc/fc.go index 96e8518794920..98abdc8dd038b 100644 --- a/pkg/volume/fc/fc.go +++ b/pkg/volume/fc/fc.go @@ -141,6 +141,16 @@ func (plugin *fcPlugin) execCommand(command string, args []string) ([]byte, erro return cmd.CombinedOutput() } +func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + fcVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + FC: &api.FCVolumeSource{}, + }, + } + return volume.NewSpecFromVolume(fcVolume), nil +} + type fcDisk struct { volName string podUID types.UID diff --git a/pkg/volume/flexvolume/flexvolume.go b/pkg/volume/flexvolume/flexvolume.go index eb4642ca7c00c..30fd59681d6b0 100644 --- a/pkg/volume/flexvolume/flexvolume.go +++ b/pkg/volume/flexvolume/flexvolume.go @@ -179,6 +179,18 @@ func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID type }, nil } +func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) { + flexVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + FlexVolume: &api.FlexVolumeSource{ + Driver: sourceName, + }, + }, + } + return volume.NewSpecFromVolume(flexVolume), nil +} + // flexVolume is the disk resource provided by this plugin. type flexVolumeDisk struct { // podUID is the UID of the pod. diff --git a/pkg/volume/flocker/plugin.go b/pkg/volume/flocker/plugin.go index 9bdf9bc3e689f..8cf3193723532 100644 --- a/pkg/volume/flocker/plugin.go +++ b/pkg/volume/flocker/plugin.go @@ -117,6 +117,18 @@ func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volu return nil, nil } +func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + flockerVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetName: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(flockerVolume), nil +} + type flockerMounter struct { *flocker client flockerclient.Clientable diff --git a/pkg/volume/gce_pd/attacher.go b/pkg/volume/gce_pd/attacher.go index c60a02aeaeeb4..d3e78cd978d9b 100644 --- a/pkg/volume/gce_pd/attacher.go +++ b/pkg/volume/gce_pd/attacher.go @@ -53,6 +53,11 @@ func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) { }, nil } +func (plugin *gcePersistentDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + mounter := plugin.host.GetMounter() + return mount.GetMountRefs(mounter, deviceMountPath) +} + // Attach checks with the GCE cloud provider if the specified volume is already // attached to the specified node. 
If the volume is attached, it succeeds // (returns nil). If it is not, Attach issues a call to the GCE cloud provider diff --git a/pkg/volume/gce_pd/gce_pd.go b/pkg/volume/gce_pd/gce_pd.go index a3d57bd0e0c4a..cc0431e1c7e14 100644 --- a/pkg/volume/gce_pd/gce_pd.go +++ b/pkg/volume/gce_pd/gce_pd.go @@ -182,6 +182,24 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol }, nil } +func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + mounter := plugin.host.GetMounter() + pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName()) + sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir) + if err != nil { + return nil, err + } + gceVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: sourceName, + }, + }, + } + return volume.NewSpecFromVolume(gceVolume), nil +} + // Abstract interface to PD operations. type pdManager interface { // Creates a volume diff --git a/pkg/volume/git_repo/git_repo.go b/pkg/volume/git_repo/git_repo.go index 9d2c2ca3a83e4..83aa0d95ac63d 100644 --- a/pkg/volume/git_repo/git_repo.go +++ b/pkg/volume/git_repo/git_repo.go @@ -107,6 +107,16 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol }, nil } +func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + gitVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + GitRepo: &api.GitRepoVolumeSource{}, + }, + } + return volume.NewSpecFromVolume(gitVolume), nil +} + // gitRepo volumes are directories which are pre-filled from a git repository. // These do not persist beyond the lifetime of a pod. type gitRepoVolume struct { diff --git a/pkg/volume/glusterfs/glusterfs.go b/pkg/volume/glusterfs/glusterfs.go index bbaea800aff3e..f4ccce0bf9797 100644 --- a/pkg/volume/glusterfs/glusterfs.go +++ b/pkg/volume/glusterfs/glusterfs.go @@ -145,6 +145,19 @@ func (plugin *glusterfsPlugin) execCommand(command string, args []string) ([]byt return cmd.CombinedOutput() } +func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + glusterfsVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{ + EndpointsName: volumeName, + Path: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(glusterfsVolume), nil +} + // Glusterfs volumes represent a bare host file or directory mount of a Glusterfs export.
type glusterfs struct { volName string diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index 466f92f7a55b5..a29d4e6e61f69 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -138,6 +138,18 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu return plugin.newProvisionerFunc(options, plugin.host) } +func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + hostPathVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(hostPathVolume), nil +} + func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil") diff --git a/pkg/volume/iscsi/iscsi.go b/pkg/volume/iscsi/iscsi.go index 9ca787c183c7d..248605dc8752f 100644 --- a/pkg/volume/iscsi/iscsi.go +++ b/pkg/volume/iscsi/iscsi.go @@ -146,6 +146,19 @@ func (plugin *iscsiPlugin) execCommand(command string, args []string) ([]byte, e return cmd.CombinedOutput() } +func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + iscsiVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + ISCSI: &api.ISCSIVolumeSource{ + TargetPortal: volumeName, + IQN: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(iscsiVolume), nil +} + type iscsiDisk struct { volName string podUID types.UID diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index 463addc6b75d8..4ca95781b6e84 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -136,6 +136,18 @@ func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.R return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) } +func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + nfsVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + NFS: &api.NFSVolumeSource{ + Path: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(nfsVolume), nil +} + // NFS volumes represent a bare host file or directory mount of an NFS export. type nfs struct { volName string diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 0ab0a73d983ab..0ff8939399469 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -98,6 +98,12 @@ type VolumePlugin interface { // - name: The volume name, as per the api.Volume spec. // - podUID: The UID of the enclosing pod NewUnmounter(name string, podUID types.UID) (Unmounter, error) + + // ConstructVolumeSpec constructs a volume spec based on the given volume name + // and mountPath. The spec may be incomplete because only limited information + // is available from the input. The volume manager uses this function to + // reconstruct volume specs by reading volume directories from disk. + ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) } // PersistentVolumePlugin is an extended interface of VolumePlugin and is used @@ -151,6 +157,7 @@ type AttachableVolumePlugin interface { VolumePlugin NewAttacher() (Attacher, error) NewDetacher() (Detacher, error) + GetDeviceMountRefs(deviceMountPath string) ([]string, error) } // VolumeHost is an interface that plugins can use to access the kubelet.
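To illustrate the VolumePlugin contract added above, here is a minimal, self-contained sketch of how a caller such as the kubelet's volume manager might use ConstructVolumeSpec to rebuild volume state from an on-disk mount path after a restart. The trimmed spec and volumePlugin types, the reconstructVolume helper, and fakePlugin are hypothetical stand-ins for the real pkg/volume types, not code from this patch.

package main

import (
	"fmt"
	"path"
)

// spec is a trimmed, hypothetical stand-in for pkg/volume.Spec.
type spec struct{ name string }

// volumePlugin is a trimmed stand-in exposing only the method added in this patch.
type volumePlugin interface {
	GetPluginName() string
	ConstructVolumeSpec(volumeName, mountPath string) (*spec, error)
}

// reconstructVolume derives the volume name from the on-disk mount path and
// asks the plugin for a (possibly incomplete) spec describing that volume.
func reconstructVolume(plugin volumePlugin, mountPath string) (*spec, error) {
	volumeName := path.Base(mountPath)
	s, err := plugin.ConstructVolumeSpec(volumeName, mountPath)
	if err != nil {
		return nil, fmt.Errorf("plugin %s could not construct a spec for %q: %v",
			plugin.GetPluginName(), mountPath, err)
	}
	return s, nil
}

// fakePlugin shows the shape of a plugin-side implementation.
type fakePlugin struct{}

func (fakePlugin) GetPluginName() string { return "kubernetes.io/fake" }
func (fakePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*spec, error) {
	return &spec{name: volumeName}, nil
}

func main() {
	s, err := reconstructVolume(fakePlugin{}, "/var/lib/kubelet/pods/uid/volumes/kubernetes.io~fake/myvol")
	if err != nil {
		panic(err)
	}
	fmt.Println(s.name) // prints "myvol"
}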
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go index 94893cd7802bf..bfc852b17ef79 100644 --- a/pkg/volume/rbd/rbd.go +++ b/pkg/volume/rbd/rbd.go @@ -165,6 +165,18 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, }, nil } +func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + rbdVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + RBD: &api.RBDVolumeSource{ + CephMonitors: []string{}, + }, + }, + } + return volume.NewSpecFromVolume(rbdVolume), nil +} + type rbd struct { volName string podUID types.UID diff --git a/pkg/volume/secret/secret.go b/pkg/volume/secret/secret.go index 0a397fa552ac7..9ca6911690ab8 100644 --- a/pkg/volume/secret/secret.go +++ b/pkg/volume/secret/secret.go @@ -110,6 +110,18 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu }, nil } +func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) { + secretVolume := &api.Volume{ + Name: volName, + VolumeSource: api.VolumeSource{ + Secret: &api.SecretVolumeSource{ + SecretName: volName, + }, + }, + } + return volume.NewSpecFromVolume(secretVolume), nil +} + type secretVolume struct { volName string podUID types.UID diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 391657fb9d9b4..09569333a4c08 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -288,6 +288,14 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMod return []api.PersistentVolumeAccessMode{} } +func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) { + return nil, nil +} + +func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + return []string{}, nil +} + type FakeVolume struct { sync.RWMutex PodUID types.UID diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 9b3743f7bf58e..266b6cdf054bf 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -58,6 +58,10 @@ type NestedPendingOperations interface { // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() + + // IsOperationPending returns true if an operation for the given volumeName and podName is pending, + // otherwise it returns false + IsOperationPending(volumeName api.UniqueVolumeName, podName types.UniquePodName) bool } // NewNestedPendingOperations returns a new instance of NestedPendingOperations. 
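The IsOperationPending queries added in this patch take only a read lock; the Mutex-to-RWMutex switch exists so these status checks do not serialize behind writers. Below is a minimal, self-contained sketch of that locking pattern, with illustrative names rather than the actual goRoutineMap or nestedPendingOperations fields.

package main

import (
	"fmt"
	"sync"
)

// pendingOps mirrors the locking pattern used above: mutating calls take the
// full write lock, while IsOperationPending takes only a read lock, so
// concurrent status checks can proceed in parallel.
type pendingOps struct {
	lock sync.RWMutex
	ops  map[string]bool // operation name -> operationPending
}

func (p *pendingOps) start(name string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.ops[name] = true
}

func (p *pendingOps) IsOperationPending(name string) bool {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.ops[name] // missing entries read as false
}

func main() {
	p := &pendingOps{ops: map[string]bool{}}
	p.start("volume-a/pod-1")
	fmt.Println(p.IsOperationPending("volume-a/pod-1")) // true
	fmt.Println(p.IsOperationPending("volume-b/pod-2")) // false
}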
@@ -74,7 +78,7 @@ type nestedPendingOperations struct { operations []operation exponentialBackOffOnError bool cond *sync.Cond - lock sync.Mutex + lock sync.RWMutex } type operation struct { @@ -90,29 +94,9 @@ func (grm *nestedPendingOperations) Run( operationFunc func() error) error { grm.lock.Lock() defer grm.lock.Unlock() - - var previousOp operation - opExists := false - previousOpIndex := -1 - for previousOpIndex, previousOp = range grm.operations { - if previousOp.volumeName != volumeName { - // No match, keep searching - continue - } - - if previousOp.podName != emptyUniquePodName && - podName != emptyUniquePodName && - previousOp.podName != podName { - // No match, keep searching - continue - } - - // Match - opExists = true - break - } - + opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) if opExists { + previousOp := grm.operations[previousOpIndex] // Operation already exists if previousOp.operationPending { // Operation is pending @@ -153,6 +137,43 @@ func (grm *nestedPendingOperations) Run( return nil } +func (grm *nestedPendingOperations) IsOperationPending( + volumeName api.UniqueVolumeName, + podName types.UniquePodName) bool { + + grm.lock.RLock() + defer grm.lock.RUnlock() + + exist, previousOpIndex := grm.isOperationExists(volumeName, podName) + if exist && grm.operations[previousOpIndex].operationPending { + return true + } + return false +} + +func (grm *nestedPendingOperations) isOperationExists( + volumeName api.UniqueVolumeName, + podName types.UniquePodName) (bool, int) { + + for previousOpIndex, previousOp := range grm.operations { + if previousOp.volumeName != volumeName { + // No match, keep searching + continue + } + + if previousOp.podName != emptyUniquePodName && + podName != emptyUniquePodName && + previousOp.podName != podName { + // No match, keep searching + continue + } + + // Match + return true, previousOpIndex + } + return false, -1 +} + func (grm *nestedPendingOperations) getOperation( volumeName api.UniqueVolumeName, podName types.UniquePodName) (uint, error) { diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 28c1e57d750a5..0f055f33b9eee 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -99,6 +99,10 @@ type OperationExecutor interface { // object, for example) then an error is returned which triggers exponential // back off on retries. VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error + + // IsOperationPending returns true if an operation for the given volumeName and podName is pending, + // otherwise it returns false + IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool } // NewOperationExecutor returns a new instance of OperationExecutor. 
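The isOperationExists helper factored out above treats an empty pod name as a wildcard: a volume-wide operation conflicts with any per-pod operation on the same volume, while two per-pod operations conflict only when both the volume and pod names match. Here is a standalone sketch of that matching rule, using illustrative stand-in types for api.UniqueVolumeName and types.UniquePodName.

package main

import "fmt"

// opKey pairs the two identifiers the matching rule inspects.
type opKey struct {
	volumeName string
	podName    string
}

// emptyUniquePodName marks a volume-wide operation, as in the patch above.
const emptyUniquePodName = ""

// conflicts reproduces the matching rule from isOperationExists: operations
// collide when the volume names match and either side is volume-wide (empty
// pod name) or both name the same pod.
func conflicts(a, b opKey) bool {
	if a.volumeName != b.volumeName {
		return false
	}
	if a.podName != emptyUniquePodName &&
		b.podName != emptyUniquePodName &&
		a.podName != b.podName {
		return false
	}
	return true
}

func main() {
	attach := opKey{volumeName: "vol-1", podName: emptyUniquePodName} // volume-wide
	mountA := opKey{volumeName: "vol-1", podName: "pod-a"}            // per-pod
	mountB := opKey{volumeName: "vol-1", podName: "pod-b"}

	fmt.Println(conflicts(attach, mountA)) // true: volume-wide blocks per-pod work
	fmt.Println(conflicts(mountA, mountB)) // false: different pods may proceed
}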
@@ -339,6 +343,10 @@ type operationExecutor struct { pendingOperations nestedpendingoperations.NestedPendingOperations } +func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + return oe.pendingOperations.IsOperationPending(volumeName, podName) +} + func (oe *operationExecutor) AttachVolume( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { @@ -391,6 +399,7 @@ func (oe *operationExecutor) MountVolume( func (oe *operationExecutor) UnmountVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { + unmountFunc, err := oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { @@ -811,11 +820,14 @@ func (oe *operationExecutor) generateUnmountVolumeFunc( } glog.Infof( - "UnmountVolume.TearDown succeeded for volume %q (volume.spec.Name: %q) pod %q (UID: %q).", + "UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q", volumeToUnmount.VolumeName, volumeToUnmount.OuterVolumeSpecName, volumeToUnmount.PodName, - volumeToUnmount.PodUID) + volumeToUnmount.PodUID, + volumeToUnmount.InnerVolumeSpecName, + volumeToUnmount.PluginName, + volumeToUnmount.VolumeGidValue) // Update actual state of world markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted( @@ -879,7 +891,17 @@ func (oe *operationExecutor) generateUnmountDeviceFunc( deviceToDetach.VolumeSpec.Name(), err) } - + refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath) + if err != nil || len(refs) > 0 { + if err == nil { + err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs) + } + return fmt.Errorf( + "GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v", + deviceToDetach.VolumeName, + deviceToDetach.VolumeSpec.Name(), + err) + } // Execute unmount unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath) if unmountDeviceErr != nil { diff --git a/pkg/volume/util/volumehelper/volumehelper.go b/pkg/volume/util/volumehelper/volumehelper.go index 2ab765e3cd950..386daa1f3f6a2 100644 --- a/pkg/volume/util/volumehelper/volumehelper.go +++ b/pkg/volume/util/volumehelper/volumehelper.go @@ -54,8 +54,10 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName { // GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name // for a non-attachable volume.
-func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName { - return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName)) +func GetUniqueVolumeNameForNonAttachableVolume( + podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) api.UniqueVolumeName { + return api.UniqueVolumeName( + fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name())) } // GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique diff --git a/pkg/volume/vsphere_volume/vsphere_volume.go b/pkg/volume/vsphere_volume/vsphere_volume.go index 62f26c56a339b..31b337d047f6f 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume.go +++ b/pkg/volume/vsphere_volume/vsphere_volume.go @@ -135,6 +135,18 @@ func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error) return vs, nil } +func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + vsphereVolume := &api.Volume{ + Name: volumeName, + VolumeSource: api.VolumeSource{ + VsphereVolume: &api.VsphereVirtualDiskVolumeSource{ + VolumePath: volumeName, + }, + }, + } + return volume.NewSpecFromVolume(vsphereVolume), nil +} + // Abstract interface to disk operations. type vdManager interface { // Attaches the disk to the kubelet's host machine.
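As a closing illustration, the selection logic of the getDeviceNameFromMount helper added earlier in pkg/util/mount can be reproduced in isolation: among all mount references for a path, prefer the one under the plugin's global mount directory and return its basename, falling back to the basename of the mount path itself. The paths in main are made-up examples in the layout the kubelet uses, not values taken from this patch.

package main

import (
	"fmt"
	"path"
	"strings"
)

// deviceNameFromRefs mirrors getDeviceNameFromMount without the mounter
// plumbing: scan the mount references, return the basename of the first one
// under pluginDir, and otherwise fall back to the mount path's basename.
func deviceNameFromRefs(refs []string, mountPath, pluginDir string) string {
	for _, ref := range refs {
		if strings.HasPrefix(ref, pluginDir) {
			return path.Base(ref)
		}
	}
	return path.Base(mountPath)
}

func main() {
	// Hypothetical example paths: a per-pod volume directory and the plugin's
	// global mount point for the same device.
	mountPath := "/var/lib/kubelet/pods/uid/volumes/kubernetes.io~aws-ebs/myvol"
	refs := []string{
		mountPath,
		"/var/lib/kubelet/plugins/kubernetes.io/aws-ebs/mounts/vol-0abc123",
	}
	pluginDir := "/var/lib/kubelet/plugins/kubernetes.io/aws-ebs"

	fmt.Println(deviceNameFromRefs(refs, mountPath, pluginDir)) // prints "vol-0abc123"
}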