Skip to content

Commit

Permalink
add http health checks.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Jul 9, 2014
1 parent 4b6ff69 commit 41c6680
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 10 deletions.
12 changes: 11 additions & 1 deletion api/examples/pod.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@
"version": "v1beta1",
"id": "php",
"containers": [{
"name": "nginx",
"image": "dockerfile/nginx",
"ports": [{
"containerPort": 80,
"hostPort": 8080
}]
}],
"livenessProbe": {
"enabled": true,
"type": "http",
"initialDelaySeconds": 30,
"httpGet": {
"path": "/index.html",
"port": "8080"
}
}
}]
}
},
Expand Down
2 changes: 2 additions & 0 deletions cluster/saltbase/salt/nginx/kubernetes-site
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ server {
proxy_buffers 16 32k;
proxy_busy_buffers_size 64k;
proxy_temp_file_write_size 64k;
# Disable retry
proxy_next_upstream off;
}
location /etcd/ {
auth_basic "Restricted";
Expand Down
24 changes: 22 additions & 2 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ type EnvVar struct {
Value string `yaml:"value,omitempty" json:"value,omitempty"`
}

type HTTPGetProbe struct {
// Path to access on the http server
Path string `yaml:"path,omitempty" json:"path,omitempty"`
// Name or number of the port to access on the container
Port string `yaml:"port,omitempty" json:"port,omitempty"`
// Host name to connect to. Optional, default: "localhost"
Host string `yaml:"host,omitempty" json:"host,omitempty"`
}

type LivenessProbe struct {
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"`
// Type of liveness probe. Current legal values "http"
Type string `yaml:"type,omitempty" json:"type,omitempty"`
// HTTPGetProbe parameters, required if Type == 'http'
HTTPGet HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"`
// Length of time before health checking is activated. In seconds.
InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"`
}

// Container represents a single container that is expected to be run on the host.
type Container struct {
// Required: This must be a DNS_LABEL. Each container in a pod must
Expand All @@ -118,8 +137,9 @@ type Container struct {
// Optional: Defaults to unlimited.
Memory int `yaml:"memory,omitempty" json:"memory,omitempty"`
// Optional: Defaults to unlimited.
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"`
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"`
LivenessProbe LivenessProbe `yaml:"livenessProbe,omitempty" json:"livenessProbe,omitempty"`
}

// Percentile represents a pair which contains a percentage from 0 to 100 and
Expand Down
12 changes: 10 additions & 2 deletions pkg/kubelet/fake_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do
f.Created = append(f.Created, c.Name)
// This is not a very good fake. We'll just add this container's name to the list.
// Docker likes to add a '/', so copy that behavior.
f.containerList = append(f.containerList, docker.APIContainers{ID: c.Name, Names: []string{"/" + c.Name}})
return &docker.Container{ID: "/" + c.Name}, nil
name := "/" + c.Name
f.containerList = append(f.containerList, docker.APIContainers{ID: name, Names: []string{name}})
return &docker.Container{ID: name}, nil
}

func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConfig) error {
Expand All @@ -68,6 +69,13 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
f.appendCall("stop")
f.stopped = append(f.stopped, id)
var newList []docker.APIContainers
for _, container := range f.containerList {
if container.ID != id {
newList = append(newList, container)
}
}
f.containerList = newList
return f.err
}

Expand Down
99 changes: 99 additions & 0 deletions pkg/kubelet/health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"fmt"
"net/http"
"strconv"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)

type HealthChecker interface {
IsHealthy(container api.Container) (bool, error)
}

type httpDoInterface interface {
Get(string) (*http.Response, error)
}

func MakeHealthChecker() HealthChecker {
return &MuxHealthChecker{
checkers: map[string]HealthChecker{
"http": &HTTPHealthChecker{
client: &http.Client{},
},
},
}
}

type MuxHealthChecker struct {
checkers map[string]HealthChecker
}

func (m *MuxHealthChecker) IsHealthy(container api.Container) (bool, error) {
checker, ok := m.checkers[container.LivenessProbe.Type]
if !ok || checker == nil {
glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type)
return true, nil
}
return checker.IsHealthy(container)
}

type HTTPHealthChecker struct {
client httpDoInterface
}

