Skip to content

Commit

Permalink
move new etcd storage into cacher
Browse files Browse the repository at this point in the history
  • Loading branch information
hongchaodeng committed Aug 13, 2016
1 parent 2b34988 commit d093809
Show file tree
Hide file tree
Showing 88 changed files with 188 additions and 173 deletions.
8 changes: 4 additions & 4 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func Run(s *options.APIServer) error {
if s.ServiceAccountLookup {
// If we need to look up service accounts and tokens,
// go directly to etcd to avoid recursive auth insanity
storage, err := storageFactory.New(api.Resource("serviceaccounts"))
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
if err != nil {
glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
}
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storage, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
}

authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
Expand Down Expand Up @@ -227,11 +227,11 @@ func Run(s *options.APIServer) error {

if modeEnabled(apiserver.ModeRBAC) {
mustGetRESTOptions := func(resource string) generic.RESTOptions {
s, err := storageFactory.New(rbac.Resource(resource))
config, err := storageFactory.NewConfig(rbac.Resource(resource))
if err != nil {
glog.Fatalf("Unable to get %s storage: %v", resource, err)
}
return generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
return generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
}

// For initial bootstrapping go directly to etcd to avoid privillege escalation check.
Expand Down
6 changes: 3 additions & 3 deletions examples/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func Run(serverOptions *genericoptions.ServerRunOptions) error {
return fmt.Errorf("%v", err)
}
storageFactory := newStorageFactory()
storage, err := storageFactory.New(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
storageConfig, err := storageFactory.NewConfig(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
if err != nil {
return fmt.Errorf("Unable to get storage: %v", err)
return fmt.Errorf("Unable to get storage config: %v", err)
}

restStorageMap := map[string]rest.Storage{
"testtypes": testgroupetcd.NewREST(storage, s.StorageDecorator()),
"testtypes": testgroupetcd.NewREST(storageConfig, s.StorageDecorator()),
}
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *groupMeta,
Expand Down
5 changes: 3 additions & 2 deletions examples/apiserver/rest/reststorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)

type REST struct {
*registry.Store
}

// NewREST returns a RESTStorage object that will work with testtype.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *REST {
func NewREST(config *storagebackend.Config, storageDecorator generic.StorageDecorator) *REST {
prefix := "/testtype"
newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} }
// Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{}
storageInterface := storageDecorator(
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} },
// NewListFunc returns an object capable of storing results of an etcd list.
Expand Down
6 changes: 3 additions & 3 deletions federation/cmd/federation-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ func Run(s *options.ServerRunOptions) error {
}

func createRESTOptionsOrDie(s *options.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := f.New(resource)
config, err := f.NewConfig(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
glog.Fatalf("Unable to find storage config for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
Storage: storage,
StorageConfig: config,
Decorator: g.StorageDecorator(),
DeleteCollectionWorkers: s.DeleteCollectionWorkers,
ResourcePrefix: f.ResourcePrefix(resource),
Expand Down
2 changes: 1 addition & 1 deletion federation/registry/cluster/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {

newListFunc := func() runtime.Object { return &federation.ClusterList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
100,
&federation.Cluster{},
prefix,
Expand Down
4 changes: 2 additions & 2 deletions federation/registry/cluster/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
)

func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, federation.GroupName)
storageConfig, server := registrytest.NewEtcdStorage(t, federation.GroupName)
restOptions := generic.RESTOptions{
Storage: etcdStorage,
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1}
storage, _ := NewREST(restOptions)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/serviceaccount/tokengetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
serviceaccountregistry "k8s.io/kubernetes/pkg/registry/serviceaccount"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/serviceaccount/etcd"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)

// clientGetter implements ServiceAccountTokenGetter using a clientset.Interface
Expand Down Expand Up @@ -69,9 +69,9 @@ func (r *registryGetter) GetSecret(namespace, name string) (*api.Secret, error)

// NewGetterFromStorageInterface returns a ServiceAccountTokenGetter that
// uses the specified storage to retrieve service accounts and secrets.
func NewGetterFromStorageInterface(s storage.Interface, saPrefix, secretPrefix string) serviceaccount.ServiceAccountTokenGetter {
func NewGetterFromStorageInterface(config *storagebackend.Config, saPrefix, secretPrefix string) serviceaccount.ServiceAccountTokenGetter {
return NewGetterFromRegistries(
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: saPrefix})),
secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: secretPrefix})),
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: saPrefix})),
secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: secretPrefix})),
)
}
18 changes: 4 additions & 14 deletions pkg/genericapiserver/storage_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
storagebackendfactory "k8s.io/kubernetes/pkg/storage/storagebackend/factory"
"k8s.io/kubernetes/pkg/util/sets"

