Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes CSI - in-tree plugin Attacher/Detacher API #55809

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/kube-controller-manager/app/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/azure_file"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/fc"
"k8s.io/kubernetes/pkg/volume/flexvolume"
"k8s.io/kubernetes/pkg/volume/flocker"
Expand Down Expand Up @@ -79,6 +80,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
return allPlugins
}

Expand All @@ -105,6 +107,7 @@ func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) []
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
return allPlugins
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/kubelet/app/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/volume/cephfs"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/configmap"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/downwardapi"
"k8s.io/kubernetes/pkg/volume/empty_dir"
"k8s.io/kubernetes/pkg/volume/fc"
Expand Down Expand Up @@ -96,6 +97,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
return allPlugins
}

Expand Down
1 change: 1 addition & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ pkg/volume/azure_file
pkg/volume/cephfs
pkg/volume/cinder
pkg/volume/configmap
pkg/volume/csi/proto/csi/
pkg/volume/empty_dir
pkg/volume/fc
pkg/volume/flexvolume
Expand Down
34 changes: 34 additions & 0 deletions pkg/apis/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ type PersistentVolumeSource struct {
// More info: https://releases.k8s.io/HEAD/examples/volumes/storageos/README.md
// +optional
StorageOS *StorageOSPersistentVolumeSource
// CSI (Container Storage Interface) represents storage that handled by an external CSI driver
// +optional
CSI *CSIPersistentVolumeSource
}

type PersistentVolumeClaimVolumeSource struct {
Expand Down Expand Up @@ -1503,6 +1506,37 @@ type LocalVolumeSource struct {
Path string
}

// Represents storage that is managed by an external CSI volume driver
type CSIPersistentVolumeSource struct {
// Driver is the name of the driver to use for this volume.
// Required.
Driver string

// VolumeHandle is the unique volume name returned by the CSI volume
// plugin’s CreateVolume to refer to the volume on all subsequent calls.
// Required.
VolumeHandle string

// Optional: MountSecretRef is a reference to the secret object containing
// sensitive information to pass to the CSI driver during NodePublish.
// This may be empty if no secret is required. If the secret object contains
// more than one secret, all secrets are passed.
// +optional
MountSecretRef *SecretReference

// Optional: AttachSecretRef is a reference to the secret object containing
// sensitive information to pass to the CSI driver during ControllerPublish.
// This may be empty if no secret is required. If the secret object contains
// more than one secret, all secrets are passed.
// +optional
AttachSecretRef *SecretReference

// Optional: The value to pass to ControllerPublishVolumeRequest.
// Defaults to false (read/write).
// +optional
ReadOnly bool
}

// ContainerPort represents a network port in a single container
type ContainerPort struct {
// Optional: If specified, this must be an IANA_SVC_NAME Each named port
Expand Down
22 changes: 22 additions & 0 deletions pkg/apis/core/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,19 @@ func validateStorageOSPersistentVolumeSource(storageos *core.StorageOSPersistent
return allErrs
}

func validateCSIPersistentVolumeSource(csi *api.CSIPersistentVolumeSource, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if len(csi.Driver) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("driver"), ""))
}

if len(csi.VolumeHandle) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("volumeHandle"), ""))
}

return allErrs
}

// ValidatePersistentVolumeName checks that a name is appropriate for a
// PersistentVolumeName object.
var ValidatePersistentVolumeName = NameIsDNSSubdomain
Expand Down Expand Up @@ -1541,6 +1554,15 @@ func ValidatePersistentVolume(pv *core.PersistentVolume) field.ErrorList {
}
}

if pv.Spec.CSI != nil {
if numVolumes > 0 {
allErrs = append(allErrs, field.Forbidden(specPath.Child("csi"), "may not specify more than 1 volume type"))
} else {
numVolumes++
allErrs = append(allErrs, validateCSIPersistentVolumeSource(pv.Spec.CSI, specPath.Child("csi"))...)
}
}

if numVolumes == 0 {
allErrs = append(allErrs, field.Required(specPath, "must specify a volume type"))
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/apis/core/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,52 @@ func TestValidateGlusterfs(t *testing.T) {
}
}

