diff --git a/api/examples/pod.json b/api/examples/pod.json index 665444c712fa4..5ab5bc461ae25 100644 --- a/api/examples/pod.json +++ b/api/examples/pod.json @@ -5,11 +5,21 @@ "version": "v1beta1", "id": "php", "containers": [{ + "name": "nginx", "image": "dockerfile/nginx", "ports": [{ "containerPort": 80, "hostPort": 8080 - }] + }], + "livenessProbe": { + "enabled": true, + "type": "http", + "initialDelaySeconds": 30, + "httpGet": { + "path": "/index.html", + "port": "8080" + } + } }] } }, diff --git a/cluster/saltbase/salt/nginx/kubernetes-site b/cluster/saltbase/salt/nginx/kubernetes-site index 0690f3f0e14cc..c5113c3489af5 100644 --- a/cluster/saltbase/salt/nginx/kubernetes-site +++ b/cluster/saltbase/salt/nginx/kubernetes-site @@ -55,6 +55,8 @@ server { proxy_buffers 16 32k; proxy_busy_buffers_size 64k; proxy_temp_file_write_size 64k; + # Disable retry + proxy_next_upstream off; } location /etcd/ { auth_basic "Restricted"; diff --git a/pkg/api/types.go b/pkg/api/types.go index 1fa2a5c1448d3..5b83cba305ce8 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -102,6 +102,25 @@ type EnvVar struct { Value string `yaml:"value,omitempty" json:"value,omitempty"` } +type HTTPGetProbe struct { + // Path to access on the http server + Path string `yaml:"path,omitempty" json:"path,omitempty"` + // Name or number of the port to access on the container + Port string `yaml:"port,omitempty" json:"port,omitempty"` + // Host name to connect to. Optional, default: "localhost" + Host string `yaml:"host,omitempty" json:"host,omitempty"` +} + +type LivenessProbe struct { + Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"` + // Type of liveness probe. Current legal values "http" + Type string `yaml:"type,omitempty" json:"type,omitempty"` + // HTTPGetProbe parameters, required if Type == 'http' + HTTPGet HTTPGetProbe `yaml:"httpGet,omitempty" json:"httpGet,omitempty"` + // Length of time before health checking is activated. In seconds. + InitialDelaySeconds int64 `yaml:"initialDelaySeconds,omitempty" json:"initialDelaySeconds,omitempty"` +} + // Container represents a single container that is expected to be run on the host. type Container struct { // Required: This must be a DNS_LABEL. Each container in a pod must @@ -118,8 +137,9 @@ type Container struct { // Optional: Defaults to unlimited. Memory int `yaml:"memory,omitempty" json:"memory,omitempty"` // Optional: Defaults to unlimited. - CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"` - VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"` + CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"` + VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"` + LivenessProbe LivenessProbe `yaml:"livenessProbe,omitempty" json:"livenessProbe,omitempty"` } // Percentile represents a pair which contains a percentage from 0 to 100 and diff --git a/pkg/kubelet/fake_docker_client.go b/pkg/kubelet/fake_docker_client.go index c51495d5bba88..d84496da925fd 100644 --- a/pkg/kubelet/fake_docker_client.go +++ b/pkg/kubelet/fake_docker_client.go @@ -56,8 +56,9 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do f.Created = append(f.Created, c.Name) // This is not a very good fake. We'll just add this container's name to the list. // Docker likes to add a '/', so copy that behavior. - f.containerList = append(f.containerList, docker.APIContainers{ID: c.Name, Names: []string{"/" + c.Name}}) - return &docker.Container{ID: "/" + c.Name}, nil + name := "/" + c.Name + f.containerList = append(f.containerList, docker.APIContainers{ID: name, Names: []string{name}}) + return &docker.Container{ID: name}, nil } func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConfig) error { @@ -68,6 +69,13 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { f.appendCall("stop") f.stopped = append(f.stopped, id) + var newList []docker.APIContainers + for _, container := range f.containerList { + if container.ID != id { + newList = append(newList, container) + } + } + f.containerList = newList return f.err } diff --git a/pkg/kubelet/health_check.go b/pkg/kubelet/health_check.go new file mode 100644 index 0000000000000..391d976efa2d5 --- /dev/null +++ b/pkg/kubelet/health_check.go @@ -0,0 +1,99 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "net/http" + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/golang/glog" +) + +type HealthChecker interface { + IsHealthy(container api.Container) (bool, error) +} + +type httpDoInterface interface { + Get(string) (*http.Response, error) +} + +func MakeHealthChecker() HealthChecker { + return &MuxHealthChecker{ + checkers: map[string]HealthChecker{ + "http": &HTTPHealthChecker{ + client: &http.Client{}, + }, + }, + } +} + +type MuxHealthChecker struct { + checkers map[string]HealthChecker +} + +func (m *MuxHealthChecker) IsHealthy(container api.Container) (bool, error) { + checker, ok := m.checkers[container.LivenessProbe.Type] + if !ok || checker == nil { + glog.Warningf("Failed to find health checker for %s %s", container.Name, container.LivenessProbe.Type) + return true, nil + } + return checker.IsHealthy(container) +} + +type HTTPHealthChecker struct { + client httpDoInterface +} + +func (h *HTTPHealthChecker) findPort(container api.Container, portName string) int64 { + for _, port := range container.Ports { + if port.Name == portName { + // TODO This means you can only health check exposed ports + return int64(port.HostPort) + } + } + return -1 +} + +func (h *HTTPHealthChecker) IsHealthy(container api.Container) (bool, error) { + params := container.LivenessProbe.HTTPGet + port := h.findPort(container, params.Port) + if port == -1 { + var err error + port, err = strconv.ParseInt(params.Port, 10, 0) + if err != nil { + return true, err + } + } + var host string + if len(params.Host) > 0 { + host = params.Host + } else { + host = "localhost" + } + url := fmt.Sprintf("http://%s:%d%s", host, port, params.Path) + res, err := h.client.Get(url) + if res != nil && res.Body != nil { + defer res.Body.Close() + } + if err != nil { + // At this point, if it fails, its either a policy (unlikely) or HTTP protocol (likely) error. + return false, nil + } + return res.StatusCode == http.StatusOK, nil +} diff --git a/pkg/kubelet/health_check_test.go b/pkg/kubelet/health_check_test.go new file mode 100644 index 0000000000000..da25b590c29a4 --- /dev/null +++ b/pkg/kubelet/health_check_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "net/http" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +type fakeHttpClient struct { + req string + res http.Response + err error +} + +func (f *fakeHttpClient) Get(url string) (*http.Response, error) { + f.req = url + return &f.res, f.err +} + +func TestHttpHealth(t *testing.T) { + fakeClient := fakeHttpClient{ + res: http.Response{ + StatusCode: http.StatusOK, + }, + } + + check := HTTPHealthChecker{ + client: &fakeClient, + } + + container := api.Container{ + LivenessProbe: api.LivenessProbe{ + HTTPGet: api.HTTPGetProbe{ + Port: "8080", + Path: "/foo/bar", + }, + Type: "http", + }, + } + + ok, err := check.IsHealthy(container) + if !ok { + t.Error("Unexpected unhealthy") + } + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if fakeClient.req != "http://localhost:8080/foo/bar" { + t.Errorf("Unexpected url: %s", fakeClient.req) + } +} + +func TestFindPort(t *testing.T) { + container := api.Container{ + Ports: []api.Port{ + { + Name: "foo", + HostPort: 8080, + }, + { + Name: "bar", + HostPort: 9000, + }, + }, + } + check := HTTPHealthChecker{} + validatePort(t, check.findPort(container, "foo"), 8080) +} + +func validatePort(t *testing.T, port int64, expectedPort int64) { + if port != expectedPort { + t.Errorf("Unexpected port: %d, expected: %d", port, expectedPort) + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 296734315ef49..a018786c428a1 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -92,6 +92,7 @@ type Kubelet struct { SyncFrequency time.Duration HTTPCheckFrequency time.Duration pullLock sync.Mutex + HealthChecker HealthChecker } type manifestUpdate struct { @@ -155,6 +156,7 @@ func (kl *Kubelet) RunKubelet(dockerEndpoint, config_path, manifest_url, etcd_se } go util.Forever(func() { s.ListenAndServe() }, 0) } + kl.HealthChecker = MakeHealthChecker() kl.syncLoop(updateChannel, kl) } @@ -219,6 +221,19 @@ func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *ap return "", nil } +func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) { + dockerContainers, err := kl.getDockerContainers() + if err != nil { + return nil, err + } + for dockerID, dockerContainer := range dockerContainers { + if dockerID == ID { + return &dockerContainer, nil + } + } + return nil, nil +} + func (kl *Kubelet) MakeDockerPuller() DockerPuller { return dockerPuller{ client: kl.DockerClient, @@ -686,7 +701,6 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha } keepChannel <- netID for _, container := range manifest.Containers { - glog.Infof("Syncing container: %v", container) containerID, err := kl.getContainerID(manifest, &container) if err != nil { glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name) @@ -706,7 +720,28 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha continue } } else { + glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) + dockerContainer, err := kl.getContainer(containerID) + // TODO: This should probably be separated out into a separate goroutine. + healthy, err := kl.healthy(container, dockerContainer) + if err != nil { + glog.V(1).Infof("health check errored: %v", err) + continue + } + if !healthy { + glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name) + if err != nil { + glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID) + continue + } + err = kl.killContainer(*dockerContainer) + if err != nil { + glog.V(1).Infof("Failed to kill container %s: %v", containerID, err) + continue + } + containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID)) + } } keepChannel <- containerID } @@ -942,3 +977,17 @@ func (kl *Kubelet) GetContainerStats(podID, containerName string) (*api.Containe func (kl *Kubelet) GetMachineStats() (*api.ContainerStats, error) { return kl.statsFromContainerPath("/") } + +func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (bool, error) { + // Give the container 60 seconds to start up. + if !container.LivenessProbe.Enabled { + return true, nil + } + if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds { + return true, nil + } + if kl.HealthChecker == nil { + return true, nil + } + return kl.HealthChecker.IsHealthy(container) +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5484e2265e1eb..0fd1e2275a364 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -106,9 +106,11 @@ func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) { func verifyStringArrayEquals(t *testing.T, actual, expected []string) { invalid := len(actual) != len(expected) - for ix, value := range actual { - if expected[ix] != value { - invalid = true + if !invalid { + for ix, value := range actual { + if expected[ix] != value { + invalid = true + } } } if invalid { @@ -382,7 +384,7 @@ func TestSyncManifestsDoesNothing(t *testing.T) { }, }) expectNoError(t, err) - verifyCalls(t, fakeDocker, []string{"list", "list", "list"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "list", "list"}) } func TestSyncManifestsDeletes(t *testing.T) { @@ -420,6 +422,54 @@ func TestSyncManifestsDeletes(t *testing.T) { } } +type FalseHealthChecker struct{} + +func (f *FalseHealthChecker) IsHealthy(container api.Container) (bool, error) { + return false, nil +} + +func TestSyncManifestsUnhealthy(t *testing.T) { + kubelet, _, fakeDocker := makeTestKubelet(t) + kubelet.HealthChecker = &FalseHealthChecker{} + fakeDocker.containerList = []docker.APIContainers{ + { + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s--bar--foo"}, + ID: "1234", + }, + { + // network container + Names: []string{"/k8s--net--foo--"}, + ID: "9876", + }, + } + err := kubelet.SyncManifests([]api.ContainerManifest{ + { + ID: "foo", + Containers: []api.Container{ + {Name: "bar", + LivenessProbe: api.LivenessProbe{ + Enabled: true, + // Always returns healthy == false + Type: "false", + }, + }, + }, + }}) + expectNoError(t, err) + verifyCalls(t, fakeDocker, []string{"list", "list", "list", "stop", "create", "start", "list"}) + + // A map interation is used to delete containers, so must not depend on + // order here. + expectedToStop := map[string]bool{ + "1234": true, + } + if len(fakeDocker.stopped) != 1 || + !expectedToStop[fakeDocker.stopped[0]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped) + } +} + func TestEventWriting(t *testing.T) { kubelet, fakeEtcd, _ := makeTestKubelet(t) expectedEvent := api.Event{