Skip to content

Commit

Permalink
Merge pull request kubernetes#2126 from brendandburns/validatez
Browse files Browse the repository at this point in the history
Add etcd to the list of services to validate.
  • Loading branch information
smarterclayton committed Nov 10, 2014
2 parents 3cf17c0 + d7dc20f commit 21a6e96
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 34 deletions.
22 changes: 9 additions & 13 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP
}}
}

func InstallValidator(mux Mux, servers map[string]Server) {
validator, err := NewValidator(servers)
if err != nil {
glog.Errorf("failed to set up validator: %v", err)
return
}
mux.Handle("/validate", validator)
}

// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
// in a slash.
Expand All @@ -99,16 +108,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}

servers := map[string]string{
"controller-manager": "127.0.0.1:10252",
"scheduler": "127.0.0.1:10251",
// TODO: Add minion health checks here too.
}
validator, err := NewValidator(servers)
if err != nil {
glog.Errorf("failed to set up validator: %v", err)
validator = nil
}
for _, prefix := range paths {
prefix = strings.TrimRight(prefix, "/")
proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec}
Expand All @@ -119,9 +118,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler))
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
if validator != nil {
mux.Handle(prefix+"/validate", validator)
}
}
}

Expand Down
38 changes: 19 additions & 19 deletions pkg/apiserver/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strconv"
Expand All @@ -33,20 +32,21 @@ type httpGet interface {
Get(url string) (*http.Response, error)
}

type server struct {
addr string
port int
type Server struct {
Addr string
Port int
Path string
}

// validator is responsible for validating the cluster and serving
type validator struct {
// a list of servers to health check
servers map[string]server
servers map[string]Server
client httpGet
}

func (s *server) check(client httpGet) (health.Status, string, error) {
resp, err := client.Get("http://" + net.JoinHostPort(s.addr, strconv.Itoa(s.port)) + "/healthz")
func (s *Server) check(client httpGet) (health.Status, string, error) {
resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path)
if err != nil {
return health.Unknown, "", err
}
Expand Down Expand Up @@ -82,8 +82,7 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
reply = append(reply, ServerStatus{name, status.String(), status, msg, errorMsg})
}
data, err := json.Marshal(reply)
log.Printf("FOO: %s", string(data))
data, err := json.MarshalIndent(reply, "", " ")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
Expand All @@ -94,8 +93,15 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// NewValidator creates a validator for a set of servers.
func NewValidator(servers map[string]string) (http.Handler, error) {
result := map[string]server{}
func NewValidator(servers map[string]Server) (http.Handler, error) {
return &validator{
servers: servers,
client: &http.Client{},
}, nil
}

func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
result := map[string]Server{}
for name, value := range servers {
host, port, err := net.SplitHostPort(value)
if err != nil {
Expand All @@ -105,16 +111,10 @@ func NewValidator(servers map[string]string) (http.Handler, error) {
if err != nil {
return nil, fmt.Errorf("invalid server spec: %s (%v)", port, err)
}
result[name] = server{host, val}
result[name] = Server{Addr: host, Port: val, Path: "/healthz"}
}
return &validator{
servers: result,
client: &http.Client{},
}, nil
}

func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
v, e := NewValidator(servers)
v, e := NewValidator(result)
if e == nil {
v.(*validator).client = get
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiserver/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func TestValidate(t *testing.T) {
{nil, "foo", health.Unhealthy, 500, true},
}

s := server{addr: "foo.com", port: 8080}
s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}

for _, test := range tests {
fake := makeFake(test.data, test.code, test.err)
status, data, err := s.check(fake)
expect := fmt.Sprintf("http://%s:%d/healthz", s.addr, s.port)
expect := fmt.Sprintf("http://%s:%d/healthz", s.Addr, s.Port)
if fake.url != expect {
t.Errorf("expected %s, got %s", expect, fake.url)
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package master

import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
Expand Down Expand Up @@ -306,6 +309,9 @@ func (m *Master) init(c *Config) {
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
m.mux.Handle(c.APIPrefix, versionHandler)
apiserver.InstallSupport(m.mux)
serversToValidate := m.getServersToValidate(c)

apiserver.InstallValidator(m.mux, serversToValidate)
if c.EnableLogsSupport {
apiserver.InstallLogsSupport(m.mux)
}
Expand Down Expand Up @@ -340,6 +346,43 @@ func (m *Master) init(c *Config) {
m.masterServices.Start()
}

func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: 10252, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: 10251, Path: "/healthz"},
}
for ix, machine := range c.EtcdHelper.Client.GetCluster() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = strconv.Atoi(portString)
} else {
addr = etcdUrl.Host
port = 4001
}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"}
}
nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext())
if err != nil {
glog.Errorf("Failed to list minions: %v", err)
}
for ix, node := range nodes.Items {
serversToValidate[fmt.Sprintf("node-%d", ix)] = apiserver.Server{Addr: node.HostIP, Port: 10250, Path: "/healthz"}
}
return serversToValidate
}

// API_v1beta1 returns the resources and codec for API version v1beta1.
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
storage := make(map[string]apiserver.RESTStorage)
Expand Down
47 changes: 47 additions & 0 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package master

import (
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)

func TestGetServersToValidate(t *testing.T) {
master := Master{}
config := Config{}
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}

master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})

servers := master.getServersToValidate(&config)

if len(servers) != 7 {
t.Errorf("unexpected server list: %#v", servers)
}
for _, server := range []string{"scheduler", "controller-manager", "etcd-0", "etcd-1", "etcd-2", "node-0", "node-1"} {
if _, ok := servers[server]; !ok {
t.Errorf("server list missing: %s", server)
}
}
}
2 changes: 2 additions & 0 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (

// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
GetCluster() []string
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Expand All @@ -56,6 +57,7 @@ type EtcdClient interface {

// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
type EtcdGetSet interface {
GetCluster() []string
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
Expand Down
5 changes: 5 additions & 0 deletions pkg/tools/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type FakeEtcdClient struct {
TestIndex bool
ChangeIndex uint64
LastSetTTL uint64
Machines []string

// Will become valid after Watch is called; tester may write to it. Tester may
// also read from it to verify that it's closed after injecting an error.
Expand Down Expand Up @@ -83,6 +84,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
return ret
}

func (f *FakeEtcdClient) GetCluster() []string {
return f.Machines
}

func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
f.expectNotFoundGetSet[key] = struct{}{}
}
Expand Down

0 comments on commit 21a6e96

Please sign in to comment.