Skip to content

Commit

Permalink
Initialize API servers with negotiated serializers
Browse files Browse the repository at this point in the history
Pass down into the server initialization the necessary interface for
handling client/server content type negotiation. Add integration tests
for the negotiation.
  • Loading branch information
smarterclayton committed Jan 22, 2016
1 parent c49cd4e commit 4d127dc
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 48 deletions.
20 changes: 11 additions & 9 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
Expand Down Expand Up @@ -81,9 +82,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
}
}

type newEtcdFunc func([]string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string) (storage.Interface, error)

func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
Expand All @@ -95,11 +96,11 @@ func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc,
var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix
versionedInterface, err := interfacesFunc(storageVersion)
if err != nil {
return nil, err
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
storageConfig.Codec = versionedInterface.Codec
storageConfig.Codec = runtime.NewCodec(ns.EncoderForVersion(s, storageVersion), ns.DecoderToVersion(s, unversioned.GroupVersion{Group: storageVersion.Group, Version: runtime.APIVersionInternal}))
return storageConfig.NewStorage()
}

Expand Down Expand Up @@ -148,7 +149,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn(servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
Expand Down Expand Up @@ -259,7 +260,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
Expand All @@ -273,7 +274,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
Expand Down Expand Up @@ -380,6 +381,7 @@ func Run(s *options.APIServer) error {
ProxyTLSClientConfig: proxyTLSClientConfig,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
Serializer: api.Codecs,
},
EnableCoreControllers: true,
EventTTL: s.EventTTL,
Expand Down
4 changes: 2 additions & 2 deletions cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (

"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

Expand Down Expand Up @@ -133,7 +133,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}

for _, test := range testCases {
newEtcd := func(serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/apis/extensions/install/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,9 @@ func TestRESTMapper(t *testing.T) {
t.Errorf("incorrect groupVersion: %v", mapping)
}

<<<<<<< HEAD
interfaces, _ := registered.GroupOrDie(extensions.GroupName).InterfacesFor(version)
if mapping.Codec != interfaces.Codec {
t.Errorf("unexpected codec: %#v, expected: %#v", mapping, interfaces)
=======
interfaces, _ := latest.GroupOrDie(extensions.GroupName).InterfacesFor(version)
if mapping.ObjectConvertor != interfaces.ObjectConvertor {
t.Errorf("unexpected: %#v, expected: %#v", mapping, interfaces)
>>>>>>> e776ada... Switch API objects to not register per version codecs
}

rc := &extensions.HorizontalPodAutoscaler{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand Down
1 change: 0 additions & 1 deletion pkg/conversion/unversioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

// TODO: Ideally we should create the necessary package structure in e.g.,
// pkg/conversion/test/... instead of importing pkg/api here.
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
Expand Down
31 changes: 16 additions & 15 deletions pkg/genericapiserver/genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/ui"
"k8s.io/kubernetes/pkg/util"
Expand Down Expand Up @@ -188,6 +189,9 @@ type Config struct {
// Map requests to contexts. Exported so downstream consumers can provider their own mappers
RequestContextMapper api.RequestContextMapper

// Required, the interface for serializing and converting objects to and from the wire
Serializer runtime.NegotiatedSerializer

// If specified, all web services will be registered into this container
RestfulContainer *restful.Container

Expand Down Expand Up @@ -394,6 +398,7 @@ func New(c *Config) *GenericAPIServer {
AdmissionControl: c.AdmissionControl,
ApiGroupVersionOverrides: c.APIGroupVersionOverrides,
RequestContextMapper: c.RequestContextMapper,
Serializer: c.Serializer,

cacheTimeout: c.CacheTimeout,
MinRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
Expand All @@ -418,7 +423,7 @@ func New(c *Config) *GenericAPIServer {
} else {
mux := http.NewServeMux()
s.mux = mux
handlerContainer = NewHandlerContainer(mux)
handlerContainer = NewHandlerContainer(mux, c.Serializer)
}
s.HandlerContainer = handlerContainer
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
Expand Down Expand Up @@ -457,10 +462,10 @@ func (s *GenericAPIServer) HandleFuncWithAuth(pattern string, handler func(http.
s.MuxHelper.HandleFunc(pattern, handler)
}

func NewHandlerContainer(mux *http.ServeMux) *restful.Container {
func NewHandlerContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *restful.Container {
container := restful.NewContainer()
container.ServeMux = mux
apiserver.InstallRecoverHandler(container)
apiserver.InstallRecoverHandler(s, container)
return container
}

Expand Down Expand Up @@ -667,7 +672,7 @@ func (s *GenericAPIServer) installAPIGroup(apiGroupInfo *APIGroupInfo) error {
// Install the version handler.
if apiGroupInfo.IsLegacyGroup {
// Add a handler at /api to enumerate the supported api versions.
apiserver.AddApiWebService(s.HandlerContainer, apiPrefix, apiVersions)
apiserver.AddApiWebService(s.Serializer, s.HandlerContainer, apiPrefix, apiVersions)
} else {
// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
apiVersionsForDiscovery := []unversioned.GroupVersionForDiscovery{}
Expand All @@ -686,9 +691,9 @@ func (s *GenericAPIServer) installAPIGroup(apiGroupInfo *APIGroupInfo) error {
Versions: apiVersionsForDiscovery,
PreferredVersion: preferedVersionForDiscovery,
}
apiserver.AddGroupWebService(s.HandlerContainer, apiPrefix+"/"+apiGroup.Name, apiGroup)
apiserver.AddGroupWebService(s.Serializer, s.HandlerContainer, apiPrefix+"/"+apiGroup.Name, apiGroup)
}
apiserver.InstallServiceErrorHandler(s.HandlerContainer, s.NewRequestInfoResolver(), apiVersions)
apiserver.InstallServiceErrorHandler(s.Serializer, s.HandlerContainer, s.NewRequestInfoResolver(), apiVersions)
return nil
}

Expand All @@ -700,25 +705,21 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
version, err := s.newAPIGroupVersion(apiGroupInfo.GroupMeta, groupVersion)
version.Root = apiPrefix
version.Storage = storage
version.ParameterCodec = apiGroupInfo.ParameterCodec
version.Serializer = apiGroupInfo.NegotiatedSerializer
version.Creater = apiGroupInfo.Scheme
version.Convertor = apiGroupInfo.Scheme
version.Typer = apiGroupInfo.Scheme
return version, err
}

func (s *GenericAPIServer) newAPIGroupVersion(groupMeta apimachinery.GroupMeta, groupVersion unversioned.GroupVersion) (*apiserver.APIGroupVersion, error) {
versionInterface, err := groupMeta.InterfacesFor(groupVersion)
if err != nil {
return nil, err
}
return &apiserver.APIGroupVersion{
RequestInfoResolver: s.NewRequestInfoResolver(),

Creater: api.Scheme,
Convertor: api.Scheme,
Typer: api.Scheme,

GroupVersion: groupVersion,
Linker: groupMeta.SelfLinker,
Mapper: groupMeta.RESTMapper,
Codec: versionInterface.Codec,

Admit: s.AdmissionControl,
Context: s.RequestContextMapper,
Expand Down
10 changes: 7 additions & 3 deletions pkg/genericapiserver/genericapiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ func TestInstallAPIGroups(t *testing.T) {
GroupMeta: *apiGroupMeta,
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
IsLegacyGroup: true,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
},
{
// extensions group version
GroupMeta: *extensionsGroupMeta,
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
OptionsExternalVersion: &apiGroupMeta.GroupVersion,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
},
}
s.InstallAPIGroups(apiGroupsInfo)
Expand Down Expand Up @@ -140,7 +144,7 @@ func TestInstallAPIGroups(t *testing.T) {
func TestNewHandlerContainer(t *testing.T) {
assert := assert.New(t)
mux := http.NewServeMux()
container := NewHandlerContainer(mux)
container := NewHandlerContainer(mux, nil)
assert.Equal(mux, container.ServeMux, "ServerMux's do not match")
}

Expand Down Expand Up @@ -179,7 +183,7 @@ func TestInstallSwaggerAPI(t *testing.T) {
defer etcdserver.Terminate(t)

mux := http.NewServeMux()
server.HandlerContainer = NewHandlerContainer(mux)
server.HandlerContainer = NewHandlerContainer(mux, nil)

// Ensure swagger isn't installed without the call
ws := server.HandlerContainer.RegisteredWebServices()
Expand All @@ -198,7 +202,7 @@ func TestInstallSwaggerAPI(t *testing.T) {

// Empty externalHost verification
mux = http.NewServeMux()
server.HandlerContainer = NewHandlerContainer(mux)
server.HandlerContainer = NewHandlerContainer(mux, nil)
server.externalHost = ""
server.ClusterIP = net.IPv4(10, 10, 10, 10)
server.PublicReadWritePort = 1010
Expand Down
23 changes: 17 additions & 6 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresource/etcd"
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
Expand Down Expand Up @@ -183,7 +184,10 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": m.v1ResourcesStorage,
},
IsLegacyGroup: true,
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
Expand Down Expand Up @@ -217,6 +221,9 @@ func (m *Master) InstallAPIs(c *Config) {
"v1beta1": extensionResources,
},
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)

Expand All @@ -237,7 +244,7 @@ func (m *Master) InstallAPIs(c *Config) {

// This should be done after all groups are registered
// TODO: replace the hardcoded "apis".
apiserver.AddApisWebService(m.HandlerContainer, "/apis", func() []unversioned.APIGroup {
apiserver.AddApisWebService(m.Serializer, m.HandlerContainer, "/apis", func() []unversioned.APIGroup {
groups := []unversioned.APIGroup{}
for ix := range allGroups {
groups = append(groups, allGroups[ix])
Expand Down Expand Up @@ -517,9 +524,9 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
Name: group,
Versions: []unversioned.GroupVersionForDiscovery{groupVersion},
}
apiserver.AddGroupWebService(m.HandlerContainer, path, apiGroup)
apiserver.AddGroupWebService(api.Codecs, m.HandlerContainer, path, apiGroup)
m.addThirdPartyResourceStorage(path, thirdparty.Storage[strings.ToLower(kind)+"s"].(*thirdpartyresourcedataetcd.REST), apiGroup)
apiserver.InstallServiceErrorHandler(m.HandlerContainer, m.NewRequestInfoResolver(), []string{thirdparty.GroupVersion.String()})
apiserver.InstallServiceErrorHandler(api.Codecs, m.HandlerContainer, m.NewRequestInfoResolver(), []string{thirdparty.GroupVersion.String()})
return nil
}

Expand All @@ -533,22 +540,26 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
}

optionsExternalVersion := registered.GroupOrDie(api.GroupName).GroupVersion
internalVersion := unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}
externalVersion := unversioned.GroupVersion{Group: group, Version: version}

return &apiserver.APIGroupVersion{
Root: apiRoot,
GroupVersion: unversioned.GroupVersion{Group: group, Version: version},
GroupVersion: externalVersion,
RequestInfoResolver: m.NewRequestInfoResolver(),

Creater: thirdpartyresourcedata.NewObjectCreator(group, version, api.Scheme),
Convertor: api.Scheme,
Typer: api.Scheme,

Mapper: thirdpartyresourcedata.NewMapper(registered.GroupOrDie(extensions.GroupName).RESTMapper, kind, version, group),
Codec: thirdpartyresourcedata.NewCodec(registered.GroupOrDie(extensions.GroupName).Codec, kind),
Linker: registered.GroupOrDie(extensions.GroupName).SelfLinker,
Storage: storage,
OptionsExternalVersion: &optionsExternalVersion,

Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
ParameterCodec: api.ParameterCodec,

Context: m.RequestContextMapper,

MinRequestTimeout: m.MinRequestTimeout,
Expand Down
2 changes: 1 addition & 1 deletion pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func setUp(t *testing.T) (Master, *etcdtesting.EtcdTestServer, Config, *assert.A
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
_, etcdserver, config, assert := setUp(t)

config.Serializer = api.Codecs
config.KubeletClient = client.FakeKubeletClient{}

config.ProxyDialer = func(network, addr string) (net.Conn, error) { return nil, nil }
Expand Down Expand Up @@ -496,7 +497,6 @@ func decodeResponse(resp *http.Response, obj interface{}) error {
if err != nil {
return err
}

if err := json.Unmarshal(data, obj); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions test/integration/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestSet(t *testing.T) {
if err != nil || resp.Node == nil {
t.Fatalf("unexpected error: %v %v", err, resp)
}
decoded, err := testapi.Default.Codec().Decode([]byte(resp.Node.Value))
decoded, err := runtime.Decode(testapi.Default.Codec(), []byte(resp.Node.Value))
if err != nil {
t.Fatalf("unexpected response: %#v", resp.Node)
}
Expand All @@ -67,7 +67,7 @@ func TestGet(t *testing.T) {
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
coded, err := testapi.Default.Codec().Encode(&testObject)
coded, err := runtime.Encode(testapi.Default.Codec(), &testObject)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
Loading

0 comments on commit 4d127dc

Please sign in to comment.