Skip to content

Commit

Permalink
Automatically install CRDs during controller init
Browse files Browse the repository at this point in the history
  • Loading branch information
saad-ali committed Aug 31, 2018
1 parent 7d673cb commit fdeb895
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ go_library(
"//pkg/volume/util:go_default_library",
"//pkg/volume/vsphere_volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
Expand Down
4 changes: 4 additions & 0 deletions cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"net/http"

"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
Expand Down Expand Up @@ -197,10 +198,13 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
// csiClient works with CRDs that support json only
csiClientConfig.ContentType = "application/json"

crdClientConfig := ctx.ClientBuilder.ConfigOrDie("attachdetach-controller")

attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
csiclientset.NewForConfigOrDie(csiClientConfig),
apiextensionsclient.NewForConfigOrDie(crdClientConfig),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@ go_library(
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
Expand All @@ -39,6 +44,7 @@ go_library(
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
Expand All @@ -54,6 +60,7 @@ go_test(
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
89 changes: 83 additions & 6 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ package attachdetach
import (
"fmt"
"net"
"reflect"
"time"

"github.com/golang/glog"
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -39,7 +44,8 @@ import (
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csiapiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
Expand All @@ -48,6 +54,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
Expand Down Expand Up @@ -97,7 +104,8 @@ type AttachDetachController interface {
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
kubeClient clientset.Interface,
csiClient csiclientset.Interface,
csiClient csiclient.Interface,
crdClient apiextensionsclient.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
Expand Down Expand Up @@ -125,6 +133,7 @@ func NewAttachDetachController(
adc := &attachDetachController{
kubeClient: kubeClient,
csiClient: csiClient,
crdClient: crdClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
Expand All @@ -138,6 +147,11 @@ func NewAttachDetachController(
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
}

// Install required CSI CRDs on API server
if utilfeature.DefaultFeatureGate.Enabled(features.CSICrdAutoInstall) {
adc.installCRDs()
}

if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}
Expand Down Expand Up @@ -240,9 +254,13 @@ type attachDetachController struct {
// the API server.
kubeClient clientset.Interface

// csiClient is the csi.storage.k8s.io API client used by volumehost to communicate with
// the API server.
csiClient csiclientset.Interface
// csiClient is the client used to read/write csi.storage.k8s.io API objects
// from the API server.
csiClient csiclient.Interface

// crdClient is the client used to read/write apiextensions.k8s.io objects
// from the API server.
crdClient apiextensionsclient.Interface

// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
Expand Down Expand Up @@ -649,6 +667,65 @@ func (adc *attachDetachController) processVolumesInUse(
}
}

// installCRDs creates the specified CustomResourceDefinition for the CSIDrivers object.
func (adc *attachDetachController) installCRDs() error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: csiapiv1alpha1.CsiDriverResourcePlural + "." + csiapiv1alpha1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: csiapiv1alpha1.GroupName,
Version: csiapiv1alpha1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: csiapiv1alpha1.CsiDriverResourcePlural,
Kind: reflect.TypeOf(csiapiv1alpha1.CSIDriver{}).Name(),
},
},
}
res, err := adc.crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)

if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Errorf("failed to create CSIDrivers CRD: %#v, err: %#v",
res, err)
} else if apierrors.IsAlreadyExists(err) {
glog.Warningf("CSIDrivers CRD already exists: %#v, err: %#v",
res, err)
} else {
glog.Infof("CSIDrivers CRD created successfully: %#v",
res)
}

crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: csiapiv1alpha1.CsiNodeInfoResourcePlural + "." + csiapiv1alpha1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: csiapiv1alpha1.GroupName,
Version: csiapiv1alpha1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: csiapiv1alpha1.CsiNodeInfoResourcePlural,
Kind: reflect.TypeOf(csiapiv1alpha1.CSINodeInfo{}).Name(),
},
},
}
res, err = adc.crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)

if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Errorf("failed to create CSINodeInfo CRD: %#v, err: %#v",
res, err)
} else if apierrors.IsAlreadyExists(err) {
glog.Warningf("CSINodeInfo CRD already exists: %#v, err: %#v",
res, err)
} else {
glog.Infof("CSINodeInfo CRD created successfully: %#v",
res)
}

return nil
}

// VolumeHost implementation
// This is an unfortunate requirement of the current factoring of volume plugin
// initializing code. It requires kubelet specific methods used by the mounting
Expand Down Expand Up @@ -759,6 +836,6 @@ func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
return adc.recorder
}