func TestValidateCSIVolumeSource(t *testing.T) {
testCases := []struct {
name string
csi *api.CSIPersistentVolumeSource
errtype field.ErrorType
errfield string
}{
{
name: "all required fields ok",
csi: &api.CSIPersistentVolumeSource{Driver: "test-driver", VolumeHandle: "test-123", ReadOnly: true},
},
{
name: "with default values ok",
csi: &api.CSIPersistentVolumeSource{Driver: "test-driver", VolumeHandle: "test-123"},
},
{
name: "missing driver name",
csi: &api.CSIPersistentVolumeSource{VolumeHandle: "test-123"},
errtype: field.ErrorTypeRequired,
errfield: "driver",
},
{
name: "missing volume handle",
csi: &api.CSIPersistentVolumeSource{Driver: "my-driver"},
errtype: field.ErrorTypeRequired,
errfield: "volumeHandle",
},
}

for i, tc := range testCases {
errs := validateCSIPersistentVolumeSource(tc.csi, field.NewPath("field"))

if len(errs) > 0 && tc.errtype == "" {
t.Errorf("[%d: %q] unexpected error(s): %v", i, tc.name, errs)
} else if len(errs) == 0 && tc.errtype != "" {
t.Errorf("[%d: %q] expected error type %v", i, tc.name, tc.errtype)
} else if len(errs) >= 1 {
if errs[0].Type != tc.errtype {
t.Errorf("[%d: %q] expected error type %v, got %v", i, tc.name, tc.errtype, errs[0].Type)
} else if !strings.HasSuffix(errs[0].Field, "."+tc.errfield) {
t.Errorf("[%d: %q] expected error on field %q, got %q", i, tc.name, tc.errfield, errs[0].Field)
}
}
}
}

// helper
func newInt32(val int) *int32 {
p := new(int32)
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/extensions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ var (
Projected FSType = "projected"
PortworxVolume FSType = "portworxVolume"
ScaleIO FSType = "scaleIO"
CSI FSType = "csi"
All FSType = "*"
)

Expand Down
10 changes: 10 additions & 0 deletions pkg/printers/internalversion/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,14 @@ func printFlockerVolumeSource(flocker *api.FlockerVolumeSource, w PrefixWriter)
flocker.DatasetName, flocker.DatasetUUID)
}

func printCSIPersistentVolumeSource(csi *api.CSIPersistentVolumeSource, w PrefixWriter) {
w.Write(LEVEL_2, "Type:\tCSI (a Container Storage Interface (CSI) volume source)\n"+
" Driver:\t%v\n"+
" VolumeHandle:\t%v\n",
" ReadOnly:\t%v\n",
csi.Driver, csi.VolumeHandle, csi.ReadOnly)
}

type PersistentVolumeDescriber struct {
clientset.Interface
}
Expand Down Expand Up @@ -1156,6 +1164,8 @@ func describePersistentVolume(pv *api.PersistentVolume, events *api.EventList) (
printFlexVolumeSource(pv.Spec.FlexVolume, w)
case pv.Spec.Flocker != nil:
printFlockerVolumeSource(pv.Spec.Flocker, w)
case pv.Spec.CSI != nil:
printCSIPersistentVolumeSource(pv.Spec.CSI, w)
default:
w.Write(LEVEL_1, "<unknown>\n")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/security/podsecuritypolicy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func GetAllFSTypesAsSet() sets.String {
string(extensions.Projected),
string(extensions.PortworxVolume),
string(extensions.ScaleIO),
string(extensions.CSI),
)
return fstypes
}
Expand Down
147 changes: 147 additions & 0 deletions pkg/volume/csi/csi_attacher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package csi

import (
"crypto/sha256"
"errors"
"fmt"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
)

type csiAttacher struct {
plugin *csiPlugin
k8s kubernetes.Interface
waitSleepTime time.Duration
}

// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}

func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
if spec == nil {
return "", errors.New("missing spec")
}

if spec.PersistentVolume == nil {
return "", errors.New("missing persistent volume")
}

