diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8f9283856d7fe..eec5d2c8b135b 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -137,6 +137,7 @@ func NewCMServer() *CMServer { PersistentVolumeRecyclerIncrementTimeoutNFS: 30, PersistentVolumeRecyclerMinimumTimeoutHostPath: 60, PersistentVolumeRecyclerIncrementTimeoutHostPath: 30, + EnableHostPathProvisioning: false, }, KubeAPIQPS: 20.0, KubeAPIBurst: 30, @@ -176,6 +177,7 @@ type VolumeConfigFlags struct { PersistentVolumeRecyclerPodTemplateFilePathHostPath string PersistentVolumeRecyclerMinimumTimeoutHostPath int PersistentVolumeRecyclerIncrementTimeoutHostPath int + EnableHostPathProvisioning bool } // AddFlags adds flags for a specific CMServer to the specified FlagSet @@ -201,6 +203,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.") + fs.BoolVar(&s.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", s.VolumeConfigFlags.EnableHostPathProvisioning, "Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.") fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.") fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.") fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.") @@ -385,15 +388,29 @@ func (s *CMServer) Run(_ []string) error { } } + volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags) + provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfigFlags) + if err != nil { + glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.") + } + pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() - pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)) + pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud) if err != nil { glog.Fatalf("Failed to start persistent volume recycler: %+v", err) } pvRecycler.Run() + if provisioner != nil { + pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-provisioner")), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud) + if err != nil { + glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err) + } + pvController.Run() + } + var rootCA []byte if s.RootCAFile != "" { diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index ac42716929f7b..1e5a55043e781 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -21,12 +21,18 @@ import ( // This should probably be part of some configuration fed into the build for a // given binary target. + "fmt" + //Cloud providers _ "k8s.io/kubernetes/pkg/cloudprovider/providers" // Volume plugins + "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/aws_ebs" + "k8s.io/kubernetes/pkg/volume/cinder" + "k8s.io/kubernetes/pkg/volume/gce_pd" "k8s.io/kubernetes/pkg/volume/host_path" "k8s.io/kubernetes/pkg/volume/nfs" @@ -51,7 +57,7 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutHostPath, RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(), } - if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil { + if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil { glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, err) } allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...) @@ -61,18 +67,49 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutNFS, RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(), } - if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil { + if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil { glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, err) } allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...) + allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...) + return allPlugins } -// attemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume. +// NewVolumeProvisioner returns a volume provisioner to use when running in a cloud or development environment. +// The beta implementation of provisioning allows 1 implied provisioner per cloud, until we allow configuration of many. +// We explicitly map clouds to volume plugins here which allows us to configure many later without backwards compatibility issues. +// Not all cloudproviders have provisioning capability, which is the reason for the bool in the return to tell the caller to expect one or not. +func NewVolumeProvisioner(cloud cloudprovider.Interface, flags VolumeConfigFlags) (volume.ProvisionableVolumePlugin, error) { + switch { + case cloud == nil && flags.EnableHostPathProvisioning: + return getProvisionablePluginFromVolumePlugins(host_path.ProbeVolumePlugins(volume.VolumeConfig{})) + // case cloud != nil && aws.ProviderName == cloud.ProviderName(): + // return getProvisionablePluginFromVolumePlugins(aws_ebs.ProbeVolumePlugins()) + // case cloud != nil && gce.ProviderName == cloud.ProviderName(): + // return getProvisionablePluginFromVolumePlugins(gce_pd.ProbeVolumePlugins()) + // case cloud != nil && openstack.ProviderName == cloud.ProviderName(): + // return getProvisionablePluginFromVolumePlugins(cinder.ProbeVolumePlugins()) + } + return nil, nil +} + +func getProvisionablePluginFromVolumePlugins(plugins []volume.VolumePlugin) (volume.ProvisionableVolumePlugin, error) { + for _, plugin := range plugins { + if provisonablePlugin, ok := plugin.(volume.ProvisionableVolumePlugin); ok { + return provisonablePlugin, nil + } + } + return nil, fmt.Errorf("ProvisionablePlugin expected but not found in %#v: ", plugins) +} + +// AttemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume. // If successful, this method will set the recycler on the config. -// If unsucessful, an error is returned. -func attemptToLoadRecycler(path string, config *volume.VolumeConfig) error { +// If unsuccessful, an error is returned. Function is exported for reuse downstream. +func AttemptToLoadRecycler(path string, config *volume.VolumeConfig) error { if path != "" { recyclerPod, err := io.LoadPodFromFile(path) if err != nil { diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 45a2251d18079..d48bf2fc14efd 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -25,7 +25,7 @@ import ( "strconv" "time" - "k8s.io/kubernetes/cmd/kube-controller-manager/app" + kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app" "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -57,14 +57,14 @@ import ( // CMServer is the main context object for the controller manager. type CMServer struct { - *app.CMServer + *kubecontrollermanager.CMServer UseHostPortEndpoints bool } // NewCMServer creates a new CMServer with a default config. func NewCMServer() *CMServer { s := &CMServer{ - CMServer: app.NewCMServer(), + CMServer: kubecontrollermanager.NewCMServer(), } s.CloudProvider = mesos.ProviderName s.UseHostPortEndpoints = true @@ -167,14 +167,29 @@ func (s *CMServer) Run(_ []string) error { namespaceController := namespacecontroller.NewNamespaceController(kubeClient, &unversioned.APIVersions{}, s.NamespaceSyncPeriod) namespaceController.Run() + volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags) + provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfigFlags) + if err != nil { + glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.") + } + pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() - pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, app.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)) + + pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud) if err != nil { glog.Fatalf("Failed to start persistent volume recycler: %+v", err) } pvRecycler.Run() + if provisioner != nil { + pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(kubeClient), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud) + if err != nil { + glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err) + } + pvController.Run() + } + var rootCA []byte if s.RootCAFile != "" { diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md index 3cfc998919114..27446f7e65785 100644 --- a/docs/admin/kube-controller-manager.md +++ b/docs/admin/kube-controller-manager.md @@ -66,6 +66,7 @@ kube-controller-manager --deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter. --deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure. --deployment-controller-sync-period=30s: Period for syncing the deployments. + --enable-hostpath-provisioner[=false]: Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development. --google-json-key="": The Google Cloud Platform Service Account JSON Key to use for authentication. --horizontal-pod-autoscaler-sync-period=30s: The period for syncing the number of pods in horizontal pod autoscaler. --kube-api-burst=30: Burst to use while talking with kubernetes apiserver @@ -96,7 +97,7 @@ kube-controller-manager --terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled. ``` -###### Auto generated by spf13/cobra on 30-Nov-2015 +###### Auto generated by spf13/cobra on 9-Dec-2015 diff --git a/examples/experimental/persistent-volume-provisioning/README.md b/examples/experimental/persistent-volume-provisioning/README.md new file mode 100644 index 0000000000000..90d353f56845e --- /dev/null +++ b/examples/experimental/persistent-volume-provisioning/README.md @@ -0,0 +1,124 @@ + + + + +WARNING +WARNING +WARNING +WARNING +WARNING + +