func (h *HTTPHealthChecker) findPort(container api.Container, portName string) int64 {
for _, port := range container.Ports {
if port.Name == portName {
// TODO This means you can only health check exposed ports
return int64(port.HostPort)
}
}
return -1
}

func (h *HTTPHealthChecker) IsHealthy(container api.Container) (bool, error) {
params := container.LivenessProbe.HTTPGet
port := h.findPort(container, params.Port)
if port == -1 {
var err error
port, err = strconv.ParseInt(params.Port, 10, 0)
if err != nil {
return true, err
}
}
var host string
if len(params.Host) > 0 {
host = params.Host
} else {
host = "localhost"
}
url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path)
res, err := h.client.Get(url)
if res != nil && res.Body != nil {
defer res.Body.Close()
}
if err != nil {
// At this point, if it fails, its either a policy (unlikely) or HTTP protocol (likely) error.
return false, nil
}
return res.StatusCode == http.StatusOK, nil
}
91 changes: 91 additions & 0 deletions pkg/kubelet/health_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"net/http"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

type fakeHttpClient struct {
req string
res http.Response
err error
}

func (f *fakeHttpClient) Get(url string) (*http.Response, error) {
f.req = url
return &f.res, f.err
}

func TestHttpHealth(t *testing.T) {
fakeClient := fakeHttpClient{
res: http.Response{
StatusCode: http.StatusOK,
},
}

check := HTTPHealthChecker{
client: &fakeClient,
}

container := api.Container{
LivenessProbe: api.LivenessProbe{
HTTPGet: api.HTTPGetProbe{
Port: "8080",
Path: "/foo/bar",
},
Type: "http",
},
}

ok, err := check.IsHealthy(container)
if !ok {
t.Error("Unexpected unhealthy")
}
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if fakeClient.req != "http://localhost:8080/foo/bar" {
t.Errorf("Unexpected url: %s", fakeClient.req)
}
}

func TestFindPort(t *testing.T) {
container := api.Container{
Ports: []api.Port{
{
Name: "foo",
HostPort: 8080,
},
{
Name: "bar",
HostPort: 9000,
},
},
}
check := HTTPHealthChecker{}
validatePort(t, check.findPort(container, "foo"), 8080)
}

func validatePort(t *testing.T, port int64, expectedPort int64) {
if port != expectedPort {
t.Errorf("Unexpected port: %d, expected: %d", port, expectedPort)
}
}
51 changes: 50 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Kubelet struct {
SyncFrequency time.Duration
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker HealthChecker
}

type manifestUpdate struct {
Expand Down Expand Up @@ -155,6 +156,7 @@ func (kl *Kubelet) RunKubelet(dockerEndpoint, config_path, manifest_url, etcd_se
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
kl.HealthChecker = MakeHealthChecker()
kl.syncLoop(updateChannel, kl)
}

Expand Down Expand Up @@ -219,6 +221,19 @@ func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *ap
return "", nil
}

func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return nil, err
}
for dockerID, dockerContainer := range dockerContainers {
if dockerID == ID {
return &dockerContainer, nil
}
}
return nil, nil
}

func (kl *Kubelet) MakeDockerPuller() DockerPuller {
return dockerPuller{
client: kl.DockerClient,
Expand Down Expand Up @@ -686,7 +701,6 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
}
keepChannel <- netID
for _, container := range manifest.Containers {
glog.Infof("Syncing container: %v", container)
containerID, err := kl.getContainerID(manifest, &container)
if err != nil {
glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
Expand All @@ -706,7 +720,28 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
continue
}
} else {
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
dockerContainer, err := kl.getContainer(containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
}
if !healthy {
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
if err != nil {
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
continue
}
err = kl.killContainer(*dockerContainer)
if err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
}
}
keepChannel <- containerID
}
Expand Down Expand Up @@ -942,3 +977,17 @@ func (kl *Kubelet) GetContainerStats(podID, containerName string) (*api.Containe
func (kl *Kubelet) GetMachineStats() (*api.ContainerStats, error) {
return kl.statsFromContainerPath("/")
}

func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (bool, error) {
// Give the container 60 seconds to start up.
if !container.LivenessProbe.Enabled {
return true, nil
}
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
return true, nil
}
if kl.HealthChecker == nil {
return true, nil
}
return kl.HealthChecker.IsHealthy(container)
}
Loading

0 comments on commit 41c6680

Please sign in to comment.