// namespace := spec.PersistentVolume.GetObjectMeta().GetNamespace()
pvName := spec.PersistentVolume.GetName()
attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, string(nodeName)))

attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
// Namespace: namespace, TODO should VolumeAttachment namespaced ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, VolumeAttachment, like PersistentVolume is non-namespaced because it is a cluster resource.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got it.

},
Spec: storage.VolumeAttachmentSpec{
NodeName: string(nodeName),
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
}
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should ignore AlreadyExists error here - Attach will be called periodically, but VolumeAttachment will be already there.

There is IsAlreadyExists(err) defined in k8s.io/apimachinery/pkg/api/errors/errors.go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok, thanks will look into this - TODO P1

glog.Error(log("attacher.Attach failed: %v", err))
return "", err
}
glog.V(4).Info(log("volume attachment sent: [%v]", attach.GetName()))

return attach.GetName(), nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should wait for a while until VolumeAttachment.Status.Attached gets true. Traditionally, AWS and GCE wait for 20 minutes (!). And return when a new error / timestamp appears in VolumeAttachment.Status.AttachError .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Probably not actually do a wait loop here, but instead act on the event state change as a next step, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane I thought the Attacher.WaitForAttach() method is where that is done ? But will look into AWS/GCE for clues. TODO P1.

}

func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.Pod, timeout time.Duration) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaitForAttach is executed on the node and it waits for a device to appear in /dev there. So, all the code below should be actually part of Attach() call above. There is nothing we can do on the node to wait for the device - CSI does not tell us the device at all and WaitForAttach should be probably empty.

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane ok I see your point. I can move the wait logic from WaitForAttach right inside Attach. TODO P1

glog.V(4).Info(log("waiting for attachment update from CSI driver [attachment.ID=%v]", attachID))

source, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attach.WaitForAttach failed to get volume source: %v", err))
return "", err
}

ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()

timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-ticker.C:
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO to revisit this and replace it with a watch instead of poll.

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will do. TODO P2

if err != nil {
// log error, but continue to check again
glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err))
}
// attachment OK
if attach.Status.Attached {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err != nil can attacher.Status be nil, resulting in nil pointer reference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good catch, will add a continue (thought I had that in there already). TODO P1.

return attachID, nil
}
// driver reports attach error
attachErr := attach.Status.AttachError
if attachErr != nil {
glog.Error(log("attachment for %v failed: %v", source.VolumeHandle, attachErr.Message))
return "", errors.New(attachErr.Message)
}
case <-timer.C:
glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, source.VolumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", source.VolumeHandle)
}

}
}

func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
return nil, errors.New("unimplemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be implemented. At least you should check that VolumeAttachment still exists and it has Attached==true

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will implement attacher.VolumesAreAttached method as suggested. TODO P1.

}

func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
return "", errors.New("unimplemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be implemented and it should be the same as GetPath()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we don't need to implement this as we don't implement MountDevice.

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it. attacher.GetDeviceMountPath not needed.

}

func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
return errors.New("unimplemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be implemented. It should be the same as SetUp. DevicePath is probably empty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we don't need to implement this, kubelet should use SetupAt. But it should not return an error though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane the last sentence But should not return an error... needs clarification because I am returning errors when things go wrong in SetupAt()

}

var _ volume.Detacher = &csiAttacher{}

func (c *csiAttacher) Detach(deviceName string, nodeName types.NodeName) error {
return errors.New("unimplemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete VolumeAttachment and wait until it's really deleted. Wait loop should be very similar to Attach().

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got it. Implement attacher.Detach - TODO P1

}

func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
return errors.New("unimplemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be implemented and should follow the same pattern as TearDown

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we don't need to implement this, kubelet should use TearDown. But it should not return an error though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane ok will need to understand why not returning error.

Copy link
Member Author

@vladimirvivien vladimirvivien Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. it refers to the commented method, not TearDown.

}

func hashAttachmentName(pvName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", pvName, nodeName)))
return fmt.Sprintf("%x", result)
}
Loading