diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 8f9283856d7fe..eec5d2c8b135b 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -137,6 +137,7 @@ func NewCMServer() *CMServer {
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
PersistentVolumeRecyclerIncrementTimeoutHostPath: 30,
+ EnableHostPathProvisioning: false,
},
KubeAPIQPS: 20.0,
KubeAPIBurst: 30,
@@ -176,6 +177,7 @@ type VolumeConfigFlags struct {
PersistentVolumeRecyclerPodTemplateFilePathHostPath string
PersistentVolumeRecyclerMinimumTimeoutHostPath int
PersistentVolumeRecyclerIncrementTimeoutHostPath int
+ EnableHostPathProvisioning bool
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
@@ -201,6 +203,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.")
+ fs.BoolVar(&s.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", s.VolumeConfigFlags.EnableHostPathProvisioning, "Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.")
fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.")
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.")
@@ -385,15 +388,29 @@ func (s *CMServer) Run(_ []string) error {
}
}
+ volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
+ provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfigFlags)
+ if err != nil {
+ glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
+ }
+
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
- pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
+ pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
+ if provisioner != nil {
+ pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-provisioner")), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
+ if err != nil {
+ glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
+ }
+ pvController.Run()
+ }
+
var rootCA []byte
if s.RootCAFile != "" {
diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go
index ac42716929f7b..1e5a55043e781 100644
--- a/cmd/kube-controller-manager/app/plugins.go
+++ b/cmd/kube-controller-manager/app/plugins.go
@@ -21,12 +21,18 @@ import (
// This should probably be part of some configuration fed into the build for a
// given binary target.
+ "fmt"
+
//Cloud providers
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
// Volume plugins
+ "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/volume"
+ "k8s.io/kubernetes/pkg/volume/aws_ebs"
+ "k8s.io/kubernetes/pkg/volume/cinder"
+ "k8s.io/kubernetes/pkg/volume/gce_pd"
"k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/nfs"
@@ -51,7 +57,7 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
- if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil {
+ if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil {
glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, err)
}
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...)
@@ -61,18 +67,49 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutNFS,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
- if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil {
+ if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil {
glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, err)
}
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
+ allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
+ allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
+ allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
+
return allPlugins
}
-// attemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume.
+// NewVolumeProvisioner returns a volume provisioner to use when running in a cloud or development environment.
+// The beta implementation of provisioning allows 1 implied provisioner per cloud, until we allow configuration of many.
+// We explicitly map clouds to volume plugins here which allows us to configure many later without backwards compatibility issues.
+// Not all cloudproviders have provisioning capability, which is the reason for the bool in the return to tell the caller to expect one or not.
+func NewVolumeProvisioner(cloud cloudprovider.Interface, flags VolumeConfigFlags) (volume.ProvisionableVolumePlugin, error) {
+ switch {
+ case cloud == nil && flags.EnableHostPathProvisioning:
+ return getProvisionablePluginFromVolumePlugins(host_path.ProbeVolumePlugins(volume.VolumeConfig{}))
+ // case cloud != nil && aws.ProviderName == cloud.ProviderName():
+ // return getProvisionablePluginFromVolumePlugins(aws_ebs.ProbeVolumePlugins())
+ // case cloud != nil && gce.ProviderName == cloud.ProviderName():
+ // return getProvisionablePluginFromVolumePlugins(gce_pd.ProbeVolumePlugins())
+ // case cloud != nil && openstack.ProviderName == cloud.ProviderName():
+ // return getProvisionablePluginFromVolumePlugins(cinder.ProbeVolumePlugins())
+ }
+ return nil, nil
+}
+
+func getProvisionablePluginFromVolumePlugins(plugins []volume.VolumePlugin) (volume.ProvisionableVolumePlugin, error) {
+ for _, plugin := range plugins {
+ if provisonablePlugin, ok := plugin.(volume.ProvisionableVolumePlugin); ok {
+ return provisonablePlugin, nil
+ }
+ }
+ return nil, fmt.Errorf("ProvisionablePlugin expected but not found in %#v: ", plugins)
+}
+
+// AttemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume.
// If successful, this method will set the recycler on the config.
-// If unsucessful, an error is returned.
-func attemptToLoadRecycler(path string, config *volume.VolumeConfig) error {
+// If unsuccessful, an error is returned. Function is exported for reuse downstream.
+func AttemptToLoadRecycler(path string, config *volume.VolumeConfig) error {
if path != "" {
recyclerPod, err := io.LoadPodFromFile(path)
if err != nil {
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 45a2251d18079..d48bf2fc14efd 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -25,7 +25,7 @@ import (
"strconv"
"time"
- "k8s.io/kubernetes/cmd/kube-controller-manager/app"
+ kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -57,14 +57,14 @@ import (
// CMServer is the main context object for the controller manager.
type CMServer struct {
- *app.CMServer
+ *kubecontrollermanager.CMServer
UseHostPortEndpoints bool
}
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := &CMServer{
- CMServer: app.NewCMServer(),
+ CMServer: kubecontrollermanager.NewCMServer(),
}
s.CloudProvider = mesos.ProviderName
s.UseHostPortEndpoints = true
@@ -167,14 +167,29 @@ func (s *CMServer) Run(_ []string) error {
namespaceController := namespacecontroller.NewNamespaceController(kubeClient, &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
namespaceController.Run()
+ volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
+ provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfigFlags)
+ if err != nil {
+ glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
+ }
+
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
- pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, app.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
+
+ pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
+ if provisioner != nil {
+ pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(kubeClient), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
+ if err != nil {
+ glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
+ }
+ pvController.Run()
+ }
+
var rootCA []byte
if s.RootCAFile != "" {
diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md
index 3cfc998919114..27446f7e65785 100644
--- a/docs/admin/kube-controller-manager.md
+++ b/docs/admin/kube-controller-manager.md
@@ -66,6 +66,7 @@ kube-controller-manager
--deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.
--deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure.
--deployment-controller-sync-period=30s: Period for syncing the deployments.
+ --enable-hostpath-provisioner[=false]: Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.
--google-json-key="": The Google Cloud Platform Service Account JSON Key to use for authentication.
--horizontal-pod-autoscaler-sync-period=30s: The period for syncing the number of pods in horizontal pod autoscaler.
--kube-api-burst=30: Burst to use while talking with kubernetes apiserver
@@ -96,7 +97,7 @@ kube-controller-manager
--terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.
```
-###### Auto generated by spf13/cobra on 30-Nov-2015
+###### Auto generated by spf13/cobra on 9-Dec-2015
diff --git a/examples/experimental/persistent-volume-provisioning/README.md b/examples/experimental/persistent-volume-provisioning/README.md
new file mode 100644
index 0000000000000..90d353f56845e
--- /dev/null
+++ b/examples/experimental/persistent-volume-provisioning/README.md
@@ -0,0 +1,124 @@
+
+
+
+
+
+
+
+
+
+
+
PLEASE NOTE: This document applies to the HEAD of the source tree
+
+If you are using a released version of Kubernetes, you should
+refer to the docs that go with that version.
+
+
+The latest release of this document can be found
+[here](http://releases.k8s.io/release-1.1/examples/experimental/persistent-volume-provisioning/README.md).
+
+Documentation for other releases can be found at
+[releases.k8s.io](http://releases.k8s.io).
+
+--
+
+
+
+
+
+## Persistent Volume Provisioning
+
+This example shows how to use experimental persistent volume provisioning.
+
+### Pre-requisites
+
+This example assumes that you have an understanding of Kubernetes administration and can modify the
+scripts that launch kube-controller-manager.
+
+### Admin Configuration
+
+No configuration is required by the admin! 3 cloud providers will be provided in the alpha version
+of this feature: EBS, GCE, and Cinder.
+
+When Kubernetes is running in one of those clouds, there will be an implied provisioner.
+There is no provisioner when running outside of any of those 3 cloud providers.
+
+A fourth provisioner is included for testing and development only. It creates HostPath volumes,
+which will never work outside of a single node cluster. It is not supported in any way except for
+local for testing and development.
+
+
+### User provisioning requests
+
+Users request dynamically provisioned storage by including a storage class in their `PersistentVolumeClaim`.
+The annotation `volume.alpha.kubernetes.io/storage-class` is used to access this experimental feature.
+In the future, admins will be able to define many storage classes.
+The storage class may remain in an annotation or become a field on the claim itself.
+
+> The value of the storage-class annotation does not matter in the alpha version of this feature. There is
+a single implied provisioner per cloud (which creates 1 kind of volume in the provider). The full version of the feature
+will require that this value matches what is configured by the administrator.
+
+```
+{
+ "kind": "PersistentVolumeClaim",
+ "apiVersion": "v1",
+ "metadata": {
+ "name": "claim1",
+ "annotations": {
+ "volume.alpha.kubernetes.io/storage-class": "foo"
+ }
+ },
+ "spec": {
+ "accessModes": [
+ "ReadWriteOnce"
+ ],
+ "resources": {
+ "requests": {
+ "storage": "3Gi"
+ }
+ }
+ }
+}
+```
+
+### Sample output
+
+This example uses HostPath but any provisioner would follow the same flow.
+
+First we note there are no Persistent Volumes in the cluster. After creating a claim, we see a new PV is created
+and automatically bound to the claim requesting storage.
+
+
+```
+$ kubectl get pv
+
+$ kubectl create -f examples/experimental/persistent-volume-provisioning/claim1.json
+I1012 13:07:57.666759 22875 decoder.go:141] decoding stream as JSON
+persistentvolumeclaim "claim1" created
+
+$ kubectl get pv
+NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM REASON AGE
+pv-hostpath-r6z5o createdby=hostpath-dynamic-provisioner 3Gi RWO Bound default/claim1 2s
+
+$ kubectl get pvc
+NAME LABELS STATUS VOLUME CAPACITY ACCESSMODES AGE
+claim1 Bound pv-hostpath-r6z5o 3Gi RWO 7s
+
+# delete the claim to release the volume
+$ kubectl delete pvc claim1
+persistentvolumeclaim "claim1" deleted
+
+# the volume is deleted in response to being release of its claim
+$ kubectl get pv
+
+```
+
+
+[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/experimental/persistent-volume-provisioning/README.md?pixel)]()
+
diff --git a/examples/experimental/persistent-volume-provisioning/claim1.json b/examples/experimental/persistent-volume-provisioning/claim1.json
new file mode 100644
index 0000000000000..48f28b3ca876f
--- /dev/null
+++ b/examples/experimental/persistent-volume-provisioning/claim1.json
@@ -0,0 +1,20 @@
+{
+ "kind": "PersistentVolumeClaim",
+ "apiVersion": "v1",
+ "metadata": {
+ "name": "claim1",
+ "annotations": {
+ "volume.alpha.kubernetes.io/storage-class": "foo"
+ }
+ },
+ "spec": {
+ "accessModes": [
+ "ReadWriteOnce"
+ ],
+ "resources": {
+ "requests": {
+ "storage": "3Gi"
+ }
+ }
+ }
+}
diff --git a/examples/experimental/persistent-volume-provisioning/claim2.json b/examples/experimental/persistent-volume-provisioning/claim2.json
new file mode 100644
index 0000000000000..8ffd9c8e8f567
--- /dev/null
+++ b/examples/experimental/persistent-volume-provisioning/claim2.json
@@ -0,0 +1,20 @@
+{
+ "kind": "PersistentVolumeClaim",
+ "apiVersion": "v1",
+ "metadata": {
+ "name": "claim2",
+ "annotations": {
+ "volume.alpha.kubernetes.io/storage-class": "bar"
+ }
+ },
+ "spec": {
+ "accessModes": [
+ "ReadWriteOnce"
+ ],
+ "resources": {
+ "requests": {
+ "storage": "3Gi"
+ }
+ }
+ }
+}
diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh
index 9993994d7fdb5..450fd1eb26fa2 100755
--- a/hack/local-up-cluster.sh
+++ b/hack/local-up-cluster.sh
@@ -91,6 +91,7 @@ RKT_PATH=${RKT_PATH:-""}
RKT_STAGE1_IMAGE=${RKT_STAGE1_IMAGE:-""}
CHAOS_CHANCE=${CHAOS_CHANCE:-0.0}
CPU_CFS_QUOTA=${CPU_CFS_QUOTA:-false}
+ENABLE_HOSTPATH_PROVISIONER=${ENABLE_HOSTPATH_PROVISIONER:-"false"}
function test_apiserver_off {
# For the common local scenario, fail fast if server is already running.
@@ -250,6 +251,7 @@ function start_controller_manager {
--v=${LOG_LEVEL} \
--service-account-private-key-file="${SERVICE_ACCOUNT_KEY}" \
--root-ca-file="${ROOT_CA_FILE}" \
+ --enable-hostpath-provisioner="${ENABLE_HOSTPATH_PROVISIONER}" \
--master="${API_HOST}:${API_PORT}" >"${CTLRMGR_LOG}" 2>&1 &
CTLRMGR_PID=$!
}
diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index cefee443fa13e..386cd95fcf6d4 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -78,6 +78,7 @@ e2e-output-dir
e2e-verify-service-account
enable-debugging-handlers
enable-server
+enable-hostpath-provisioner
etcd-config
etcd-prefix
etcd-server
diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go
index 3b4022cf65aae..cdcd154448be2 100644
--- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go
+++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go
@@ -86,8 +86,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,
UpdateFunc: binder.updateClaim,
- // no DeleteFunc needed. a claim requires no clean-up.
- // syncVolume handles the missing claim
+ DeleteFunc: binder.deleteClaim,
},
)
@@ -145,6 +144,33 @@ func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{
}
}
+func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) {
+ binder.lock.Lock()
+ defer binder.lock.Unlock()
+ var volume *api.PersistentVolume
+ if pvc, ok := obj.(*api.PersistentVolumeClaim); ok {
+ if pvObj, exists, _ := binder.volumeIndex.GetByKey(pvc.Spec.VolumeName); exists {
+ if pv, ok := pvObj.(*api.PersistentVolume); ok {
+ volume = pv
+ }
+ }
+ }
+ if unk, ok := obj.(cache.DeletedFinalStateUnknown); ok && unk.Obj != nil {
+ if pv, ok := unk.Obj.(*api.PersistentVolume); ok {
+ volume = pv
+ }
+ }
+
+ // sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to
+ // claim deletion prevents the volume from waiting until the next sync period for its Release.
+ if volume != nil {
+ err := syncVolume(binder.volumeIndex, binder.client, volume)
+ if err != nil {
+ glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err)
+ }
+ }
+}
+
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)
@@ -166,6 +192,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
volumeIndex.Add(volume)
}
+ if isBeingProvisioned(volume) {
+ glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name)
+ return nil
+ }
+
switch currentPhase {
case api.VolumePending:
@@ -275,38 +306,46 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
switch claim.Status.Phase {
case api.ClaimPending:
+ // claims w/ a storage-class annotation for provisioning with *only* match volumes with a ClaimRef of the claim.
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
return err
}
+
if volume == nil {
glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
return nil
}
- // create a reference to the claim and assign it to the volume being bound.
- // the volume is a pointer and assigning the reference fixes a race condition where another
- // claim might match this volume but before the claimRef is persistent in the next case statement
+ if isBeingProvisioned(volume) {
+ glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name)
+ return nil
+ }
+
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
- // make a binding reference to the claim and ensure to update the local index to prevent dupe bindings
- clone, err := conversion.NewCloner().DeepCopy(volume)
- if err != nil {
- return fmt.Errorf("Error cloning pv: %v", err)
- }
- volumeClone, ok := clone.(*api.PersistentVolume)
- if !ok {
- return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
- }
- volumeClone.Spec.ClaimRef = claimRef
- if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
- return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
- } else {
- volume = updatedVolume
- volumeIndex.Update(updatedVolume)
+ // Make a binding reference to the claim by persisting claimRef on the volume.
+ // The local cache must be updated with the new bind to prevent subsequent
+ // claims from binding to the volume.
+ if volume.Spec.ClaimRef == nil {
+ clone, err := conversion.NewCloner().DeepCopy(volume)
+ if err != nil {
+ return fmt.Errorf("Error cloning pv: %v", err)
+ }
+ volumeClone, ok := clone.(*api.PersistentVolume)
+ if !ok {
+ return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
+ }
+ volumeClone.Spec.ClaimRef = claimRef
+ if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
+ return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
+ } else {
+ volume = updatedVolume
+ volumeIndex.Update(updatedVolume)
+ }
}
// the bind is persisted on the volume above and will always match the claim in a search.
@@ -341,6 +380,14 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
return nil
}
+func isBeingProvisioned(volume *api.PersistentVolume) bool {
+ value, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
+ if found && value != pvProvisioningCompletedAnnotationValue {
+ return true
+ }
+ return false
+}
+
// Run starts all of this binder's control loops
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go
new file mode 100644
index 0000000000000..e0cfa42001244
--- /dev/null
+++ b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go
@@ -0,0 +1,498 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package persistentvolume
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/unversioned"
+ "k8s.io/kubernetes/pkg/client/cache"
+ client "k8s.io/kubernetes/pkg/client/unversioned"
+ "k8s.io/kubernetes/pkg/cloudprovider"
+ "k8s.io/kubernetes/pkg/controller/framework"
+ "k8s.io/kubernetes/pkg/conversion"
+ "k8s.io/kubernetes/pkg/runtime"
+ "k8s.io/kubernetes/pkg/types"
+ "k8s.io/kubernetes/pkg/util/io"
+ "k8s.io/kubernetes/pkg/util/mount"
+ "k8s.io/kubernetes/pkg/volume"
+ "k8s.io/kubernetes/pkg/watch"
+
+ "github.com/golang/glog"
+)
+
+// PersistentVolumeProvisionerController reconciles the state of all PersistentVolumes and PersistentVolumeClaims.
+type PersistentVolumeProvisionerController struct {
+ volumeController *framework.Controller
+ volumeStore cache.Store
+ claimController *framework.Controller
+ claimStore cache.Store
+ client controllerClient
+ cloud cloudprovider.Interface
+ provisioner volume.ProvisionableVolumePlugin
+ pluginMgr volume.VolumePluginMgr
+ stopChannels map[string]chan struct{}
+ mutex sync.RWMutex
+}
+
+// constant name values for the controllers stopChannels map.
+// the controller uses these for graceful shutdown
+const volumesStopChannel = "volumes"
+const claimsStopChannel = "claims"
+
+// NewPersistentVolumeProvisionerController creates a new PersistentVolumeProvisionerController
+func NewPersistentVolumeProvisionerController(client controllerClient, syncPeriod time.Duration, plugins []volume.VolumePlugin, provisioner volume.ProvisionableVolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeProvisionerController, error) {
+ controller := &PersistentVolumeProvisionerController{
+ client: client,
+ cloud: cloud,
+ provisioner: provisioner,
+ }
+
+ if err := controller.pluginMgr.InitPlugins(plugins, controller); err != nil {
+ return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolumeProvisionerController: %+v", err)
+ }
+
+ glog.V(5).Infof("Initializing provisioner: %s", controller.provisioner.Name())
+ controller.provisioner.Init(controller)
+
+ controller.volumeStore, controller.volumeController = framework.NewInformer(
+ &cache.ListWatch{
+ ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) {
+ return client.ListPersistentVolumes(options)
+ },
+ WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
+ return client.WatchPersistentVolumes(options)
+ },
+ },
+ &api.PersistentVolume{},
+ syncPeriod,
+ framework.ResourceEventHandlerFuncs{
+ AddFunc: controller.handleAddVolume,
+ UpdateFunc: controller.handleUpdateVolume,
+ // delete handler not needed in this controller.
+ // volume deletion is handled by the recycler controller
+ },
+ )
+ controller.claimStore, controller.claimController = framework.NewInformer(
+ &cache.ListWatch{
+ ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) {
+ return client.ListPersistentVolumeClaims(api.NamespaceAll, options)
+ },
+ WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
+ return client.WatchPersistentVolumeClaims(api.NamespaceAll, options)
+ },
+ },
+ &api.PersistentVolumeClaim{},
+ syncPeriod,
+ framework.ResourceEventHandlerFuncs{
+ AddFunc: controller.handleAddClaim,
+ UpdateFunc: controller.handleUpdateClaim,
+ // delete handler not needed.
+ // normal recycling applies when a claim is deleted.
+ // recycling is handled by the binding controller.
+ },
+ )
+
+ return controller, nil
+}
+
+func (controller *PersistentVolumeProvisionerController) handleAddVolume(obj interface{}) {
+ controller.mutex.Lock()
+ defer controller.mutex.Unlock()
+ cachedPv, _, _ := controller.volumeStore.Get(obj)
+ if pv, ok := cachedPv.(*api.PersistentVolume); ok {
+ err := controller.reconcileVolume(pv)
+ if err != nil {
+ glog.Errorf("Error reconciling volume %s: %+v", pv.Name, err)
+ }
+ }
+}
+
+func (controller *PersistentVolumeProvisionerController) handleUpdateVolume(oldObj, newObj interface{}) {
+ // The flow for Update is the same as Add.
+ // A volume is only provisioned if not done so already.
+ controller.handleAddVolume(newObj)
+}
+
+func (controller *PersistentVolumeProvisionerController) handleAddClaim(obj interface{}) {
+ controller.mutex.Lock()
+ defer controller.mutex.Unlock()
+ cachedPvc, exists, _ := controller.claimStore.Get(obj)
+ if !exists {
+ glog.Errorf("PersistentVolumeClaim does not exist in the local cache: %+v", obj)
+ return
+ }
+ if pvc, ok := cachedPvc.(*api.PersistentVolumeClaim); ok {
+ err := controller.reconcileClaim(pvc)
+ if err != nil {
+ glog.Errorf("Error encoutered reconciling claim %s: %+v", pvc.Name, err)
+ }
+ }
+}
+
+func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldObj, newObj interface{}) {
+ // The flow for Update is the same as Add.
+ // A volume is only provisioned for a claim if not done so already.
+ controller.handleAddClaim(newObj)
+}
+
+func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error {
+ if controller.provisioner == nil {
+ return fmt.Errorf("No provisioner configured for controller")
+ }
+
+ // no provisioning requested, return Pending. Claim may be pending indefinitely without a match.
+ if !keyExists(qosProvisioningKey, claim.Annotations) {
+ glog.V(5).Infof("PersistentVolumeClaim[%s] no provisioning required", claim.Name)
+ return nil
+ }
+ if len(claim.Spec.VolumeName) != 0 {
+ glog.V(5).Infof("PersistentVolumeClaim[%s] already bound. No provisioning required", claim.Name)
+ return nil
+ }
+ if isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, claim.Annotations) {
+ glog.V(5).Infof("PersistentVolumeClaim[%s] is already provisioned.", claim.Name)
+ return nil
+ }
+
+ glog.V(5).Infof("PersistentVolumeClaim[%s] provisioning", claim.Name)
+ provisioner, err := newProvisioner(controller.provisioner, claim)
+ if err != nil {
+ return fmt.Errorf("Unexpected error getting new provisioner for claim %s: %v\n", claim.Name, err)
+ }
+ newVolume, err := provisioner.NewPersistentVolumeTemplate()
+ if err != nil {
+ return fmt.Errorf("Unexpected error getting new volume template for claim %s: %v\n", claim.Name, err)
+ }
+
+ claimRef, err := api.GetReference(claim)
+ if err != nil {
+ return fmt.Errorf("Unexpected error getting claim reference for %s: %v\n", claim.Name, err)
+ }
+
+ storageClass, _ := claim.Annotations[qosProvisioningKey]
+
+ // the creation of this volume is the bind to the claim.
+ // The claim will match the volume during the next sync period when the volume is in the local cache
+ newVolume.Spec.ClaimRef = claimRef
+ newVolume.Annotations[pvProvisioningRequiredAnnotationKey] = "true"
+ newVolume.Annotations[qosProvisioningKey] = storageClass
+ newVolume, err = controller.client.CreatePersistentVolume(newVolume)
+ glog.V(5).Infof("Unprovisioned PersistentVolume[%s] created for PVC[%s], which will be fulfilled in the storage provider", newVolume.Name, claim.Name)
+ if err != nil {
+ return fmt.Errorf("PersistentVolumeClaim[%s] failed provisioning: %+v", claim.Name, err)
+ }
+
+ claim.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
+ _, err = controller.client.UpdatePersistentVolumeClaim(claim)
+ if err != nil {
+ glog.Error("error updating persistent volume claim: %v", err)
+ }
+
+ return nil
+}
+
+func (controller *PersistentVolumeProvisionerController) reconcileVolume(pv *api.PersistentVolume) error {
+ glog.V(5).Infof("PersistentVolume[%s] reconciling", pv.Name)
+
+ if pv.Spec.ClaimRef == nil {
+ glog.V(5).Infof("PersistentVolume[%s] is not bound to a claim. No provisioning required", pv.Name)
+ return nil
+ }
+
+ // TODO: fix this leaky abstraction. Had to make our own store key because ClaimRef fails the default keyfunc (no Meta on object).
+ obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
+ if !exists {
+ return fmt.Errorf("PersistentVolumeClaim[%s/%s] not found in local cache", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
+ }
+
+ claim, ok := obj.(*api.PersistentVolumeClaim)
+ if !ok {
+ return fmt.Errorf("PersistentVolumeClaim expected, but got %v", obj)
+ }
+
+ // no provisioning required, volume is ready and Bound
+ if !keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) {
+ glog.V(5).Infof("PersistentVolume[%s] does not require provisioning", pv.Name)
+ return nil
+ }
+
+ // provisioning is completed, volume is ready.
+ if isProvisioningComplete(pv) {
+ glog.V(5).Infof("PersistentVolume[%s] is bound and provisioning is complete", pv.Name)
+ if pv.Spec.ClaimRef.Namespace != claim.Namespace || pv.Spec.ClaimRef.Name != claim.Name {
+ return fmt.Errorf("pre-bind mismatch - expected %s but found %s/%s", claimToClaimKey(claim), pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
+ }
+ return nil
+ }
+
+ // provisioning is incomplete. Attempt to provision the volume.
+ glog.V(5).Infof("PersistentVolume[%s] provisioning in progress", pv.Name)
+ err := provisionVolume(pv, controller)
+ if err != nil {
+ return fmt.Errorf("Error provisioning PersistentVolume[%s]: %v", err)
+ }
+
+ return nil
+}
+
+// provisionVolume provisions a volume that has been created in the cluster but not yet fulfilled by
+// the storage provider.
+func provisionVolume(pv *api.PersistentVolume, controller *PersistentVolumeProvisionerController) error {
+ if isProvisioningComplete(pv) {
+ return fmt.Errorf("PersistentVolume[%s] is already provisioned", pv.Name)
+ }
+
+ if _, exists := pv.Annotations[qosProvisioningKey]; !exists {
+ return fmt.Errorf("PersistentVolume[%s] does not contain a provisioning request. Provisioning not required.", pv.Name)
+ }
+
+ if controller.provisioner == nil {
+ return fmt.Errorf("No provisioner found for volume: %s", pv.Name)
+ }
+
+ // Find the claim in local cache
+ obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
+ if !exists {
+ return fmt.Errorf("Could not find PersistentVolumeClaim[%s/%s] in local cache", pv.Spec.ClaimRef.Name, pv.Name)
+ }
+ claim := obj.(*api.PersistentVolumeClaim)
+
+ provisioner, _ := newProvisioner(controller.provisioner, claim)
+ err := provisioner.Provision(pv)
+ if err != nil {
+ glog.Errorf("Could not provision %s", pv.Name)
+ pv.Status.Phase = api.VolumeFailed
+ pv.Status.Message = err.Error()
+ if pv, apiErr := controller.client.UpdatePersistentVolumeStatus(pv); apiErr != nil {
+ return fmt.Errorf("PersistentVolume[%s] failed provisioning and also failed status update: %v - %v", pv.Name, err, apiErr)
+ }
+ return fmt.Errorf("PersistentVolume[%s] failed provisioning : %v", pv.Name, err, err)
+ }
+
+ clone, err := conversion.NewCloner().DeepCopy(pv)
+ volumeClone, ok := clone.(*api.PersistentVolume)
+ if !ok {
+ return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
+ }
+ volumeClone.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
+
+ pv, err = controller.client.UpdatePersistentVolume(volumeClone)
+ if err != nil {
+ // TODO: https://github.com/kubernetes/kubernetes/issues/14443
+ // the volume was created in the infrastructure and likely has a PV name on it,
+ // but we failed to save the annotation that marks the volume as provisioned.
+ return fmt.Errorf("Error updating PersistentVolume[%s] with provisioning completed annotation. There is a potential for dupes and orphans.", volumeClone.Name)
+ }
+ return nil
+}
+
+// Run starts all of this controller's control loops
+func (controller *PersistentVolumeProvisionerController) Run() {
+ glog.V(5).Infof("Starting PersistentVolumeProvisionerController\n")
+ if controller.stopChannels == nil {
+ controller.stopChannels = make(map[string]chan struct{})
+ }
+
+ if _, exists := controller.stopChannels[volumesStopChannel]; !exists {
+ controller.stopChannels[volumesStopChannel] = make(chan struct{})
+ go controller.volumeController.Run(controller.stopChannels[volumesStopChannel])
+ }
+
+ if _, exists := controller.stopChannels[claimsStopChannel]; !exists {
+ controller.stopChannels[claimsStopChannel] = make(chan struct{})
+ go controller.claimController.Run(controller.stopChannels[claimsStopChannel])
+ }
+}
+
+// Stop gracefully shuts down this controller
+func (controller *PersistentVolumeProvisionerController) Stop() {
+ glog.V(5).Infof("Stopping PersistentVolumeProvisionerController\n")
+ for name, stopChan := range controller.stopChannels {
+ close(stopChan)
+ delete(controller.stopChannels, name)
+ }
+}
+
+func newProvisioner(plugin volume.ProvisionableVolumePlugin, claim *api.PersistentVolumeClaim) (volume.Provisioner, error) {
+ volumeOptions := volume.VolumeOptions{
+ Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
+ AccessModes: claim.Spec.AccessModes,
+ PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
+ }
+
+ provisioner, err := plugin.NewProvisioner(volumeOptions)
+ return provisioner, err
+}
+
+// controllerClient abstracts access to PVs and PVCs. Easy to mock for testing and wrap for real client.
+type controllerClient interface {
+ CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error)
+ ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error)
+ WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error)
+ GetPersistentVolume(name string) (*api.PersistentVolume, error)
+ UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
+ DeletePersistentVolume(volume *api.PersistentVolume) error
+ UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
+
+ GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
+ ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error)
+ WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error)
+ UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
+ UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
+
+ // provided to give VolumeHost and plugins access to the kube client
+ GetKubeClient() client.Interface
+}
+
+func NewControllerClient(c client.Interface) controllerClient {
+ return &realControllerClient{c}
+}
+
+var _ controllerClient = &realControllerClient{}
+
+type realControllerClient struct {
+ client client.Interface
+}
+
+func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
+ return c.client.PersistentVolumes().Get(name)
+}
+
+func (c *realControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) {
+ return c.client.PersistentVolumes().List(options)
+}
+
+func (c *realControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) {
+ return c.client.PersistentVolumes().Watch(options)
+}
+
+func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
+ return c.client.PersistentVolumes().Create(pv)
+}
+
+func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
+ return c.client.PersistentVolumes().Update(volume)
+}
+
+func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
+ return c.client.PersistentVolumes().Delete(volume.Name)
+}
+
+func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
+ return c.client.PersistentVolumes().UpdateStatus(volume)
+}
+
+func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
+ return c.client.PersistentVolumeClaims(namespace).Get(name)
+}
+
+func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) {
+ return c.client.PersistentVolumeClaims(namespace).List(options)
+}
+
+func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) {
+ return c.client.PersistentVolumeClaims(namespace).Watch(options)
+}
+
+func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
+ return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
+}
+
+func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
+ return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
+}
+
+func (c *realControllerClient) GetKubeClient() client.Interface {
+ return c.client
+}
+
+func keyExists(key string, haystack map[string]string) bool {
+ _, exists := haystack[key]
+ return exists
+}
+
+func isProvisioningComplete(pv *api.PersistentVolume) bool {
+ return isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, pv.Annotations)
+}
+
+func isAnnotationMatch(key, needle string, haystack map[string]string) bool {
+ value, exists := haystack[key]
+ if !exists {
+ return false
+ }
+ return value == needle
+}
+
+func isRecyclable(policy api.PersistentVolumeReclaimPolicy) bool {
+ return policy == api.PersistentVolumeReclaimDelete || policy == api.PersistentVolumeReclaimRecycle
+}
+
+// VolumeHost implementation
+// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
+// Because no mounting is performed, most of the VolumeHost methods are not implemented.
+func (c *PersistentVolumeProvisionerController) GetPluginDir(podUID string) string {
+ return ""
+}
+
+func (c *PersistentVolumeProvisionerController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
+ return ""
+}
+
+func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID, pluginName string) string {
+ return ""
+}
+
+func (c *PersistentVolumeProvisionerController) GetKubeClient() client.Interface {
+ return c.client.GetKubeClient()
+}
+
+func (c *PersistentVolumeProvisionerController) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) {
+ return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
+}
+
+func (c *PersistentVolumeProvisionerController) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) {
+ return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
+}
+
+func (c *PersistentVolumeProvisionerController) GetCloudProvider() cloudprovider.Interface {
+ return c.cloud
+}
+
+func (c *PersistentVolumeProvisionerController) GetMounter() mount.Interface {
+ return nil
+}
+
+func (c *PersistentVolumeProvisionerController) GetWriter() io.Writer {
+ return nil
+}
+
+func (c *PersistentVolumeProvisionerController) GetHostName() string {
+ return ""
+}
+
+const (
+ // these pair of constants are used by the provisioner.
+ // The key is a kube namespaced key that denotes a volume requires provisioning.
+ // The value is set only when provisioning is completed. Any other value will tell the provisioner
+ // that provisioning has not yet occurred.
+ pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required"
+ pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
+)
diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go
new file mode 100644
index 0000000000000..08eb03383a5ab
--- /dev/null
+++ b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go
@@ -0,0 +1,240 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package persistentvolume
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/errors"
+ "k8s.io/kubernetes/pkg/api/resource"
+ "k8s.io/kubernetes/pkg/api/testapi"
+ "k8s.io/kubernetes/pkg/api/unversioned"
+ client "k8s.io/kubernetes/pkg/client/unversioned"
+ fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+ "k8s.io/kubernetes/pkg/util"
+ "k8s.io/kubernetes/pkg/volume"
+ "k8s.io/kubernetes/pkg/watch"
+)
+
+func TestProvisionerRunStop(t *testing.T) {
+ controller, _ := makeTestController()
+
+ if len(controller.stopChannels) != 0 {
+ t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
+ }
+
+ controller.Run()
+
+ if len(controller.stopChannels) != 2 {
+ t.Errorf("Running provisioner should have exactly 2 stopChannels. Got %v", len(controller.stopChannels))
+ }
+
+ controller.Stop()
+
+ if len(controller.stopChannels) != 0 {
+ t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
+ }
+}
+
+func makeTestVolume() *api.PersistentVolume {
+ return &api.PersistentVolume{
+ ObjectMeta: api.ObjectMeta{
+ Annotations: map[string]string{},
+ Name: "pv01",
+ },
+ Spec: api.PersistentVolumeSpec{
+ PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
+ AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
+ Capacity: api.ResourceList{
+ api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
+ },
+ PersistentVolumeSource: api.PersistentVolumeSource{
+ HostPath: &api.HostPathVolumeSource{
+ Path: "/tmp/data01",
+ },
+ },
+ },
+ }
+}
+
+func makeTestClaim() *api.PersistentVolumeClaim {
+ return &api.PersistentVolumeClaim{
+ ObjectMeta: api.ObjectMeta{
+ Annotations: map[string]string{},
+ Name: "claim01",
+ Namespace: "ns",
+ SelfLink: testapi.Default.SelfLink("pvc", ""),
+ },
+ Spec: api.PersistentVolumeClaimSpec{
+ AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
+ Resources: api.ResourceRequirements{
+ Requests: api.ResourceList{
+ api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
+ },
+ },
+ },
+ }
+}
+
+func makeTestController() (*PersistentVolumeProvisionerController, *mockControllerClient) {
+ mockClient := &mockControllerClient{}
+ mockVolumePlugin := &volume.FakeVolumePlugin{}
+ controller, _ := NewPersistentVolumeProvisionerController(mockClient, 1*time.Second, nil, mockVolumePlugin, &fake_cloud.FakeCloud{})
+ return controller, mockClient
+}
+
+func TestReconcileClaim(t *testing.T) {
+ controller, mockClient := makeTestController()
+ pvc := makeTestClaim()
+
+ // watch would have added the claim to the store
+ controller.claimStore.Add(pvc)
+ err := controller.reconcileClaim(pvc)
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ // non-provisionable PVC should not have created a volume on reconciliation
+ if mockClient.volume != nil {
+ t.Error("Unexpected volume found in mock client. Expected nil")
+ }
+
+ pvc.Annotations[qosProvisioningKey] = "foo"
+
+ err = controller.reconcileClaim(pvc)
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ // PVC requesting provisioning should have a PV created for it
+ if mockClient.volume == nil {
+ t.Error("Expected to find bound volume but got nil")
+ }
+
+ if mockClient.volume.Spec.ClaimRef.Name != pvc.Name {
+ t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name)
+ }
+}
+
+func TestReconcileVolume(t *testing.T) {
+
+ controller, mockClient := makeTestController()
+ pv := makeTestVolume()
+ pvc := makeTestClaim()
+
+ err := controller.reconcileVolume(pv)
+ if err != nil {
+ t.Errorf("Unexpected error %v", err)
+ }
+
+ // watch adds claim to the store.
+ // we need to add it to our mock client to mimic normal Get call
+ controller.claimStore.Add(pvc)
+ mockClient.claim = pvc
+
+ // pretend the claim and volume are bound, no provisioning required
+ claimRef, _ := api.GetReference(pvc)
+ pv.Spec.ClaimRef = claimRef
+ err = controller.reconcileVolume(pv)
+ if err != nil {
+ t.Errorf("Unexpected error %v", err)
+ }
+
+ pv.Annotations[pvProvisioningRequiredAnnotationKey] = "!pvProvisioningCompleted"
+ pv.Annotations[qosProvisioningKey] = "foo"
+ err = controller.reconcileVolume(pv)
+
+ if !isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, mockClient.volume.Annotations) {
+ t.Errorf("Expected %s but got %s", pvProvisioningRequiredAnnotationKey, mockClient.volume.Annotations[pvProvisioningRequiredAnnotationKey])
+ }
+}
+
+var _ controllerClient = &mockControllerClient{}
+
+type mockControllerClient struct {
+ volume *api.PersistentVolume
+ claim *api.PersistentVolumeClaim
+}
+
+func (c *mockControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
+ return c.volume, nil
+}
+
+func (c *mockControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
+ if pv.GenerateName != "" && pv.Name == "" {
+ pv.Name = fmt.Sprintf(pv.GenerateName, util.NewUUID())
+ }
+ c.volume = pv
+ return c.volume, nil
+}
+
+func (c *mockControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) {
+ return &api.PersistentVolumeList{
+ Items: []api.PersistentVolume{*c.volume},
+ }, nil
+}
+
+func (c *mockControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) {
+ return watch.NewFake(), nil
+}
+
+func (c *mockControllerClient) UpdatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
+ return c.CreatePersistentVolume(pv)
+}
+
+func (c *mockControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
+ c.volume = nil
+ return nil
+}
+
+func (c *mockControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
+ return volume, nil
+}
+
+func (c *mockControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
+ if c.claim != nil {
+ return c.claim, nil
+ } else {
+ return nil, errors.NewNotFound("persistentVolume", name)
+ }
+}
+
+func (c *mockControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) {
+ return &api.PersistentVolumeClaimList{
+ Items: []api.PersistentVolumeClaim{*c.claim},
+ }, nil
+}
+
+func (c *mockControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) {
+ return watch.NewFake(), nil
+}
+
+func (c *mockControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
+ c.claim = claim
+ return c.claim, nil
+}
+
+func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
+ return claim, nil
+}
+
+func (c *mockControllerClient) GetKubeClient() client.Interface {
+ return nil
+}
diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go
index 0cd6de4085013..629c465e2d4c6 100644
--- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go
+++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go
@@ -46,14 +46,16 @@ type PersistentVolumeRecycler struct {
client recyclerClient
kubeClient client.Interface
pluginMgr volume.VolumePluginMgr
+ cloud cloudprovider.Interface
}
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
-func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
+func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
+ cloud: cloud,
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
@@ -283,7 +285,7 @@ func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID t
}
func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface {
- return nil
+ return f.cloud
}
func (f *PersistentVolumeRecycler) GetMounter() mount.Interface {
diff --git a/pkg/controller/persistentvolume/types.go b/pkg/controller/persistentvolume/types.go
index c022f97a3c564..04046d2044501 100644
--- a/pkg/controller/persistentvolume/types.go
+++ b/pkg/controller/persistentvolume/types.go
@@ -21,15 +21,15 @@ import (
"sort"
"k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
)
const (
- // A PV created specifically for one claim must contain this annotation in order to bind to the claim.
- // The value must be the namespace and name of the claim being bound to (i.e, claim.Namespace/claim.Name)
- // This is an experimental feature and likely to change in the future.
- createdForKey = "volume.extensions.kubernetes.io/provisioned-for"
+ // A PVClaim can request a quality of service tier by adding this annotation. The value of the annotation
+ // is arbitrary. The values are pre-defined by a cluster admin and known to users when requesting a QoS.
+ // For example tiers might be gold, silver, and tin and the admin configures what that means for each volume plugin that can provision a volume.
+ // Values in the alpha version of this feature are not meaningful, but will be in the full version of this feature.
+ qosProvisioningKey = "volume.alpha.kubernetes.io/storage-class"
)
// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity.
@@ -80,10 +80,7 @@ func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Persi
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// find returns the nearest PV from the ordered list or nil if a match is not found
-func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
- // the 'searchPV' argument is a synthetic PV with capacity and accessmodes set according to the user's PersistentVolumeClaim.
- // the synthetic pv arg is, therefore, a request for a storage resource.
- //
+func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *api.PersistentVolumeClaim, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
// PVs are indexed by their access modes to allow easier searching. Each index is the string representation of a set of access modes.
// There is a finite number of possible sets and PVs will only be indexed in one of them (whichever index matches the PV's modes).
//
@@ -92,17 +89,7 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
//
// Searches are performed against a set of access modes, so we can attempt not only the exact matching modes but also
// potential matches (the GCEPD example above).
- allPossibleModes := pvIndex.allPossibleMatchingAccessModes(searchPV.Spec.AccessModes)
-
- // the searchPV should contain an annotation that allows pre-binding to a claim.
- // we can use the same annotation value (pvc's namespace/name) and check against
- // existing volumes to find an exact match. It is possible that a bind is made (ClaimRef persisted to PV)
- // but the fail to update claim.Spec.VolumeName fails. This check allows the claim to find the volume
- // that's already bound to the claim.
- preboundClaim := ""
- if createdFor, ok := searchPV.Annotations[createdForKey]; ok {
- preboundClaim = createdFor
- }
+ allPossibleModes := pvIndex.allPossibleMatchingAccessModes(claim.Spec.AccessModes)
for _, modes := range allPossibleModes {
volumes, err := pvIndex.ListByAccessModes(modes)
@@ -115,19 +102,34 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
// return the exact pre-binding match, if found
unboundVolumes := []*api.PersistentVolume{}
for _, volume := range volumes {
+ // volume isn't currently bound or pre-bound.
if volume.Spec.ClaimRef == nil {
- // volume isn't currently bound or pre-bound.
unboundVolumes = append(unboundVolumes, volume)
continue
}
- boundClaim := fmt.Sprintf("%s/%s", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
- if boundClaim == preboundClaim {
+ if claim.Name == volume.Spec.ClaimRef.Name && claim.Namespace == volume.Spec.ClaimRef.Namespace {
// exact match! No search required.
return volume, nil
}
}
+ // a claim requesting provisioning will have an exact match pre-bound to the claim.
+ // no need to search through unbound volumes. The matching volume will be created by the provisioner
+ // and will match above when the claim is re-processed by the binder.
+ if keyExists(qosProvisioningKey, claim.Annotations) {
+ return nil, nil
+ }
+
+ searchPV := &api.PersistentVolume{
+ Spec: api.PersistentVolumeSpec{
+ AccessModes: claim.Spec.AccessModes,
+ Capacity: api.ResourceList{
+ api.ResourceName(api.ResourceStorage): claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
+ },
+ },
+ }
+
i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(searchPV, unboundVolumes[i]) })
if i < len(unboundVolumes) {
return unboundVolumes[i], nil
@@ -136,27 +138,9 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
return nil, nil
}
-// findByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage
-func (pvIndex *persistentVolumeOrderedIndex) findByAccessModesAndStorageCapacity(prebindKey string, modes []api.PersistentVolumeAccessMode, qty resource.Quantity) (*api.PersistentVolume, error) {
- pv := &api.PersistentVolume{
- ObjectMeta: api.ObjectMeta{
- Annotations: map[string]string{
- createdForKey: prebindKey,
- },
- },
- Spec: api.PersistentVolumeSpec{
- AccessModes: modes,
- Capacity: api.ResourceList{
- api.ResourceName(api.ResourceStorage): qty,
- },
- },
- }
- return pvIndex.find(pv, matchStorageCapacity)
-}
-
// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
- return pvIndex.findByAccessModesAndStorageCapacity(fmt.Sprintf("%s/%s", claim.Namespace, claim.Name), claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
+ return pvIndex.findByClaim(claim, matchStorageCapacity)
}
// byCapacity is used to order volumes by ascending storage size
@@ -268,3 +252,7 @@ func (c byAccessModes) Swap(i, j int) {
func (c byAccessModes) Len() int {
return len(c.modes)
}
+
+func claimToClaimKey(claim *api.PersistentVolumeClaim) string {
+ return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
+}
diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go
index 526bac2f0e518..0882479856b61 100644
--- a/pkg/volume/host_path/host_path.go
+++ b/pkg/volume/host_path/host_path.go
@@ -22,7 +22,6 @@ import (
"regexp"
"k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
@@ -35,11 +34,11 @@ import (
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
- host: nil,
- newRecyclerFunc: newRecycler,
- newDeleterFunc: newDeleter,
- newCreaterFunc: newCreater,
- config: volumeConfig,
+ host: nil,
+ newRecyclerFunc: newRecycler,
+ newDeleterFunc: newDeleter,
+ newProvisionerFunc: newProvisioner,
+ config: volumeConfig,
},
}
}
@@ -47,28 +46,28 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
- host: nil,
- newRecyclerFunc: recyclerFunc,
- newCreaterFunc: newCreater,
- config: volumeConfig,
+ host: nil,
+ newRecyclerFunc: recyclerFunc,
+ newProvisionerFunc: newProvisioner,
+ config: volumeConfig,
},
}
}
type hostPathPlugin struct {
host volume.VolumeHost
- // decouple creating Recyclers/Deleters/Creaters by deferring to a function. Allows for easier testing.
- newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
- newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
- newCreaterFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error)
- config volume.VolumeConfig
+ // decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing.
+ newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
+ newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
+ newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error)
+ config volume.VolumeConfig
}
var _ volume.VolumePlugin = &hostPathPlugin{}
var _ volume.PersistentVolumePlugin = &hostPathPlugin{}
var _ volume.RecyclableVolumePlugin = &hostPathPlugin{}
var _ volume.DeletableVolumePlugin = &hostPathPlugin{}
-var _ volume.CreatableVolumePlugin = &hostPathPlugin{}
+var _ volume.ProvisionableVolumePlugin = &hostPathPlugin{}
const (
hostPathPluginName = "kubernetes.io/host-path"
@@ -124,11 +123,11 @@ func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, err
return plugin.newDeleterFunc(spec, plugin.host)
}
-func (plugin *hostPathPlugin) NewCreater(options volume.VolumeOptions) (volume.Creater, error) {
+func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
if len(options.AccessModes) == 0 {
options.AccessModes = plugin.GetAccessModes()
}
- return plugin.newCreaterFunc(options, plugin.host)
+ return plugin.newProvisionerFunc(options, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
@@ -154,8 +153,8 @@ func newDeleter(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, erro
return &hostPathDeleter{spec.Name(), path, host, volume.NewMetricsDu(path)}, nil
}
-func newCreater(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error) {
- return &hostPathCreater{options: options, host: host}, nil
+func newProvisioner(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) {
+ return &hostPathProvisioner{options: options, host: host}, nil
}
// HostPath volumes represent a bare host file or directory mount.
@@ -215,7 +214,7 @@ func (c *hostPathCleaner) TearDownAt(dir string) error {
return fmt.Errorf("TearDownAt() does not make sense for host paths")
}
-// hostPathRecycler implements a dynamic provisioning Recycler for the HostPath plugin
+// hostPathRecycler implements a Recycler for the HostPath plugin
// This implementation is meant for testing only and only works in a single node cluster
type hostPathRecycler struct {
name string
@@ -246,34 +245,36 @@ func (r *hostPathRecycler) Recycle() error {
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
}
-// hostPathCreater implements a dynamic provisioning Creater for the HostPath plugin
+// hostPathProvisioner implements a Provisioner for the HostPath plugin
// This implementation is meant for testing only and only works in a single node cluster.
-type hostPathCreater struct {
+type hostPathProvisioner struct {
host volume.VolumeHost
options volume.VolumeOptions
}
// Create for hostPath simply creates a local /tmp/hostpath_pv/%s directory as a new PersistentVolume.
-// This Creater is meant for development and testing only and WILL NOT WORK in a multi-node cluster.
-func (r *hostPathCreater) Create() (*api.PersistentVolume, error) {
- fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
- err := os.MkdirAll(fullpath, 0750)
- if err != nil {
- return nil, err
+// This Provisioner is meant for development and testing only and WILL NOT WORK in a multi-node cluster.
+func (r *hostPathProvisioner) Provision(pv *api.PersistentVolume) error {
+ if pv.Spec.HostPath == nil {
+ return fmt.Errorf("pv.Spec.HostPath cannot be nil")
}
+ return os.MkdirAll(pv.Spec.HostPath.Path, 0750)
+}
+func (r *hostPathProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) {
+ fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-hostpath-",
- Labels: map[string]string{
- "createdby": "hostpath dynamic provisioner",
+ Annotations: map[string]string{
+ "kubernetes.io/createdby": "hostpath-dynamic-provisioner",
},
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: r.options.PersistentVolumeReclaimPolicy,
AccessModes: r.options.AccessModes,
Capacity: api.ResourceList{
- api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dMi", r.options.CapacityMB)),
+ api.ResourceName(api.ResourceStorage): r.options.Capacity,
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go
index f5681ea9a3fc0..a82157c600f96 100644
--- a/pkg/volume/host_path/host_path_test.go
+++ b/pkg/volume/host_path/host_path_test.go
@@ -145,7 +145,7 @@ func TestDeleterTempDir(t *testing.T) {
}
}
-func TestCreater(t *testing.T) {
+func TestProvisioner(t *testing.T) {
tempPath := "/tmp/hostpath/"
defer os.RemoveAll(tempPath)
err := os.MkdirAll(tempPath, 0750)
@@ -157,18 +157,18 @@ func TestCreater(t *testing.T) {
if err != nil {
t.Errorf("Can't find the plugin by name")
}
- creater, err := plug.NewCreater(volume.VolumeOptions{CapacityMB: 100, PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete})
+ creater, err := plug.NewProvisioner(volume.VolumeOptions{Capacity: resource.MustParse("1Gi"), PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete})
if err != nil {
- t.Errorf("Failed to make a new Creater: %v", err)
+ t.Errorf("Failed to make a new Provisioner: %v", err)
}
- pv, err := creater.Create()
+ pv, err := creater.NewPersistentVolumeTemplate()
if err != nil {
t.Errorf("Unexpected error creating volume: %v", err)
}
if pv.Spec.HostPath.Path == "" {
t.Errorf("Expected pv.Spec.HostPath.Path to not be empty: %#v", pv)
}
- expectedCapacity := resource.NewQuantity(100*1024*1024, resource.BinarySI)
+ expectedCapacity := resource.NewQuantity(1*1024*1024*1024, resource.BinarySI)
actualCapacity := pv.Spec.Capacity[api.ResourceStorage]
expectedAmt := expectedCapacity.Value()
actualAmt := actualCapacity.Value()
diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go
index 3a172889dc885..91cd7a04d4607 100644
--- a/pkg/volume/plugins.go
+++ b/pkg/volume/plugins.go
@@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/resource"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
@@ -39,11 +40,11 @@ type VolumeOptions struct {
// it will be replaced and expanded on by future SecurityContext work.
RootContext string
- // The attributes below are required by volume.Creater
- // perhaps CreaterVolumeOptions struct?
+ // The attributes below are required by volume.Provisioner
+ // TODO: refactor all of this out of volumes when an admin can configure many kinds of provisioners.
- // CapacityMB is the size in MB of a volume.
- CapacityMB int
+ // Capacity is the size of a volume.
+ Capacity resource.Quantity
// AccessModes of a volume
AccessModes []api.PersistentVolumeAccessMode
// Reclamation policy for a persistent volume
@@ -106,12 +107,12 @@ type DeletableVolumePlugin interface {
NewDeleter(spec *Spec) (Deleter, error)
}
-// CreatableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster.
-type CreatableVolumePlugin interface {
+// ProvisionableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster.
+type ProvisionableVolumePlugin interface {
VolumePlugin
- // NewCreater creates a new volume.Creater which knows how to create PersistentVolumes in accordance with
+ // NewProvisioner creates a new volume.Provisioner which knows how to create PersistentVolumes in accordance with
// the plugin's underlying storage provider
- NewCreater(options VolumeOptions) (Creater, error)
+ NewProvisioner(options VolumeOptions) (Provisioner, error)
}
// VolumeHost is an interface that plugins can use to access the kubelet.
@@ -365,13 +366,13 @@ func (pm *VolumePluginMgr) FindDeletablePluginBySpec(spec *Spec) (DeletableVolum
// FindCreatablePluginBySpec fetches a persistent volume plugin by name. If no plugin
// is found, returns error.
-func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (CreatableVolumePlugin, error) {
+func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
- if creatableVolumePlugin, ok := volumePlugin.(CreatableVolumePlugin); ok {
- return creatableVolumePlugin, nil
+ if provisionableVolumePlugin, ok := volumePlugin.(ProvisionableVolumePlugin); ok {
+ return provisionableVolumePlugin, nil
}
return nil, fmt.Errorf("no creatable volume plugin matched")
}
diff --git a/pkg/volume/testing.go b/pkg/volume/testing.go
index 267a5a1838cb5..9f34a5eef4136 100644
--- a/pkg/volume/testing.go
+++ b/pkg/volume/testing.go
@@ -117,10 +117,13 @@ func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
type FakeVolumePlugin struct {
PluginName string
Host VolumeHost
+ Config VolumeConfig
}
var _ VolumePlugin = &FakeVolumePlugin{}
var _ RecyclableVolumePlugin = &FakeVolumePlugin{}
+var _ DeletableVolumePlugin = &FakeVolumePlugin{}
+var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) Init(host VolumeHost) {
plugin.Host = host
@@ -151,6 +154,10 @@ func (plugin *FakeVolumePlugin) NewDeleter(spec *Spec) (Deleter, error) {
return &FakeDeleter{"/attributesTransferredFromSpec", MetricsNil{}}, nil
}
+func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provisioner, error) {
+ return &FakeProvisioner{options, plugin.Host}, nil
+}
+
func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
return []api.PersistentVolumeAccessMode{}
}
@@ -227,3 +234,36 @@ func (fd *FakeDeleter) Delete() error {
func (fd *FakeDeleter) GetPath() string {
return fd.path
}
+
+type FakeProvisioner struct {
+ Options VolumeOptions
+ Host VolumeHost
+}
+
+func (fc *FakeProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) {
+ fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
+ return &api.PersistentVolume{
+ ObjectMeta: api.ObjectMeta{
+ GenerateName: "pv-fakeplugin-",
+ Annotations: map[string]string{
+ "kubernetes.io/createdby": "fakeplugin-provisioner",
+ },
+ },
+ Spec: api.PersistentVolumeSpec{
+ PersistentVolumeReclaimPolicy: fc.Options.PersistentVolumeReclaimPolicy,
+ AccessModes: fc.Options.AccessModes,
+ Capacity: api.ResourceList{
+ api.ResourceName(api.ResourceStorage): fc.Options.Capacity,
+ },
+ PersistentVolumeSource: api.PersistentVolumeSource{
+ HostPath: &api.HostPathVolumeSource{
+ Path: fullpath,
+ },
+ },
+ },
+ }, nil
+}
+
+func (fc *FakeProvisioner) Provision(pv *api.PersistentVolume) error {
+ return nil
+}
diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go
index ee9d213aae8af..aab01158f2bf9 100644
--- a/pkg/volume/volume.go
+++ b/pkg/volume/volume.go
@@ -100,10 +100,16 @@ type Recycler interface {
Recycle() error
}
-// Create adds a new resource in the storage provider and creates a PersistentVolume for the new resource.
-// Calls to Create should block until complete.
-type Creater interface {
- Create() (*api.PersistentVolume, error)
+// Provisioner is an interface that creates templates for PersistentVolumes and can create the volume
+// as a new resource in the infrastructure provider.
+type Provisioner interface {
+ // Provision creates the resource by allocating the underlying volume in a storage system.
+ // This method should block until completion.
+ Provision(*api.PersistentVolume) error
+ // NewPersistentVolumeTemplate creates a new PersistentVolume to be used as a template before saving.
+ // The provisioner will want to tweak its properties, assign correct annotations, etc.
+ // This func should *NOT* persist the PV in the API. That is left to the caller.
+ NewPersistentVolumeTemplate() (*api.PersistentVolume, error)
}
// Delete removes the resource from the underlying storage provider. Calls to this method should block until
@@ -111,6 +117,7 @@ type Creater interface {
// A nil return indicates success.
type Deleter interface {
Volume
+ // This method should block until completion.
Delete() error
}
diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go
index 99dfdd21987ea..2b7daf67fd978 100644
--- a/test/integration/persistent_volumes_test.go
+++ b/test/integration/persistent_volumes_test.go
@@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
+ fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/volume"
@@ -48,12 +49,16 @@ func TestPersistentVolumeRecycler(t *testing.T) {
binderClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
recyclerClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
testClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
+ host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
- binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Minute)
+ plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}}}
+ cloud := &fake_cloud.FakeCloud{}
+
+ binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)
binder.Run()
defer binder.Stop()
- recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Minute, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}})
+ recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, plugins, cloud)
recycler.Run()
defer recycler.Stop()