"github.com/golang/glog"
Expand All @@ -37,7 +35,7 @@ import (
type StorageFactory interface {
// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
New(groupResource unversioned.GroupResource) (storage.Interface, error)
NewConfig(groupResource unversioned.GroupResource) (*storagebackend.Config, error)

// ResourcePrefix returns the overridden resource prefix for the GroupResource
// This allows for cohabitation of resources with different native types and provides
Expand Down Expand Up @@ -76,9 +74,6 @@ type DefaultStorageFactory struct {

// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)

// newStorageFn exists to be overwritten for unit testing.
newStorageFn func(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error)
}

type groupResourceOverrides struct {
Expand Down Expand Up @@ -118,7 +113,6 @@ func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType str
APIResourceConfigSource: resourceConfig,

newStorageCodecFn: NewStorageCodec,
newStorageFn: newStorage,
}
}

Expand Down Expand Up @@ -173,7 +167,7 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource unversione

// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (storage.Interface, error) {
func (s *DefaultStorageFactory) NewConfig(groupResource unversioned.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)

groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
Expand Down Expand Up @@ -232,12 +226,8 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
}

glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newStorageFn(config, codec)
}

// newStorage is the default implementation for creating a storage backend.
func newStorage(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error) {
return storagebackendfactory.Create(config, codec)
config.Codec = codec
return &config, nil
}

// Get all backends for all registered storage destinations.
Expand Down
21 changes: 6 additions & 15 deletions pkg/genericapiserver/storage_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)

Expand All @@ -50,38 +48,31 @@ func TestUpdateEtcdOverrides(t *testing.T) {

defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualConfig := storagebackend.Config{}
newStorageFn := func(config storagebackend.Config, codec runtime.Codec) (_ storage.Interface, err error) {
actualConfig = config
return nil, nil
}

defaultConfig := storagebackend.Config{
Prefix: options.DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newStorageFn = newStorageFn
storageFactory.SetEtcdLocation(test.resource, test.servers)

var err error
_, err = storageFactory.New(test.resource)
config, err := storageFactory.NewConfig(test.resource)
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualConfig.ServerList)
if !reflect.DeepEqual(config.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, config.ServerList)
continue
}

_, err = storageFactory.New(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
config, err = storageFactory.NewConfig(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualConfig.ServerList)
if !reflect.DeepEqual(config.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.ServerList)
continue
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ import (
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdmetrics "k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/sets"

"github.com/golang/glog"
Expand Down Expand Up @@ -142,7 +142,7 @@ type Master struct {
serviceNodePortAllocator rangeallocation.RangeRegistry

// storage for third party objects
thirdPartyStorage storage.Interface
thirdPartyStorageConfig *storagebackend.Config
// map from api path to a tuple of (storage for the objects, APIGroup)
thirdPartyResources map[string]thirdPartyEntry
// protects the map
Expand Down Expand Up @@ -276,7 +276,7 @@ func (m *Master) InstallAPIs(c *Config) {
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
if c.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
var err error
m.thirdPartyStorage, err = c.StorageFactory.New(extensions.Resource("thirdpartyresources"))
m.thirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
Expand Down Expand Up @@ -349,15 +349,15 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
return
}

serviceStorage, err := c.StorageFactory.New(api.Resource("services"))
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
glog.Fatal(err.Error())
}

serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
Expand All @@ -367,7 +367,7 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
Expand Down Expand Up @@ -648,7 +648,7 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{
Storage: m.thirdPartyStorage,
StorageConfig: m.thirdPartyStorageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},
Expand Down Expand Up @@ -691,13 +691,13 @@ func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *api
}

func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := c.StorageFactory.New(resource)
storageConfig, err := c.StorageFactory.NewConfig(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
}

return generic.RESTOptions{
Storage: storage,
StorageConfig: storageConfig,
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
ResourcePrefix: c.StorageFactory.ResourcePrefix(resource),
Expand Down
Loading

0 comments on commit d093809

Please sign in to comment.