func (adc *attachDetachController) GetCSIClient() csiclientset.Interface {
func (adc *attachDetachController) GetCSIClient() csiclient.Interface {
return adc.csiClient
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/api/core/v1"
fakeapiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,11 +37,13 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
fakeApiExtensionsClient := fakeapiextensionsclient.NewSimpleClientset()

// Act
_, err := NewAttachDetachController(
fakeKubeClient,
nil, /* csiClient */
fakeApiExtensionsClient, /* crdClient */
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
Expand Down Expand Up @@ -146,6 +149,7 @@ func Test_AttachDetachControllerRecovery(t *testing.T) {

func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 []*v1.Pod) {
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeApiExtensionsClient := fakeapiextensionsclient.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
//informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
plugins := controllervolumetesting.CreateTestPlugin()
Expand Down Expand Up @@ -215,7 +219,8 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
nil,
nil, /* csiClient */
fakeApiExtensionsClient, /* crdClient */
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
Expand Down
9 changes: 8 additions & 1 deletion pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,17 @@ const (
VolumeScheduling utilfeature.Feature = "VolumeScheduling"

// owner: @vladimirvivien
// alpha: v1.9
// beta: v1.10
//
// Enable mount/attachment of Container Storage Interface (CSI) backed PVs
CSIPersistentVolume utilfeature.Feature = "CSIPersistentVolume"

// owner: @saad-ali
// alpha: v1.12
//
// Enable automatic installation of CRD for csi.storage.k8s.io API objects.
CSICrdAutoInstall utilfeature.Feature = "CSICrdAutoInstall"

// owner @MrHohn
// beta: v1.10
//
Expand Down Expand Up @@ -400,6 +406,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
MountContainers: {Default: false, PreRelease: utilfeature.Alpha},
VolumeScheduling: {Default: true, PreRelease: utilfeature.Beta},
CSIPersistentVolume: {Default: true, PreRelease: utilfeature.Beta},
CSICrdAutoInstall: {Default: false, PreRelease: utilfeature.Alpha},
CustomPodDNS: {Default: true, PreRelease: utilfeature.Beta},
BlockVolume: {Default: false, PreRelease: utilfeature.Alpha},
StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA},
Expand Down
12 changes: 10 additions & 2 deletions staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

// GroupName is the group name use in this package
const GroupName = "csi.storage.k8s.io"
const (
// GroupName is the group name use in this package
GroupName string = "csi.storage.k8s.io"

// CsiDriverResourcePlural is the plural name of the CSIDriver resource
CsiDriverResourcePlural string = "csidrivers"

// CsiNodeInfoResourcePlural is the plural name of the CSINode resource
CsiNodeInfoResourcePlural string = "csinodeinfos"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}
Expand Down
8 changes: 4 additions & 4 deletions test/cmd/crd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ run_crd_tests() {
__EOF__

# Post-Condition: assertion object exist
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'foos.company.com:'
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{if eq $id_field \\\"foos.company.com\\\"}}{{$id_field}}:{{end}}{{end}}" 'foos.company.com:'

kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__
{
Expand All @@ -66,7 +66,7 @@ __EOF__
__EOF__

# Post-Condition: assertion object exist
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'bars.company.com:foos.company.com:'
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{if eq $id_field \\\"foos.company.com\\\" \\\"bars.company.com\\\"}}{{$id_field}}:{{end}}{{end}}" 'bars.company.com:foos.company.com:'

# This test ensures that the name printer is able to output a resource
# in the proper "kind.group/resource_name" format, and that the
Expand All @@ -93,7 +93,7 @@ __EOF__
__EOF__

# Post-Condition: assertion crd with non-matching kind and resource exists
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'bars.company.com:foos.company.com:resources.mygroup.example.com:'
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{if eq $id_field \\\"foos.company.com\\\" \\\"bars.company.com\\\" \\\"resources.mygroup.example.com\\\"}}{{$id_field}}:{{end}}{{end}}" 'bars.company.com:foos.company.com:resources.mygroup.example.com:'

# This test ensures that we can create complex validation without client-side validation complaining
kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__
Expand Down Expand Up @@ -128,7 +128,7 @@ __EOF__
__EOF__

# Post-Condition: assertion crd with non-matching kind and resource exists
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'bars.company.com:foos.company.com:resources.mygroup.example.com:validfoos.company.com:'
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{if eq $id_field \\\"foos.company.com\\\" \\\"bars.company.com\\\" \\\"resources.mygroup.example.com\\\" \\\"validfoos.company.com\\\"}}{{$id_field}}:{{end}}{{end}}" 'bars.company.com:foos.company.com:resources.mygroup.example.com:validfoos.company.com:'

run_non_native_resource_tests

Expand Down
2 changes: 1 addition & 1 deletion test/cmd/old-print.sh
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ run_kubectl_old_print_tests() {
__EOF__

# Post-Condition: assertion object exists
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'foos.company.com:'
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{if eq $id_field \\\"foos.company.com\\\"}}{{$id_field}}:{{end}}{{end}}" 'foos.company.com:'

# Test that we can list this new CustomResource
kube::test::get_object_assert foos "{{range.items}}{{$id_field}}:{{end}}" ''
Expand Down
1 change: 1 addition & 0 deletions test/integration/volume/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_test(
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
Expand Down
3 changes: 3 additions & 0 deletions test/integration/volume/attach_detach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"k8s.io/api/core/v1"
fakeapiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -393,6 +394,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
}
resyncPeriod := 12 * time.Hour
testClient := clientset.NewForConfigOrDie(&config)
fakeApiExtensionsClient := fakeapiextensionsclient.NewSimpleClientset()

host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
plugin := &volumetest.FakeVolumePlugin{
Expand All @@ -413,6 +415,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
ctrl, err := attachdetach.NewAttachDetachController(
testClient,
nil, /* csiClient */
fakeApiExtensionsClient, /* crdClient */
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
informers.Core().V1().PersistentVolumeClaims(),
Expand Down

0 comments on commit fdeb895

Please sign in to comment.