PLEASE NOTE: This document applies to the HEAD of the source tree

+ +If you are using a released version of Kubernetes, you should +refer to the docs that go with that version. + + +The latest release of this document can be found +[here](http://releases.k8s.io/release-1.1/examples/experimental/persistent-volume-provisioning/README.md). + +Documentation for other releases can be found at +[releases.k8s.io](http://releases.k8s.io). + +-- + + + + + +## Persistent Volume Provisioning + +This example shows how to use experimental persistent volume provisioning. + +### Pre-requisites + +This example assumes that you have an understanding of Kubernetes administration and can modify the +scripts that launch kube-controller-manager. + +### Admin Configuration + +No configuration is required by the admin! 3 cloud providers will be provided in the alpha version +of this feature: EBS, GCE, and Cinder. + +When Kubernetes is running in one of those clouds, there will be an implied provisioner. +There is no provisioner when running outside of any of those 3 cloud providers. + +A fourth provisioner is included for testing and development only. It creates HostPath volumes, +which will never work outside of a single node cluster. It is not supported in any way except for +local for testing and development. + + +### User provisioning requests + +Users request dynamically provisioned storage by including a storage class in their `PersistentVolumeClaim`. +The annotation `volume.alpha.kubernetes.io/storage-class` is used to access this experimental feature. +In the future, admins will be able to define many storage classes. +The storage class may remain in an annotation or become a field on the claim itself. + +> The value of the storage-class annotation does not matter in the alpha version of this feature. There is +a single implied provisioner per cloud (which creates 1 kind of volume in the provider). The full version of the feature +will require that this value matches what is configured by the administrator. + +``` +{ + "kind": "PersistentVolumeClaim", + "apiVersion": "v1", + "metadata": { + "name": "claim1", + "annotations": { + "volume.alpha.kubernetes.io/storage-class": "foo" + } + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "resources": { + "requests": { + "storage": "3Gi" + } + } + } +} +``` + +### Sample output + +This example uses HostPath but any provisioner would follow the same flow. + +First we note there are no Persistent Volumes in the cluster. After creating a claim, we see a new PV is created +and automatically bound to the claim requesting storage. + + +``` +$ kubectl get pv + +$ kubectl create -f examples/experimental/persistent-volume-provisioning/claim1.json +I1012 13:07:57.666759 22875 decoder.go:141] decoding stream as JSON +persistentvolumeclaim "claim1" created + +$ kubectl get pv +NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM REASON AGE +pv-hostpath-r6z5o createdby=hostpath-dynamic-provisioner 3Gi RWO Bound default/claim1 2s + +$ kubectl get pvc +NAME LABELS STATUS VOLUME CAPACITY ACCESSMODES AGE +claim1 Bound pv-hostpath-r6z5o 3Gi RWO 7s + +# delete the claim to release the volume +$ kubectl delete pvc claim1 +persistentvolumeclaim "claim1" deleted + +# the volume is deleted in response to being release of its claim +$ kubectl get pv + +``` + + +[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/experimental/persistent-volume-provisioning/README.md?pixel)]() + diff --git a/examples/experimental/persistent-volume-provisioning/claim1.json b/examples/experimental/persistent-volume-provisioning/claim1.json new file mode 100644 index 0000000000000..48f28b3ca876f --- /dev/null +++ b/examples/experimental/persistent-volume-provisioning/claim1.json @@ -0,0 +1,20 @@ +{ + "kind": "PersistentVolumeClaim", + "apiVersion": "v1", + "metadata": { + "name": "claim1", + "annotations": { + "volume.alpha.kubernetes.io/storage-class": "foo" + } + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "resources": { + "requests": { + "storage": "3Gi" + } + } + } +} diff --git a/examples/experimental/persistent-volume-provisioning/claim2.json b/examples/experimental/persistent-volume-provisioning/claim2.json new file mode 100644 index 0000000000000..8ffd9c8e8f567 --- /dev/null +++ b/examples/experimental/persistent-volume-provisioning/claim2.json @@ -0,0 +1,20 @@ +{ + "kind": "PersistentVolumeClaim", + "apiVersion": "v1", + "metadata": { + "name": "claim2", + "annotations": { + "volume.alpha.kubernetes.io/storage-class": "bar" + } + }, + "spec": { + "accessModes": [ + "ReadWriteOnce" + ], + "resources": { + "requests": { + "storage": "3Gi" + } + } + } +} diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index 9993994d7fdb5..450fd1eb26fa2 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -91,6 +91,7 @@ RKT_PATH=${RKT_PATH:-""} RKT_STAGE1_IMAGE=${RKT_STAGE1_IMAGE:-""} CHAOS_CHANCE=${CHAOS_CHANCE:-0.0} CPU_CFS_QUOTA=${CPU_CFS_QUOTA:-false} +ENABLE_HOSTPATH_PROVISIONER=${ENABLE_HOSTPATH_PROVISIONER:-"false"} function test_apiserver_off { # For the common local scenario, fail fast if server is already running. @@ -250,6 +251,7 @@ function start_controller_manager { --v=${LOG_LEVEL} \ --service-account-private-key-file="${SERVICE_ACCOUNT_KEY}" \ --root-ca-file="${ROOT_CA_FILE}" \ + --enable-hostpath-provisioner="${ENABLE_HOSTPATH_PROVISIONER}" \ --master="${API_HOST}:${API_PORT}" >"${CTLRMGR_LOG}" 2>&1 & CTLRMGR_PID=$! } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index cefee443fa13e..386cd95fcf6d4 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -78,6 +78,7 @@ e2e-output-dir e2e-verify-service-account enable-debugging-handlers enable-server +enable-hostpath-provisioner etcd-config etcd-prefix etcd-server diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index 3b4022cf65aae..cdcd154448be2 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -86,8 +86,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time framework.ResourceEventHandlerFuncs{ AddFunc: binder.addClaim, UpdateFunc: binder.updateClaim, - // no DeleteFunc needed. a claim requires no clean-up. - // syncVolume handles the missing claim + DeleteFunc: binder.deleteClaim, }, ) @@ -145,6 +144,33 @@ func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{ } } +func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + var volume *api.PersistentVolume + if pvc, ok := obj.(*api.PersistentVolumeClaim); ok { + if pvObj, exists, _ := binder.volumeIndex.GetByKey(pvc.Spec.VolumeName); exists { + if pv, ok := pvObj.(*api.PersistentVolume); ok { + volume = pv + } + } + } + if unk, ok := obj.(cache.DeletedFinalStateUnknown); ok && unk.Obj != nil { + if pv, ok := unk.Obj.(*api.PersistentVolume); ok { + volume = pv + } + } + + // sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to + // claim deletion prevents the volume from waiting until the next sync period for its Release. + if volume != nil { + err := syncVolume(binder.volumeIndex, binder.client, volume) + if err != nil { + glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err) + } + } +} + func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase) @@ -166,6 +192,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl volumeIndex.Add(volume) } + if isBeingProvisioned(volume) { + glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name) + return nil + } + switch currentPhase { case api.VolumePending: @@ -275,38 +306,46 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli switch claim.Status.Phase { case api.ClaimPending: + // claims w/ a storage-class annotation for provisioning with *only* match volumes with a ClaimRef of the claim. volume, err := volumeIndex.findBestMatchForClaim(claim) if err != nil { return err } + if volume == nil { glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name) return nil } - // create a reference to the claim and assign it to the volume being bound. - // the volume is a pointer and assigning the reference fixes a race condition where another - // claim might match this volume but before the claimRef is persistent in the next case statement + if isBeingProvisioned(volume) { + glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name) + return nil + } + claimRef, err := api.GetReference(claim) if err != nil { return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) } - // make a binding reference to the claim and ensure to update the local index to prevent dupe bindings - clone, err := conversion.NewCloner().DeepCopy(volume) - if err != nil { - return fmt.Errorf("Error cloning pv: %v", err) - } - volumeClone, ok := clone.(*api.PersistentVolume) - if !ok { - return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) - } - volumeClone.Spec.ClaimRef = claimRef - if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { - return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err) - } else { - volume = updatedVolume - volumeIndex.Update(updatedVolume) + // Make a binding reference to the claim by persisting claimRef on the volume. + // The local cache must be updated with the new bind to prevent subsequent + // claims from binding to the volume. + if volume.Spec.ClaimRef == nil { + clone, err := conversion.NewCloner().DeepCopy(volume) + if err != nil { + return fmt.Errorf("Error cloning pv: %v", err) + } + volumeClone, ok := clone.(*api.PersistentVolume) + if !ok { + return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) + } + volumeClone.Spec.ClaimRef = claimRef + if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { + return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err) + } else { + volume = updatedVolume + volumeIndex.Update(updatedVolume) + } } // the bind is persisted on the volume above and will always match the claim in a search. @@ -341,6 +380,14 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli return nil } +func isBeingProvisioned(volume *api.PersistentVolume) bool { + value, found := volume.Annotations[pvProvisioningRequiredAnnotationKey] + if found && value != pvProvisioningCompletedAnnotationValue { + return true + } + return false +} + // Run starts all of this binder's control loops func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go new file mode 100644 index 0000000000000..e0cfa42001244 --- /dev/null +++ b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go @@ -0,0 +1,498 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistentvolume + +import ( + "fmt" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/conversion" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/io" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +// PersistentVolumeProvisionerController reconciles the state of all PersistentVolumes and PersistentVolumeClaims. +type PersistentVolumeProvisionerController struct { + volumeController *framework.Controller + volumeStore cache.Store + claimController *framework.Controller + claimStore cache.Store + client controllerClient + cloud cloudprovider.Interface + provisioner volume.ProvisionableVolumePlugin + pluginMgr volume.VolumePluginMgr + stopChannels map[string]chan struct{} + mutex sync.RWMutex +} + +// constant name values for the controllers stopChannels map. +// the controller uses these for graceful shutdown +const volumesStopChannel = "volumes" +const claimsStopChannel = "claims" + +// NewPersistentVolumeProvisionerController creates a new PersistentVolumeProvisionerController +func NewPersistentVolumeProvisionerController(client controllerClient, syncPeriod time.Duration, plugins []volume.VolumePlugin, provisioner volume.ProvisionableVolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeProvisionerController, error) { + controller := &PersistentVolumeProvisionerController{ + client: client, + cloud: cloud, + provisioner: provisioner, + } + + if err := controller.pluginMgr.InitPlugins(plugins, controller); err != nil { + return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolumeProvisionerController: %+v", err) + } + + glog.V(5).Infof("Initializing provisioner: %s", controller.provisioner.Name()) + controller.provisioner.Init(controller) + + controller.volumeStore, controller.volumeController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) { + return client.ListPersistentVolumes(options) + }, + WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) { + return client.WatchPersistentVolumes(options) + }, + }, + &api.PersistentVolume{}, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: controller.handleAddVolume, + UpdateFunc: controller.handleUpdateVolume, + // delete handler not needed in this controller. + // volume deletion is handled by the recycler controller + }, + ) + controller.claimStore, controller.claimController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) { + return client.ListPersistentVolumeClaims(api.NamespaceAll, options) + }, + WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) { + return client.WatchPersistentVolumeClaims(api.NamespaceAll, options) + }, + }, + &api.PersistentVolumeClaim{}, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: controller.handleAddClaim, + UpdateFunc: controller.handleUpdateClaim, + // delete handler not needed. + // normal recycling applies when a claim is deleted. + // recycling is handled by the binding controller. + }, + ) + + return controller, nil +} + +func (controller *PersistentVolumeProvisionerController) handleAddVolume(obj interface{}) { + controller.mutex.Lock() + defer controller.mutex.Unlock() + cachedPv, _, _ := controller.volumeStore.Get(obj) + if pv, ok := cachedPv.(*api.PersistentVolume); ok { + err := controller.reconcileVolume(pv) + if err != nil { + glog.Errorf("Error reconciling volume %s: %+v", pv.Name, err) + } + } +} + +func (controller *PersistentVolumeProvisionerController) handleUpdateVolume(oldObj, newObj interface{}) { + // The flow for Update is the same as Add. + // A volume is only provisioned if not done so already. + controller.handleAddVolume(newObj) +} + +func (controller *PersistentVolumeProvisionerController) handleAddClaim(obj interface{}) { + controller.mutex.Lock() + defer controller.mutex.Unlock() + cachedPvc, exists, _ := controller.claimStore.Get(obj) + if !exists { + glog.Errorf("PersistentVolumeClaim does not exist in the local cache: %+v", obj) + return + } + if pvc, ok := cachedPvc.(*api.PersistentVolumeClaim); ok { + err := controller.reconcileClaim(pvc) + if err != nil { + glog.Errorf("Error encoutered reconciling claim %s: %+v", pvc.Name, err) + } + } +} + +func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldObj, newObj interface{}) { + // The flow for Update is the same as Add. + // A volume is only provisioned for a claim if not done so already. + controller.handleAddClaim(newObj) +} + +func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error { + if controller.provisioner == nil { + return fmt.Errorf("No provisioner configured for controller") + } + + // no provisioning requested, return Pending. Claim may be pending indefinitely without a match. + if !keyExists(qosProvisioningKey, claim.Annotations) { + glog.V(5).Infof("PersistentVolumeClaim[%s] no provisioning required", claim.Name) + return nil + } + if len(claim.Spec.VolumeName) != 0 { + glog.V(5).Infof("PersistentVolumeClaim[%s] already bound. No provisioning required", claim.Name) + return nil + } + if isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, claim.Annotations) { + glog.V(5).Infof("PersistentVolumeClaim[%s] is already provisioned.", claim.Name) + return nil + } + + glog.V(5).Infof("PersistentVolumeClaim[%s] provisioning", claim.Name) + provisioner, err := newProvisioner(controller.provisioner, claim) + if err != nil { + return fmt.Errorf("Unexpected error getting new provisioner for claim %s: %v\n", claim.Name, err) + } + newVolume, err := provisioner.NewPersistentVolumeTemplate() + if err != nil { + return fmt.Errorf("Unexpected error getting new volume template for claim %s: %v\n", claim.Name, err) + } + + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference for %s: %v\n", claim.Name, err) + } + + storageClass, _ := claim.Annotations[qosProvisioningKey] + + // the creation of this volume is the bind to the claim. + // The claim will match the volume during the next sync period when the volume is in the local cache + newVolume.Spec.ClaimRef = claimRef + newVolume.Annotations[pvProvisioningRequiredAnnotationKey] = "true" + newVolume.Annotations[qosProvisioningKey] = storageClass + newVolume, err = controller.client.CreatePersistentVolume(newVolume) + glog.V(5).Infof("Unprovisioned PersistentVolume[%s] created for PVC[%s], which will be fulfilled in the storage provider", newVolume.Name, claim.Name) + if err != nil { + return fmt.Errorf("PersistentVolumeClaim[%s] failed provisioning: %+v", claim.Name, err) + } + + claim.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue + _, err = controller.client.UpdatePersistentVolumeClaim(claim) + if err != nil { + glog.Error("error updating persistent volume claim: %v", err) + } + + return nil +} + +func (controller *PersistentVolumeProvisionerController) reconcileVolume(pv *api.PersistentVolume) error { + glog.V(5).Infof("PersistentVolume[%s] reconciling", pv.Name) + + if pv.Spec.ClaimRef == nil { + glog.V(5).Infof("PersistentVolume[%s] is not bound to a claim. No provisioning required", pv.Name) + return nil + } + + // TODO: fix this leaky abstraction. Had to make our own store key because ClaimRef fails the default keyfunc (no Meta on object). + obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)) + if !exists { + return fmt.Errorf("PersistentVolumeClaim[%s/%s] not found in local cache", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) + } + + claim, ok := obj.(*api.PersistentVolumeClaim) + if !ok { + return fmt.Errorf("PersistentVolumeClaim expected, but got %v", obj) + } + + // no provisioning required, volume is ready and Bound + if !keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) { + glog.V(5).Infof("PersistentVolume[%s] does not require provisioning", pv.Name) + return nil + } + + // provisioning is completed, volume is ready. + if isProvisioningComplete(pv) { + glog.V(5).Infof("PersistentVolume[%s] is bound and provisioning is complete", pv.Name) + if pv.Spec.ClaimRef.Namespace != claim.Namespace || pv.Spec.ClaimRef.Name != claim.Name { + return fmt.Errorf("pre-bind mismatch - expected %s but found %s/%s", claimToClaimKey(claim), pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) + } + return nil + } + + // provisioning is incomplete. Attempt to provision the volume. + glog.V(5).Infof("PersistentVolume[%s] provisioning in progress", pv.Name) + err := provisionVolume(pv, controller) + if err != nil { + return fmt.Errorf("Error provisioning PersistentVolume[%s]: %v", err) + } + + return nil +} + +// provisionVolume provisions a volume that has been created in the cluster but not yet fulfilled by +// the storage provider. +func provisionVolume(pv *api.PersistentVolume, controller *PersistentVolumeProvisionerController) error { + if isProvisioningComplete(pv) { + return fmt.Errorf("PersistentVolume[%s] is already provisioned", pv.Name) + } + + if _, exists := pv.Annotations[qosProvisioningKey]; !exists { + return fmt.Errorf("PersistentVolume[%s] does not contain a provisioning request. Provisioning not required.", pv.Name) + } + + if controller.provisioner == nil { + return fmt.Errorf("No provisioner found for volume: %s", pv.Name) + } + + // Find the claim in local cache + obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)) + if !exists { + return fmt.Errorf("Could not find PersistentVolumeClaim[%s/%s] in local cache", pv.Spec.ClaimRef.Name, pv.Name) + } + claim := obj.(*api.PersistentVolumeClaim) + + provisioner, _ := newProvisioner(controller.provisioner, claim) + err := provisioner.Provision(pv) + if err != nil { + glog.Errorf("Could not provision %s", pv.Name) + pv.Status.Phase = api.VolumeFailed + pv.Status.Message = err.Error() + if pv, apiErr := controller.client.UpdatePersistentVolumeStatus(pv); apiErr != nil { + return fmt.Errorf("PersistentVolume[%s] failed provisioning and also failed status update: %v - %v", pv.Name, err, apiErr) + } + return fmt.Errorf("PersistentVolume[%s] failed provisioning : %v", pv.Name, err, err) + } + + clone, err := conversion.NewCloner().DeepCopy(pv) + volumeClone, ok := clone.(*api.PersistentVolume) + if !ok { + return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) + } + volumeClone.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue + + pv, err = controller.client.UpdatePersistentVolume(volumeClone) + if err != nil { + // TODO: https://github.com/kubernetes/kubernetes/issues/14443 + // the volume was created in the infrastructure and likely has a PV name on it, + // but we failed to save the annotation that marks the volume as provisioned. + return fmt.Errorf("Error updating PersistentVolume[%s] with provisioning completed annotation. There is a potential for dupes and orphans.", volumeClone.Name) + } + return nil +} + +// Run starts all of this controller's control loops +func (controller *PersistentVolumeProvisionerController) Run() { + glog.V(5).Infof("Starting PersistentVolumeProvisionerController\n") + if controller.stopChannels == nil { + controller.stopChannels = make(map[string]chan struct{}) + } + + if _, exists := controller.stopChannels[volumesStopChannel]; !exists { + controller.stopChannels[volumesStopChannel] = make(chan struct{}) + go controller.volumeController.Run(controller.stopChannels[volumesStopChannel]) + } + + if _, exists := controller.stopChannels[claimsStopChannel]; !exists { + controller.stopChannels[claimsStopChannel] = make(chan struct{}) + go controller.claimController.Run(controller.stopChannels[claimsStopChannel]) + } +} + +// Stop gracefully shuts down this controller +func (controller *PersistentVolumeProvisionerController) Stop() { + glog.V(5).Infof("Stopping PersistentVolumeProvisionerController\n") + for name, stopChan := range controller.stopChannels { + close(stopChan) + delete(controller.stopChannels, name) + } +} + +func newProvisioner(plugin volume.ProvisionableVolumePlugin, claim *api.PersistentVolumeClaim) (volume.Provisioner, error) { + volumeOptions := volume.VolumeOptions{ + Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)], + AccessModes: claim.Spec.AccessModes, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + } + + provisioner, err := plugin.NewProvisioner(volumeOptions) + return provisioner, err +} + +// controllerClient abstracts access to PVs and PVCs. Easy to mock for testing and wrap for real client. +type controllerClient interface { + CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) + ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) + WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) + GetPersistentVolume(name string) (*api.PersistentVolume, error) + UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + DeletePersistentVolume(volume *api.PersistentVolume) error + UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) + + GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) + ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) + WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) + UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) + UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) + + // provided to give VolumeHost and plugins access to the kube client + GetKubeClient() client.Interface +} + +func NewControllerClient(c client.Interface) controllerClient { + return &realControllerClient{c} +} + +var _ controllerClient = &realControllerClient{} + +type realControllerClient struct { + client client.Interface +} + +func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Get(name) +} + +func (c *realControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) { + return c.client.PersistentVolumes().List(options) +} + +func (c *realControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) { + return c.client.PersistentVolumes().Watch(options) +} + +func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Create(pv) +} + +func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Update(volume) +} + +func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + return c.client.PersistentVolumes().Delete(volume.Name) +} + +func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().UpdateStatus(volume) +} + +func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(namespace).Get(name) +} + +func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) { + return c.client.PersistentVolumeClaims(namespace).List(options) +} + +func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) { + return c.client.PersistentVolumeClaims(namespace).Watch(options) +} + +func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim) +} + +func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) +} + +func (c *realControllerClient) GetKubeClient() client.Interface { + return c.client +} + +func keyExists(key string, haystack map[string]string) bool { + _, exists := haystack[key] + return exists +} + +func isProvisioningComplete(pv *api.PersistentVolume) bool { + return isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, pv.Annotations) +} + +func isAnnotationMatch(key, needle string, haystack map[string]string) bool { + value, exists := haystack[key] + if !exists { + return false + } + return value == needle +} + +func isRecyclable(policy api.PersistentVolumeReclaimPolicy) bool { + return policy == api.PersistentVolumeReclaimDelete || policy == api.PersistentVolumeReclaimRecycle +} + +// VolumeHost implementation +// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes. +// Because no mounting is performed, most of the VolumeHost methods are not implemented. +func (c *PersistentVolumeProvisionerController) GetPluginDir(podUID string) string { + return "" +} + +func (c *PersistentVolumeProvisionerController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { + return "" +} + +func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID, pluginName string) string { + return "" +} + +func (c *PersistentVolumeProvisionerController) GetKubeClient() client.Interface { + return c.client.GetKubeClient() +} + +func (c *PersistentVolumeProvisionerController) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) { + return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation") +} + +func (c *PersistentVolumeProvisionerController) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) { + return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation") +} + +func (c *PersistentVolumeProvisionerController) GetCloudProvider() cloudprovider.Interface { + return c.cloud +} + +func (c *PersistentVolumeProvisionerController) GetMounter() mount.Interface { + return nil +} + +func (c *PersistentVolumeProvisionerController) GetWriter() io.Writer { + return nil +} + +func (c *PersistentVolumeProvisionerController) GetHostName() string { + return "" +} + +const ( + // these pair of constants are used by the provisioner. + // The key is a kube namespaced key that denotes a volume requires provisioning. + // The value is set only when provisioning is completed. Any other value will tell the provisioner + // that provisioning has not yet occurred. + pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required" + pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed" +) diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go new file mode 100644 index 0000000000000..08eb03383a5ab --- /dev/null +++ b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go @@ -0,0 +1,240 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistentvolume + +import ( + "fmt" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + client "k8s.io/kubernetes/pkg/client/unversioned" + fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/watch" +) + +func TestProvisionerRunStop(t *testing.T) { + controller, _ := makeTestController() + + if len(controller.stopChannels) != 0 { + t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels)) + } + + controller.Run() + + if len(controller.stopChannels) != 2 { + t.Errorf("Running provisioner should have exactly 2 stopChannels. Got %v", len(controller.stopChannels)) + } + + controller.Stop() + + if len(controller.stopChannels) != 0 { + t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels)) + } +} + +func makeTestVolume() *api.PersistentVolume { + return &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{}, + Name: "pv01", + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data01", + }, + }, + }, + } +} + +func makeTestClaim() *api.PersistentVolumeClaim { + return &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{}, + Name: "claim01", + Namespace: "ns", + SelfLink: testapi.Default.SelfLink("pvc", ""), + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("8G"), + }, + }, + }, + } +} + +func makeTestController() (*PersistentVolumeProvisionerController, *mockControllerClient) { + mockClient := &mockControllerClient{} + mockVolumePlugin := &volume.FakeVolumePlugin{} + controller, _ := NewPersistentVolumeProvisionerController(mockClient, 1*time.Second, nil, mockVolumePlugin, &fake_cloud.FakeCloud{}) + return controller, mockClient +} + +func TestReconcileClaim(t *testing.T) { + controller, mockClient := makeTestController() + pvc := makeTestClaim() + + // watch would have added the claim to the store + controller.claimStore.Add(pvc) + err := controller.reconcileClaim(pvc) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // non-provisionable PVC should not have created a volume on reconciliation + if mockClient.volume != nil { + t.Error("Unexpected volume found in mock client. Expected nil") + } + + pvc.Annotations[qosProvisioningKey] = "foo" + + err = controller.reconcileClaim(pvc) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // PVC requesting provisioning should have a PV created for it + if mockClient.volume == nil { + t.Error("Expected to find bound volume but got nil") + } + + if mockClient.volume.Spec.ClaimRef.Name != pvc.Name { + t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name) + } +} + +func TestReconcileVolume(t *testing.T) { + + controller, mockClient := makeTestController() + pv := makeTestVolume() + pvc := makeTestClaim() + + err := controller.reconcileVolume(pv) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + // watch adds claim to the store. + // we need to add it to our mock client to mimic normal Get call + controller.claimStore.Add(pvc) + mockClient.claim = pvc + + // pretend the claim and volume are bound, no provisioning required + claimRef, _ := api.GetReference(pvc) + pv.Spec.ClaimRef = claimRef + err = controller.reconcileVolume(pv) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + pv.Annotations[pvProvisioningRequiredAnnotationKey] = "!pvProvisioningCompleted" + pv.Annotations[qosProvisioningKey] = "foo" + err = controller.reconcileVolume(pv) + + if !isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, mockClient.volume.Annotations) { + t.Errorf("Expected %s but got %s", pvProvisioningRequiredAnnotationKey, mockClient.volume.Annotations[pvProvisioningRequiredAnnotationKey]) + } +} + +var _ controllerClient = &mockControllerClient{} + +type mockControllerClient struct { + volume *api.PersistentVolume + claim *api.PersistentVolumeClaim +} + +func (c *mockControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.volume, nil +} + +func (c *mockControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { + if pv.GenerateName != "" && pv.Name == "" { + pv.Name = fmt.Sprintf(pv.GenerateName, util.NewUUID()) + } + c.volume = pv + return c.volume, nil +} + +func (c *mockControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) { + return &api.PersistentVolumeList{ + Items: []api.PersistentVolume{*c.volume}, + }, nil +} + +func (c *mockControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) { + return watch.NewFake(), nil +} + +func (c *mockControllerClient) UpdatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.CreatePersistentVolume(pv) +} + +func (c *mockControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + c.volume = nil + return nil +} + +func (c *mockControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return volume, nil +} + +func (c *mockControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { + if c.claim != nil { + return c.claim, nil + } else { + return nil, errors.NewNotFound("persistentVolume", name) + } +} + +func (c *mockControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) { + return &api.PersistentVolumeClaimList{ + Items: []api.PersistentVolumeClaim{*c.claim}, + }, nil +} + +func (c *mockControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) { + return watch.NewFake(), nil +} + +func (c *mockControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + c.claim = claim + return c.claim, nil +} + +func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return claim, nil +} + +func (c *mockControllerClient) GetKubeClient() client.Interface { + return nil +} diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index 0cd6de4085013..629c465e2d4c6 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -46,14 +46,16 @@ type PersistentVolumeRecycler struct { client recyclerClient kubeClient client.Interface pluginMgr volume.VolumePluginMgr + cloud cloudprovider.Interface } // PersistentVolumeRecycler creates a new PersistentVolumeRecycler -func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) { +func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) { recyclerClient := NewRecyclerClient(kubeClient) recycler := &PersistentVolumeRecycler{ client: recyclerClient, kubeClient: kubeClient, + cloud: cloud, } if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil { @@ -283,7 +285,7 @@ func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID t } func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface { - return nil + return f.cloud } func (f *PersistentVolumeRecycler) GetMounter() mount.Interface { diff --git a/pkg/controller/persistentvolume/types.go b/pkg/controller/persistentvolume/types.go index c022f97a3c564..04046d2044501 100644 --- a/pkg/controller/persistentvolume/types.go +++ b/pkg/controller/persistentvolume/types.go @@ -21,15 +21,15 @@ import ( "sort" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/cache" ) const ( - // A PV created specifically for one claim must contain this annotation in order to bind to the claim. - // The value must be the namespace and name of the claim being bound to (i.e, claim.Namespace/claim.Name) - // This is an experimental feature and likely to change in the future. - createdForKey = "volume.extensions.kubernetes.io/provisioned-for" + // A PVClaim can request a quality of service tier by adding this annotation. The value of the annotation + // is arbitrary. The values are pre-defined by a cluster admin and known to users when requesting a QoS. + // For example tiers might be gold, silver, and tin and the admin configures what that means for each volume plugin that can provision a volume. + // Values in the alpha version of this feature are not meaningful, but will be in the full version of this feature. + qosProvisioningKey = "volume.alpha.kubernetes.io/storage-class" ) // persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity. @@ -80,10 +80,7 @@ func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Persi type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool // find returns the nearest PV from the ordered list or nil if a match is not found -func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { - // the 'searchPV' argument is a synthetic PV with capacity and accessmodes set according to the user's PersistentVolumeClaim. - // the synthetic pv arg is, therefore, a request for a storage resource. - // +func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *api.PersistentVolumeClaim, matchPredicate matchPredicate) (*api.PersistentVolume, error) { // PVs are indexed by their access modes to allow easier searching. Each index is the string representation of a set of access modes. // There is a finite number of possible sets and PVs will only be indexed in one of them (whichever index matches the PV's modes). // @@ -92,17 +89,7 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume // // Searches are performed against a set of access modes, so we can attempt not only the exact matching modes but also // potential matches (the GCEPD example above). - allPossibleModes := pvIndex.allPossibleMatchingAccessModes(searchPV.Spec.AccessModes) - - // the searchPV should contain an annotation that allows pre-binding to a claim. - // we can use the same annotation value (pvc's namespace/name) and check against - // existing volumes to find an exact match. It is possible that a bind is made (ClaimRef persisted to PV) - // but the fail to update claim.Spec.VolumeName fails. This check allows the claim to find the volume - // that's already bound to the claim. - preboundClaim := "" - if createdFor, ok := searchPV.Annotations[createdForKey]; ok { - preboundClaim = createdFor - } + allPossibleModes := pvIndex.allPossibleMatchingAccessModes(claim.Spec.AccessModes) for _, modes := range allPossibleModes { volumes, err := pvIndex.ListByAccessModes(modes) @@ -115,19 +102,34 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume // return the exact pre-binding match, if found unboundVolumes := []*api.PersistentVolume{} for _, volume := range volumes { + // volume isn't currently bound or pre-bound. if volume.Spec.ClaimRef == nil { - // volume isn't currently bound or pre-bound. unboundVolumes = append(unboundVolumes, volume) continue } - boundClaim := fmt.Sprintf("%s/%s", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) - if boundClaim == preboundClaim { + if claim.Name == volume.Spec.ClaimRef.Name && claim.Namespace == volume.Spec.ClaimRef.Namespace { // exact match! No search required. return volume, nil } } + // a claim requesting provisioning will have an exact match pre-bound to the claim. + // no need to search through unbound volumes. The matching volume will be created by the provisioner + // and will match above when the claim is re-processed by the binder. + if keyExists(qosProvisioningKey, claim.Annotations) { + return nil, nil + } + + searchPV := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: claim.Spec.AccessModes, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)], + }, + }, + } + i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(searchPV, unboundVolumes[i]) }) if i < len(unboundVolumes) { return unboundVolumes[i], nil @@ -136,27 +138,9 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume return nil, nil } -// findByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage -func (pvIndex *persistentVolumeOrderedIndex) findByAccessModesAndStorageCapacity(prebindKey string, modes []api.PersistentVolumeAccessMode, qty resource.Quantity) (*api.PersistentVolume, error) { - pv := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Annotations: map[string]string{ - createdForKey: prebindKey, - }, - }, - Spec: api.PersistentVolumeSpec{ - AccessModes: modes, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): qty, - }, - }, - } - return pvIndex.find(pv, matchStorageCapacity) -} - // findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { - return pvIndex.findByAccessModesAndStorageCapacity(fmt.Sprintf("%s/%s", claim.Namespace, claim.Name), claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]) + return pvIndex.findByClaim(claim, matchStorageCapacity) } // byCapacity is used to order volumes by ascending storage size @@ -268,3 +252,7 @@ func (c byAccessModes) Swap(i, j int) { func (c byAccessModes) Len() int { return len(c.modes) } + +func claimToClaimKey(claim *api.PersistentVolumeClaim) string { + return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) +} diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index 526bac2f0e518..0882479856b61 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -22,7 +22,6 @@ import ( "regexp" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/volume" @@ -35,11 +34,11 @@ import ( func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin { return []volume.VolumePlugin{ &hostPathPlugin{ - host: nil, - newRecyclerFunc: newRecycler, - newDeleterFunc: newDeleter, - newCreaterFunc: newCreater, - config: volumeConfig, + host: nil, + newRecyclerFunc: newRecycler, + newDeleterFunc: newDeleter, + newProvisionerFunc: newProvisioner, + config: volumeConfig, }, } } @@ -47,28 +46,28 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { return []volume.VolumePlugin{ &hostPathPlugin{ - host: nil, - newRecyclerFunc: recyclerFunc, - newCreaterFunc: newCreater, - config: volumeConfig, + host: nil, + newRecyclerFunc: recyclerFunc, + newProvisionerFunc: newProvisioner, + config: volumeConfig, }, } } type hostPathPlugin struct { host volume.VolumeHost - // decouple creating Recyclers/Deleters/Creaters by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) - newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) - newCreaterFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error) - config volume.VolumeConfig + // decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing. + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) + newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) + config volume.VolumeConfig } var _ volume.VolumePlugin = &hostPathPlugin{} var _ volume.PersistentVolumePlugin = &hostPathPlugin{} var _ volume.RecyclableVolumePlugin = &hostPathPlugin{} var _ volume.DeletableVolumePlugin = &hostPathPlugin{} -var _ volume.CreatableVolumePlugin = &hostPathPlugin{} +var _ volume.ProvisionableVolumePlugin = &hostPathPlugin{} const ( hostPathPluginName = "kubernetes.io/host-path" @@ -124,11 +123,11 @@ func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, err return plugin.newDeleterFunc(spec, plugin.host) } -func (plugin *hostPathPlugin) NewCreater(options volume.VolumeOptions) (volume.Creater, error) { +func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) { if len(options.AccessModes) == 0 { options.AccessModes = plugin.GetAccessModes() } - return plugin.newCreaterFunc(options, plugin.host) + return plugin.newProvisionerFunc(options, plugin.host) } func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { @@ -154,8 +153,8 @@ func newDeleter(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, erro return &hostPathDeleter{spec.Name(), path, host, volume.NewMetricsDu(path)}, nil } -func newCreater(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error) { - return &hostPathCreater{options: options, host: host}, nil +func newProvisioner(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) { + return &hostPathProvisioner{options: options, host: host}, nil } // HostPath volumes represent a bare host file or directory mount. @@ -215,7 +214,7 @@ func (c *hostPathCleaner) TearDownAt(dir string) error { return fmt.Errorf("TearDownAt() does not make sense for host paths") } -// hostPathRecycler implements a dynamic provisioning Recycler for the HostPath plugin +// hostPathRecycler implements a Recycler for the HostPath plugin // This implementation is meant for testing only and only works in a single node cluster type hostPathRecycler struct { name string @@ -246,34 +245,36 @@ func (r *hostPathRecycler) Recycle() error { return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) } -// hostPathCreater implements a dynamic provisioning Creater for the HostPath plugin +// hostPathProvisioner implements a Provisioner for the HostPath plugin // This implementation is meant for testing only and only works in a single node cluster. -type hostPathCreater struct { +type hostPathProvisioner struct { host volume.VolumeHost options volume.VolumeOptions } // Create for hostPath simply creates a local /tmp/hostpath_pv/%s directory as a new PersistentVolume. -// This Creater is meant for development and testing only and WILL NOT WORK in a multi-node cluster. -func (r *hostPathCreater) Create() (*api.PersistentVolume, error) { - fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID()) - err := os.MkdirAll(fullpath, 0750) - if err != nil { - return nil, err +// This Provisioner is meant for development and testing only and WILL NOT WORK in a multi-node cluster. +func (r *hostPathProvisioner) Provision(pv *api.PersistentVolume) error { + if pv.Spec.HostPath == nil { + return fmt.Errorf("pv.Spec.HostPath cannot be nil") } + return os.MkdirAll(pv.Spec.HostPath.Path, 0750) +} +func (r *hostPathProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) { + fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID()) return &api.PersistentVolume{ ObjectMeta: api.ObjectMeta{ GenerateName: "pv-hostpath-", - Labels: map[string]string{ - "createdby": "hostpath dynamic provisioner", + Annotations: map[string]string{ + "kubernetes.io/createdby": "hostpath-dynamic-provisioner", }, }, Spec: api.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: r.options.PersistentVolumeReclaimPolicy, AccessModes: r.options.AccessModes, Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dMi", r.options.CapacityMB)), + api.ResourceName(api.ResourceStorage): r.options.Capacity, }, PersistentVolumeSource: api.PersistentVolumeSource{ HostPath: &api.HostPathVolumeSource{ diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index f5681ea9a3fc0..a82157c600f96 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -145,7 +145,7 @@ func TestDeleterTempDir(t *testing.T) { } } -func TestCreater(t *testing.T) { +func TestProvisioner(t *testing.T) { tempPath := "/tmp/hostpath/" defer os.RemoveAll(tempPath) err := os.MkdirAll(tempPath, 0750) @@ -157,18 +157,18 @@ func TestCreater(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - creater, err := plug.NewCreater(volume.VolumeOptions{CapacityMB: 100, PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete}) + creater, err := plug.NewProvisioner(volume.VolumeOptions{Capacity: resource.MustParse("1Gi"), PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete}) if err != nil { - t.Errorf("Failed to make a new Creater: %v", err) + t.Errorf("Failed to make a new Provisioner: %v", err) } - pv, err := creater.Create() + pv, err := creater.NewPersistentVolumeTemplate() if err != nil { t.Errorf("Unexpected error creating volume: %v", err) } if pv.Spec.HostPath.Path == "" { t.Errorf("Expected pv.Spec.HostPath.Path to not be empty: %#v", pv) } - expectedCapacity := resource.NewQuantity(100*1024*1024, resource.BinarySI) + expectedCapacity := resource.NewQuantity(1*1024*1024*1024, resource.BinarySI) actualCapacity := pv.Spec.Capacity[api.ResourceStorage] expectedAmt := expectedCapacity.Value() actualAmt := actualCapacity.Value() diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 3a172889dc885..91cd7a04d4607 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/types" @@ -39,11 +40,11 @@ type VolumeOptions struct { // it will be replaced and expanded on by future SecurityContext work. RootContext string - // The attributes below are required by volume.Creater - // perhaps CreaterVolumeOptions struct? + // The attributes below are required by volume.Provisioner + // TODO: refactor all of this out of volumes when an admin can configure many kinds of provisioners. - // CapacityMB is the size in MB of a volume. - CapacityMB int + // Capacity is the size of a volume. + Capacity resource.Quantity // AccessModes of a volume AccessModes []api.PersistentVolumeAccessMode // Reclamation policy for a persistent volume @@ -106,12 +107,12 @@ type DeletableVolumePlugin interface { NewDeleter(spec *Spec) (Deleter, error) } -// CreatableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster. -type CreatableVolumePlugin interface { +// ProvisionableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster. +type ProvisionableVolumePlugin interface { VolumePlugin - // NewCreater creates a new volume.Creater which knows how to create PersistentVolumes in accordance with + // NewProvisioner creates a new volume.Provisioner which knows how to create PersistentVolumes in accordance with // the plugin's underlying storage provider - NewCreater(options VolumeOptions) (Creater, error) + NewProvisioner(options VolumeOptions) (Provisioner, error) } // VolumeHost is an interface that plugins can use to access the kubelet. @@ -365,13 +366,13 @@ func (pm *VolumePluginMgr) FindDeletablePluginBySpec(spec *Spec) (DeletableVolum // FindCreatablePluginBySpec fetches a persistent volume plugin by name. If no plugin // is found, returns error. -func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (CreatableVolumePlugin, error) { +func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableVolumePlugin, error) { volumePlugin, err := pm.FindPluginBySpec(spec) if err != nil { return nil, err } - if creatableVolumePlugin, ok := volumePlugin.(CreatableVolumePlugin); ok { - return creatableVolumePlugin, nil + if provisionableVolumePlugin, ok := volumePlugin.(ProvisionableVolumePlugin); ok { + return provisionableVolumePlugin, nil } return nil, fmt.Errorf("no creatable volume plugin matched") } diff --git a/pkg/volume/testing.go b/pkg/volume/testing.go index 267a5a1838cb5..9f34a5eef4136 100644 --- a/pkg/volume/testing.go +++ b/pkg/volume/testing.go @@ -117,10 +117,13 @@ func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin { type FakeVolumePlugin struct { PluginName string Host VolumeHost + Config VolumeConfig } var _ VolumePlugin = &FakeVolumePlugin{} var _ RecyclableVolumePlugin = &FakeVolumePlugin{} +var _ DeletableVolumePlugin = &FakeVolumePlugin{} +var _ ProvisionableVolumePlugin = &FakeVolumePlugin{} func (plugin *FakeVolumePlugin) Init(host VolumeHost) { plugin.Host = host @@ -151,6 +154,10 @@ func (plugin *FakeVolumePlugin) NewDeleter(spec *Spec) (Deleter, error) { return &FakeDeleter{"/attributesTransferredFromSpec", MetricsNil{}}, nil } +func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provisioner, error) { + return &FakeProvisioner{options, plugin.Host}, nil +} + func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode { return []api.PersistentVolumeAccessMode{} } @@ -227,3 +234,36 @@ func (fd *FakeDeleter) Delete() error { func (fd *FakeDeleter) GetPath() string { return fd.path } + +type FakeProvisioner struct { + Options VolumeOptions + Host VolumeHost +} + +func (fc *FakeProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) { + fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID()) + return &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "pv-fakeplugin-", + Annotations: map[string]string{ + "kubernetes.io/createdby": "fakeplugin-provisioner", + }, + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: fc.Options.PersistentVolumeReclaimPolicy, + AccessModes: fc.Options.AccessModes, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): fc.Options.Capacity, + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: fullpath, + }, + }, + }, + }, nil +} + +func (fc *FakeProvisioner) Provision(pv *api.PersistentVolume) error { + return nil +} diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index ee9d213aae8af..aab01158f2bf9 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -100,10 +100,16 @@ type Recycler interface { Recycle() error } -// Create adds a new resource in the storage provider and creates a PersistentVolume for the new resource. -// Calls to Create should block until complete. -type Creater interface { - Create() (*api.PersistentVolume, error) +// Provisioner is an interface that creates templates for PersistentVolumes and can create the volume +// as a new resource in the infrastructure provider. +type Provisioner interface { + // Provision creates the resource by allocating the underlying volume in a storage system. + // This method should block until completion. + Provision(*api.PersistentVolume) error + // NewPersistentVolumeTemplate creates a new PersistentVolume to be used as a template before saving. + // The provisioner will want to tweak its properties, assign correct annotations, etc. + // This func should *NOT* persist the PV in the API. That is left to the caller. + NewPersistentVolumeTemplate() (*api.PersistentVolume, error) } // Delete removes the resource from the underlying storage provider. Calls to this method should block until @@ -111,6 +117,7 @@ type Creater interface { // A nil return indicates success. type Deleter interface { Volume + // This method should block until completion. Delete() error } diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index 99dfdd21987ea..2b7daf67fd978 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" + fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/volume" @@ -48,12 +49,16 @@ func TestPersistentVolumeRecycler(t *testing.T) { binderClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) recyclerClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) testClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) + host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil) - binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Minute) + plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}}} + cloud := &fake_cloud.FakeCloud{} + + binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second) binder.Run() defer binder.Stop() - recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Minute, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}}) + recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, plugins, cloud) recycler.Run() defer recycler.Stop()