Skip to content

Commit

Permalink
Use KubeletPort reported in NodeStatus instead of cluster-wide master…
Browse files Browse the repository at this point in the history
… config, take 2.
  • Loading branch information
gmarek committed Dec 2, 2015
1 parent 11c878e commit 459131f
Show file tree
Hide file tree
Showing 22 changed files with 191 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var (

type fakeKubeletClient struct{}

func (fakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
func (fakeKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}

Expand Down
8 changes: 5 additions & 3 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/capabilities"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/storage"
Expand Down Expand Up @@ -107,7 +108,7 @@ type APIServer struct {
EnableLogsSupport bool
MasterServiceNamespace string
RuntimeConfig util.ConfigurationMap
KubeletConfig client.KubeletConfig
KubeletConfig kubeletclient.KubeletClientConfig
ClusterName string
EnableProfiling bool
EnableWatchCache bool
Expand Down Expand Up @@ -140,7 +141,7 @@ func NewAPIServer() *APIServer {
StorageVersions: latest.AllPreferredGroupVersions(),

RuntimeConfig: make(util.ConfigurationMap),
KubeletConfig: client.KubeletConfig{
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,
EnableHttps: true,
HTTPTimeout: time.Duration(5) * time.Second,
Expand Down Expand Up @@ -259,6 +260,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
// Kubelet related flags:
fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps, "Use https for kubelet connections")
fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port, "Kubelet port")
fs.MarkDeprecated("kubelet-port", "kubelet-port is deprecated and will be removed")
fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout, "Timeout for kubelet operations")
fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile, "Path to a client cert file for TLS.")
fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile, "Path to a client key file for TLS.")
Expand Down Expand Up @@ -427,7 +429,7 @@ func (s *APIServer) Run(_ []string) error {
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}

kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig)
kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}
Expand Down
4 changes: 0 additions & 4 deletions docs/admin/kube-apiserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ kube-apiserver
--kubelet-client-certificate="": Path to a client cert file for TLS.
--kubelet-client-key="": Path to a client key file for TLS.
--kubelet-https[=true]: Use https for kubelet connections
--kubelet-port=10250: Kubelet port
--kubelet-timeout=5s: Timeout for kubelet operations
--kubernetes-service-node-port=0: If non-zero, the Kubernetes master service (which apiserver creates/maintains) will be of type NodePort, using this as the value of the port. If zero, the Kubernetes master service will be of type ClusterIP.
--log-flush-frequency=5s: Maximum number of seconds between log flushes
Expand Down Expand Up @@ -108,9 +107,6 @@ kube-apiserver
--watch-cache[=true]: Enable watch caching in the apiserver
```

###### Auto generated by spf13/cobra on 30-Oct-2015


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/admin/kube-apiserver.md?pixel)]()
<!-- END MUNGE: GENERATED_ANALYTICS -->
19 changes: 0 additions & 19 deletions pkg/client/unversioned/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"reflect"
gruntime "runtime"
"strings"
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -93,24 +92,6 @@ type Config struct {
Burst int
}

type KubeletConfig struct {
// ToDo: Add support for different kubelet instances exposing different ports
Port uint
EnableHttps bool

// TLSClientConfig contains settings to enable transport layer security
TLSClientConfig

// Server requires Bearer authentication
BearerToken string

// HTTPTimeout is used by the client to timeout http requests to Kubelet.
HTTPTimeout time.Duration

// Dial is a custom dialer used for the client
Dial func(net, addr string) (net.Conn, error)
}

// TLSClientConfig contains settings to enable transport layer security
type TLSClientConfig struct {
// Server requires TLS client certificate authentication
Expand Down
19 changes: 0 additions & 19 deletions pkg/client/unversioned/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,3 @@ func (c *Config) transportConfig() *transport.Config {
BearerToken: c.BearerToken,
}
}

// transportConfig converts a client config to an appropriate transport config.
func (c *KubeletConfig) transportConfig() *transport.Config {
cfg := &transport.Config{
TLS: transport.TLSConfig{
CAFile: c.CAFile,
CAData: c.CAData,
CertFile: c.CertFile,
CertData: c.CertData,
KeyFile: c.KeyFile,
KeyData: c.KeyData,
},
BearerToken: c.BearerToken,
}
if c.EnableHttps && !cfg.HasCA() {
cfg.TLS.Insecure = true
}
return cfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,54 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package unversioned
package client

import (
"errors"
"net"
"net/http"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/transport"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
)

type KubeletClientConfig struct {
// Default port - used if no information about Kubelet port can be found in Node.NodeStatus.DaemonEndpoints.
Port uint
EnableHttps bool

// TLSClientConfig contains settings to enable transport layer security
client.TLSClientConfig

// Server requires Bearer authentication
BearerToken string

// HTTPTimeout is used by the client to timeout http requests to Kubelet.
HTTPTimeout time.Duration

// Dial is a custom dialer used for the client
Dial func(net, addr string) (net.Conn, error)
}

// KubeletClient is an interface for all kubelet functionality
type KubeletClient interface {
ConnectionInfoGetter
}

type ConnectionInfoGetter interface {
GetConnectionInfo(host string) (scheme string, port uint, transport http.RoundTripper, err error)
GetConnectionInfo(ctx api.Context, nodeName string) (scheme string, port uint, transport http.RoundTripper, err error)
}

// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Config *KubeletConfig
Config *KubeletClientConfig
}

func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
tlsConfig, err := transport.TLSConfigFor(config.transportConfig())
if err != nil {
return nil, err
Expand All @@ -57,7 +79,7 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
}

// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error) {
transport, err := MakeTransport(config)
if err != nil {
return nil, err
Expand All @@ -72,7 +94,8 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
}, nil
}

func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
// In default HTTPKubeletClient ctx is unused.
func (c *HTTPKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
scheme := "http"
if c.Config.EnableHttps {
scheme = "https"
Expand All @@ -85,6 +108,25 @@ func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.R
// no kubelets.
type FakeKubeletClient struct{}

func (c FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
func (c FakeKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}

// transportConfig converts a client config to an appropriate transport config.
func (c *KubeletClientConfig) transportConfig() *transport.Config {
cfg := &transport.Config{
TLS: transport.TLSConfig{
CAFile: c.CAFile,
CAData: c.CAData,
CertFile: c.CertFile,
CertData: c.CertData,
KeyFile: c.KeyFile,
KeyData: c.KeyData,
},
BearerToken: c.BearerToken,
}
if c.EnableHttps && !cfg.HasCA() {
cfg.TLS.Insecure = true
}
return cfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package unversioned
package client

import (
"encoding/json"
"net/http/httptest"
"net/url"
"testing"

client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
)
Expand All @@ -40,18 +41,17 @@ func TestHTTPKubeletClient(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()

_, err = url.Parse(testServer.URL)
if err != nil {
if _, err := url.Parse(testServer.URL); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

func TestNewKubeletClient(t *testing.T) {
config := &KubeletConfig{
config := &KubeletClientConfig{
EnableHttps: false,
}

client, err := NewKubeletClient(config)
client, err := NewStaticKubeletClient(config)
if err != nil {
t.Errorf("Error while trying to create a client: %v", err)
}
Expand All @@ -61,17 +61,17 @@ func TestNewKubeletClient(t *testing.T) {
}

func TestNewKubeletClientTLSInvalid(t *testing.T) {
config := &KubeletConfig{
config := &KubeletClientConfig{
EnableHttps: true,
//Invalid certificate and key path
TLSClientConfig: TLSClientConfig{
CertFile: "../testdata/mycertinvalid.cer",
KeyFile: "../testdata/mycertinvalid.key",
CAFile: "../testdata/myCA.cer",
TLSClientConfig: client.TLSClientConfig{
CertFile: "../../client/testdata/mycertinvalid.cer",
KeyFile: "../../client/testdata/mycertinvalid.key",
CAFile: "../../client/testdata/myCA.cer",
},
}

client, err := NewKubeletClient(config)
client, err := NewStaticKubeletClient(config)
if err == nil {
t.Errorf("Expected an error")
}
Expand All @@ -81,18 +81,18 @@ func TestNewKubeletClientTLSInvalid(t *testing.T) {
}

func TestNewKubeletClientTLSValid(t *testing.T) {
config := &KubeletConfig{
config := &KubeletClientConfig{
EnableHttps: true,
TLSClientConfig: TLSClientConfig{
CertFile: "../testdata/mycertvalid.cer",
TLSClientConfig: client.TLSClientConfig{
CertFile: "../../client/testdata/mycertvalid.cer",
// TLS Configuration, only applies if EnableHttps is true.
KeyFile: "../testdata/mycertvalid.key",
KeyFile: "../../client/testdata/mycertvalid.key",
// TLS Configuration, only applies if EnableHttps is true.
CAFile: "../testdata/myCA.cer",
CAFile: "../../client/testdata/myCA.cer",
},
}

client, err := NewKubeletClient(config)
client, err := NewStaticKubeletClient(config)
if err != nil {
t.Errorf("Not expecting an error #%v", err)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import (
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/auth/handlers"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/healthz"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/registry/componentstatus"
controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
Expand Down Expand Up @@ -182,7 +182,7 @@ type Config struct {
// StorageVersions is a map between groups and their storage versions
StorageVersions map[string]string
EventTTL time.Duration
KubeletClient client.KubeletClient
KubeletClient kubeletclient.KubeletClient
// allow downstream consumers to disable the core controller loops
EnableCoreControllers bool
EnableLogsSupport bool
Expand Down Expand Up @@ -533,7 +533,6 @@ func (m *Master) init(c *Config) {

storageDecorator := c.storageDecorator()
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), storageDecorator, c.KubeletClient, m.proxyTransport)

podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageDecorator)

Expand All @@ -555,6 +554,13 @@ func (m *Master) init(c *Config) {
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageDecorator, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)

podStorage := podetcd.NewStorage(
dbClient("pods"),
storageDecorator,
kubeletclient.ConnectionInfoGetter(nodeStorage),
m.proxyTransport,
)

serviceStorage := serviceetcd.NewREST(dbClient("services"), storageDecorator)
m.serviceRegistry = service.NewRegistry(serviceStorage)

Expand Down
2 changes: 1 addition & 1 deletion pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apiserver"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/endpoint"
"k8s.io/kubernetes/pkg/registry/namespace"
"k8s.io/kubernetes/pkg/registry/registrytest"
Expand Down
Loading

0 comments on commit 459131f

Please sign in to comment.