Skip to content

Commit

Permalink
Change to aggregator so it calls a user apiservice via its pod IP.
Browse files Browse the repository at this point in the history
proxy_handler now uses the endpoint router to map the cluster IP to
appropriate endpoint (Pod) IP for the given resource.
Added code to allow aggregator routing to be optional.
Updated bazel build.
Fixes to cover JLiggit comments.
Added util ResourceLocation method based on Listers.
Fixed issues from verification steps.
Updated to add an interface to obfuscate some of the routing logic.
Collapsed cluster IP resolution in to the aggregator routing
implementation.
Added 2 simple unit tests for ResolveEndpoint
  • Loading branch information
cheftako committed May 26, 2017
1 parent 2ada6e6 commit ad8a83a
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 19 deletions.
1 change: 1 addition & 0 deletions cluster/gce/container-linux/configure-helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ function start-kube-apiserver {
params+=" --tls-cert-file=/etc/srv/kubernetes/server.cert"
params+=" --tls-private-key-file=/etc/srv/kubernetes/server.key"
params+=" --token-auth-file=/etc/srv/kubernetes/known_tokens.csv"
params+=" --enable-aggregator-routing=true"
if [[ -n "${KUBE_PASSWORD:-}" && -n "${KUBE_USER:-}" ]]; then
params+=" --basic-auth-file=/etc/srv/kubernetes/basic_auth.csv"
fi
Expand Down
1 change: 1 addition & 0 deletions cluster/gce/gci/configure-helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ function start-kube-apiserver {
params+=" --secure-port=443"
params+=" --tls-cert-file=${APISERVER_SERVER_CERT_PATH}"
params+=" --tls-private-key-file=${APISERVER_SERVER_KEY_PATH}"
params+=" --enable-aggregator-routing=true"
if [[ -e "${APISERVER_CLIENT_CERT_PATH}" ]] && [[ -e "${APISERVER_CLIENT_KEY_PATH}" ]]; then
params+=" --kubelet-client-certificate=${APISERVER_CLIENT_CERT_PATH}"
params+=" --kubelet-client-key=${APISERVER_CLIENT_KEY_PATH}"
Expand Down
10 changes: 6 additions & 4 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
return nil, err
}
}

aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
EnableAggregatorRouting: commandOptions.EnableAggregatorRouting,
}

return aggregatorConfig, nil
Expand Down
5 changes: 5 additions & 0 deletions cmd/kube-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type ServerRunOptions struct {

ProxyClientCertFile string
ProxyClientKeyFile string

EnableAggregatorRouting bool
}

// NewServerRunOptions creates a new ServerRunOptions object with default parameters
Expand Down Expand Up @@ -217,4 +219,7 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile,
"client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")

fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endoints IP rather than cluster IP.")

}
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ dump-logs-on-failure
duration-sec
e2e-output-dir
e2e-verify-service-account
enable-aggregator-routing
enable-controller-attach-detach
enable-custom-metrics
enable-debugging-handlers
Expand Down
13 changes: 12 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/util/proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,38 @@ go_test(
name = "go_default_test",
srcs = [
"dial_test.go",
"proxy_test.go",
"transport_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)

go_library(
name = "go_default_library",
srcs = [
"dial.go",
"doc.go",
"proxy.go",
"transport.go",
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/html:go_default_library",
"//vendor/golang.org/x/net/html/atom:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)
117 changes: 117 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
"fmt"
"math/rand"
"net"
"net/url"
"strconv"

"k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
)

// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
if !valid {
return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}

// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
svc, err := services.Services(namespace).Get(svcName)
if err != nil {
return nil, err
}
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
// use the declared port's name
portStr = svcPort.Name
found = true
break
}
}
if !found {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
}
}

eps, err := endpoints.Endpoints(namespace).Get(svcName)
if err != nil {
return nil, err
}
if len(eps.Subsets) == 0 {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
// Pick a random address.
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil
}
}
}
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}

func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) {
if len(id) == 0 {
return &url.URL{Scheme: "https"}, nil
}

destinationHost := id + "." + namespace + ".svc"
service, err := services.Services(namespace).Get(id)
if err != nil {
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}
switch {
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
return &url.URL{
Scheme: "https",
Host: service.Spec.ClusterIP,
}, nil
}
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}
119 changes: 119 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
"testing"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"net/http"
)

type serviceListerMock struct {
services []*v1.Service
err error
}

func (s *serviceListerMock) List(selector labels.Selector) (ret []*v1.Service, err error) {
return s.services, err
}

func (s *serviceListerMock) Services(namespace string) listersv1.ServiceNamespaceLister {
return nil
}

func (s *serviceListerMock) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) {
return nil, nil
}

type endpointsListerMock struct {
endpoints []*v1.Endpoints
err error
}

func (e *endpointsListerMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}

func (e *endpointsListerMock) Endpoints(namespace string) listersv1.EndpointsNamespaceLister {
return endpointsNamespaceListMock{
endpoints: e.endpoints,
err: e.err,
}
}

type endpointsNamespaceListMock struct {
endpoints []*v1.Endpoints
err error
}

func (e endpointsNamespaceListMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}

func (e endpointsNamespaceListMock) Get(name string) (*v1.Endpoints, error) {
if len(e.endpoints) == 0 {
return nil, e.err
}
return e.endpoints[0], e.err
}

func TestNoEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
endpoints := &endpointsListerMock{err: errors.NewNotFound(v1.Resource("endpoints"), "dummy-svc")}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if url != nil {
t.Error("Should not have gotten back an URL")
}
if err == nil {
t.Error("Should have gotten an error")
}
se, ok := err.(*errors.StatusError)
if !ok {
t.Error("Should have gotten a status error not %T", err)
}
if se.ErrStatus.Code != http.StatusNotFound {
t.Error("Should have gotten a http 404 not %d", se.ErrStatus.Code)
}
}

func TestOneEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
address := v1.EndpointAddress{Hostname: "dummy-host", IP: "127.0.0.1"}
addresses := []v1.EndpointAddress{address}
port := v1.EndpointPort{Port: 443}
ports := []v1.EndpointPort{port}
endpoint := v1.EndpointSubset{Addresses: addresses, Ports: ports}
subsets := []v1.EndpointSubset{endpoint}
one := &v1.Endpoints{Subsets: subsets}
slice := []*v1.Endpoints{one}
endpoints := &endpointsListerMock{endpoints: slice}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if err != nil {
t.Errorf("Should not have gotten error %v", err)
}
if url == nil {
t.Error("Should not have gotten back an URL")
}
if url.Host != "127.0.0.1:443" {
t.Error("Should have gotten back a host of dummy-host not %s", url.Host)
}

}
1 change: 1 addition & 0 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
Expand Down
Loading

0 comments on commit ad8a83a

Please sign in to comment.