diff --git a/pkg/controller/volume/attachdetach/cache/desired_state_of_world.go b/pkg/controller/volume/attachdetach/cache/desired_state_of_world.go index 57ee9253ec1d0..9ce0affe3d9ce 100644 --- a/pkg/controller/volume/attachdetach/cache/desired_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/desired_state_of_world.go @@ -105,6 +105,10 @@ type DesiredStateOfWorld interface { // Mark multiattach error as reported to prevent spamming multiple // events for same error SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName) + + // GetVolumePodsOnNodes returns list of pods ("namespace/name") that require + // given volume on given nodes. + GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod } // VolumeToAttach represents a volume that should be attached to a node. @@ -412,3 +416,24 @@ func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd { } return pods } + +func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod { + dsw.RLock() + defer dsw.RUnlock() + + pods := []*v1.Pod{} + for _, nodeName := range nodes { + node, ok := dsw.nodesManaged[nodeName] + if !ok { + continue + } + volume, ok := node.volumesToAttach[volumeName] + if !ok { + continue + } + for _, pod := range volume.scheduledPods { + pods = append(pods, pod.podObj) + } + } + return pods +} diff --git a/pkg/controller/volume/attachdetach/cache/desired_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/desired_state_of_world_test.go index 7d909e5d3d600..c4a974acad9cb 100644 --- a/pkg/controller/volume/attachdetach/cache/desired_state_of_world_test.go +++ b/pkg/controller/volume/attachdetach/cache/desired_state_of_world_test.go @@ -1032,3 +1032,49 @@ func verifyVolumeToAttach( t.Fatalf("volumesToAttach (%v) should contain %q/%q. 
It does not.", volumesToAttach, expectedVolumeName, expectedNodeName) } + +func Test_GetPodsOnNodes(t *testing.T) { + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + dsw := NewDesiredStateOfWorld(volumePluginMgr) + + // 2 nodes, each with one pod with a different volume + node1Name := k8stypes.NodeName("node1-name") + pod1Name := "pod1-uid" + volume1Name := v1.UniqueVolumeName("volume1-name") + volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) + dsw.AddNode(node1Name, false /*keepTerminatedPodVolumes*/) + generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name) + if podAddErr != nil { + t.Fatalf( + "AddPod failed for pod %q. Expected: Actual: <%v>", + pod1Name, + podAddErr) + } + node2Name := k8stypes.NodeName("node2-name") + pod2Name := "pod2-uid" + volume2Name := v1.UniqueVolumeName("volume2-name") + volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) + dsw.AddNode(node2Name, false /*keepTerminatedPodVolumes*/) + _, podAddErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name) + if podAddErr != nil { + t.Fatalf( + "AddPod failed for pod %q. 
Expected: Actual: <%v>", + pod2Name, + podAddErr) + } + + // Third node without any pod + node3Name := k8stypes.NodeName("node3-name") + dsw.AddNode(node3Name, false /*keepTerminatedPodVolumes*/) + + // Act + pods := dsw.GetVolumePodsOnNodes([]k8stypes.NodeName{node1Name, node2Name, node3Name, "non-existing-node"}, generatedVolume1Name) + + // Assert + if len(pods) != 1 { + t.Fatalf("Expected 1 pod, got %d", len(pods)) + } + if pods[0].Name != pod1Name { + t.Errorf("Expected pod %s/%s, got %s", pod1Name, pod1Name, pods[0].Name) + } +} diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD index 16379f5c0b9c1..45b51253e0dc3 100644 --- a/pkg/controller/volume/attachdetach/reconciler/BUILD +++ b/pkg/controller/volume/attachdetach/reconciler/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/volume/util/operationexecutor:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], @@ -34,6 +35,7 @@ go_test( "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//pkg/util/strings:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/types:go_default_library", diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index ac81f98c6ae10..32b529c27e45d 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -21,10 +21,12 @@ package reconciler import ( "fmt" + "strings" 
"time" "github.com/golang/glog" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -269,12 +271,8 @@ func (rc *reconciler) attachDesiredVolumes() { nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName) if len(nodes) > 0 { if !volumeToAttach.MultiAttachErrorReported { - simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another") - for _, pod := range volumeToAttach.ScheduledPods { - rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg) - } + rc.reportMultiAttachError(volumeToAttach, nodes) rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName) - glog.Warningf(detailedMsg) } continue } @@ -292,5 +290,78 @@ func (rc *reconciler) attachDesiredVolumes() { glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error()) } } +} + +// reportMultiAttachError sends events and logs situation that a volume that +// should be attached to a node is already attached to different node(s). +func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) { + // Filter out the current node from list of nodes where the volume is + // attached. + // Some methods need []string, some other needs []NodeName, collect both. + // In theory, these arrays should have always only one element - the + // controller does not allow more than one attachment. But use array just + // in case... + otherNodes := []types.NodeName{} + otherNodesStr := []string{} + for _, node := range nodes { + if node != volumeToAttach.NodeName { + otherNodes = append(otherNodes, node) + otherNodesStr = append(otherNodesStr, string(node)) + } + } + + // Get list of pods that use the volume on the other nodes. 
+ pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName) + + if len(pods) == 0 { + // We did not find any pods that requests the volume. The pod must have been deleted already. + simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another") + for _, pod := range volumeToAttach.ScheduledPods { + rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg) + } + // Log detailed message to system admin + nodeList := strings.Join(otherNodesStr, ", ") + detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList)) + glog.Warningf(detailedMsg) + return + } + // There are pods that require the volume and run on another node. Typically + // it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let + // the user know what pods are blocking the volume. + for _, scheduledPod := range volumeToAttach.ScheduledPods { + // Each scheduledPod must get a custom message. They can run in + // different namespaces and user of a namespace should not see names of + // pods in other namespaces. + localPodNames := []string{} // Names of pods in scheduledPods's namespace + otherPods := 0 // Count of pods in other namespaces + for _, pod := range pods { + if pod.Namespace == scheduledPod.Namespace { + localPodNames = append(localPodNames, pod.Name) + } else { + otherPods++ + } + } + + var msg string + if len(localPodNames) > 0 { + msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", ")) + if otherPods > 0 { + msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods) + } + } else { + // No local pods, there are pods only in different namespaces. 
+ msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods) + } + simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg) + rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg) + } + + // Log all pods for system admin + podNames := []string{} + for _, pod := range pods { + podNames = append(podNames, pod.Namespace+"/"+pod.Name) + } + detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", "))) + glog.Warningf(detailedMsg) } diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 9d7f2aefbdf20..72e2db61ab742 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + stringutil "k8s.io/kubernetes/pkg/util/strings" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/types" @@ -531,6 +532,115 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing. 
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin) } +func Test_ReportMultiAttachError(t *testing.T) { + type nodeWithPods struct { + name k8stypes.NodeName + podNames []string + } + tests := []struct { + name string + nodes []nodeWithPods + expectedEvents []string + }{ + { + "no pods use the volume", + []nodeWithPods{ + {"node1", []string{"ns1/pod1"}}, + }, + []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"}, + }, + { + "pods in the same namespace use the volume", + []nodeWithPods{ + {"node1", []string{"ns1/pod1"}}, + {"node2", []string{"ns1/pod2"}}, + }, + []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"}, + }, + { + "pods in anotother namespace use the volume", + []nodeWithPods{ + {"node1", []string{"ns1/pod1"}}, + {"node2", []string{"ns2/pod2"}}, + }, + []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"}, + }, + { + "pods both in the same and anotother namespace use the volume", + []nodeWithPods{ + {"node1", []string{"ns1/pod1"}}, + {"node2", []string{"ns2/pod2"}}, + {"node3", []string{"ns1/pod3"}}, + }, + []string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"}, + }, + } + + for _, test := range tests { + // Arrange + t.Logf("Test %q starting", test.name) + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := controllervolumetesting.CreateTestClient() + fakeRecorder := record.NewFakeRecorder(100) + fakeHandler := volumetesting.NewBlockVolumePathHandler() + ad := 
operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + fakeKubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + rc := NewReconciler( + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + + nodes := []k8stypes.NodeName{} + for _, n := range test.nodes { + dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/) + nodes = append(nodes, n.name) + for _, podName := range n.podNames { + volumeName := v1.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce} + uid := string(n.name) + "-" + podName // unique UID + namespace, name := stringutil.SplitQualifiedName(podName) + pod := controllervolumetesting.NewPod(uid, name) + pod.Namespace = namespace + _, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name) + if err != nil { + t.Fatalf("Error adding pod %s to DSW: %s", podName, err) + } + } + } + // Act + volumes := dsw.GetVolumesToAttach() + for _, vol := range volumes { + if vol.NodeName == "node1" { + rc.(*reconciler).reportMultiAttachError(vol, nodes) + } + } + + // Assert + close(fakeRecorder.Events) + index := 0 + for event := range fakeRecorder.Events { + if len(test.expectedEvents) <= index { + t.Errorf("Test %q: unexpected event received: %s", test.name, event) + } else { + expectedEvent := test.expectedEvents[index] + if expectedEvent != event { + t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event) + } + } + index++ + } + for i := index; i < len(test.expectedEvents); i++ { + t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i]) + } + } +} + func waitForMultiAttachErrorOnNode( t *testing.T, attachedNode 
k8stypes.NodeName,