diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 6c64f11c39007..e01da47f27e78 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -19,11 +19,9 @@ package dockertools import ( "bufio" "bytes" - "errors" "fmt" "hash/adler32" "io" - "io/ioutil" "math/rand" "os" "os/exec" @@ -447,286 +445,6 @@ func (c DockerContainers) FindContainersByPod(podUID types.UID, podFullName stri return containers } -// GetKubeletDockerContainers takes client and boolean whether to list all container or just the running ones. -// Returns a map of docker containers that we manage. The map key is the docker container ID -func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) { - result := make(DockerContainers) - containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers}) - if err != nil { - return nil, err - } - for i := range containers { - container := &containers[i] - if len(container.Names) == 0 { - continue - } - // Skip containers that we didn't create to allow users to manually - // spin up their own containers if they want. - // TODO(dchen1107): Remove the old separator "--" by end of Oct - if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") && - !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { - glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0]) - continue - } - result[DockerID(container.ID)] = container - } - return result, nil -} - -// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which matches the name -// and uid given. -func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullName string, uid types.UID, containerName string) ([]*docker.Container, error) { - var result []*docker.Container - containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) - if err != nil { - return nil, err - } - for _, dockerContainer := range containers { - if len(dockerContainer.Names) == 0 { - continue - } - dockerName, _, err := ParseDockerName(dockerContainer.Names[0]) - if err != nil { - continue - } - if dockerName.PodFullName != podFullName { - continue - } - if uid != "" && dockerName.PodUID != uid { - continue - } - if dockerName.ContainerName != containerName { - continue - } - inspectResult, _ := client.InspectContainer(dockerContainer.ID) - if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused { - result = append(result, inspectResult) - } - } - return result, nil -} - -// GetKubeletDockerContainerLogs returns logs of specific container -// By default the function will return snapshot of the container log -// Log streaming is possible if 'follow' param is set to true -// Log tailing is possible when number of tailed lines are set and only if 'follow' is false -// TODO: Make 'RawTerminal' option flagable. -func GetKubeletDockerContainerLogs(client DockerInterface, containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) { - opts := docker.LogsOptions{ - Container: containerID, - Stdout: true, - Stderr: true, - OutputStream: stdout, - ErrorStream: stderr, - Timestamps: true, - RawTerminal: false, - Follow: follow, - } - - if !follow { - opts.Tail = tail - } - - err = client.Logs(opts) - return -} - -var ( - // ErrNoContainersInPod is returned when there are no containers for a given pod - ErrNoContainersInPod = errors.New("no containers exist for this pod") - - // ErrNoPodInfraContainerInPod is returned when there is no pod infra container for a given pod - ErrNoPodInfraContainerInPod = errors.New("No pod infra container exists for this pod") - - // ErrContainerCannotRun is returned when a container is created, but cannot run properly - ErrContainerCannotRun = errors.New("Container cannot run") -) - -// Internal information kept for containers from inspection -type containerStatusResult struct { - status api.ContainerStatus - ip string - err error -} - -func inspectContainer(client DockerInterface, dockerID, containerName, tPath string) *containerStatusResult { - result := containerStatusResult{api.ContainerStatus{}, "", nil} - - inspectResult, err := client.InspectContainer(dockerID) - - if err != nil { - result.err = err - return &result - } - if inspectResult == nil { - // Why did we not get an error? - return &result - } - - glog.V(3).Infof("Container inspect result: %+v", *inspectResult) - result.status = api.ContainerStatus{ - Name: containerName, - Image: inspectResult.Config.Image, - ImageID: DockerPrefix + inspectResult.Image, - ContainerID: DockerPrefix + dockerID, - } - - waiting := true - if inspectResult.State.Running { - result.status.State.Running = &api.ContainerStateRunning{ - StartedAt: util.NewTime(inspectResult.State.StartedAt), - } - if containerName == PodInfraContainerName && inspectResult.NetworkSettings != nil { - result.ip = inspectResult.NetworkSettings.IPAddress - } - waiting = false - } else if !inspectResult.State.FinishedAt.IsZero() { - reason := "" - // Note: An application might handle OOMKilled gracefully. - // In that case, the container is oom killed, but the exit - // code could be 0. - if inspectResult.State.OOMKilled { - reason = "OOM Killed" - } else { - reason = inspectResult.State.Error - } - result.status.State.Termination = &api.ContainerStateTerminated{ - ExitCode: inspectResult.State.ExitCode, - Reason: reason, - StartedAt: util.NewTime(inspectResult.State.StartedAt), - FinishedAt: util.NewTime(inspectResult.State.FinishedAt), - } - if tPath != "" { - path, found := inspectResult.Volumes[tPath] - if found { - data, err := ioutil.ReadFile(path) - if err != nil { - glog.Errorf("Error on reading termination-log %s: %v", path, err) - } else { - result.status.State.Termination.Message = string(data) - } - } - } - waiting = false - } - - if waiting { - // TODO(dchen1107): Separate issue docker/docker#8294 was filed - // TODO(dchen1107): Need to figure out why we are still waiting - // Check any issue to run container - result.status.State.Waiting = &api.ContainerStateWaiting{ - Reason: ErrContainerCannotRun.Error(), - } - } - - return &result -} - -// GetDockerPodStatus returns docker related status for all containers in the pod/manifest and -// infrastructure container -func GetDockerPodStatus(client DockerInterface, manifest api.PodSpec, podFullName string, uid types.UID) (*api.PodStatus, error) { - var podStatus api.PodStatus - statuses := make(map[string]api.ContainerStatus) - - expectedContainers := make(map[string]api.Container) - for _, container := range manifest.Containers { - expectedContainers[container.Name] = container - } - expectedContainers[PodInfraContainerName] = api.Container{} - - containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) - if err != nil { - return nil, err - } - - for _, value := range containers { - if len(value.Names) == 0 { - continue - } - dockerName, _, err := ParseDockerName(value.Names[0]) - if err != nil { - continue - } - if dockerName.PodFullName != podFullName { - continue - } - if uid != "" && dockerName.PodUID != uid { - continue - } - dockerContainerName := dockerName.ContainerName - c, found := expectedContainers[dockerContainerName] - terminationMessagePath := "" - if !found { - // TODO(dchen1107): should figure out why not continue here - // continue - } else { - terminationMessagePath = c.TerminationMessagePath - } - // We assume docker return us a list of containers in time order - if containerStatus, found := statuses[dockerContainerName]; found { - containerStatus.RestartCount += 1 - statuses[dockerContainerName] = containerStatus - continue - } - - result := inspectContainer(client, value.ID, dockerContainerName, terminationMessagePath) - if result.err != nil { - return nil, err - } - - // Add user container information - if dockerContainerName == PodInfraContainerName && - result.status.State.Running != nil { - // Found network container - podStatus.PodIP = result.ip - } else { - statuses[dockerContainerName] = result.status - } - } - - if len(statuses) == 0 && podStatus.PodIP == "" { - return nil, ErrNoContainersInPod - } - - // Not all containers expected are created, check if there are - // image related issues - if len(statuses) < len(manifest.Containers) { - var containerStatus api.ContainerStatus - for _, container := range manifest.Containers { - if _, found := statuses[container.Name]; found { - continue - } - - image := container.Image - // Check image is ready on the node or not - // TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists - _, err := client.InspectImage(image) - if err == nil { - containerStatus.State.Waiting = &api.ContainerStateWaiting{ - Reason: fmt.Sprintf("Image: %s is ready, container is creating", image), - } - } else if err == docker.ErrNoSuchImage { - containerStatus.State.Waiting = &api.ContainerStateWaiting{ - Reason: fmt.Sprintf("Image: %s is not ready on the node", image), - } - } else { - containerStatus.State.Waiting = &api.ContainerStateWaiting{ - Reason: err.Error(), - } - } - - statuses[container.Name] = containerStatus - } - } - - podStatus.ContainerStatuses = make([]api.ContainerStatus, 0) - for _, status := range statuses { - podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, status) - } - - return &podStatus, nil -} - const containerNamePrefix = "k8s" func HashContainer(container *api.Container) uint64 { @@ -781,23 +499,6 @@ func ParseDockerName(name string) (dockerName *KubeletContainerName, hash uint64 return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil } -func GetRunningContainers(client DockerInterface, ids []string) ([]*docker.Container, error) { - result := []*docker.Container{} - if client == nil { - return nil, fmt.Errorf("unexpected nil docker client.") - } - for ix := range ids { - status, err := client.InspectContainer(ids[ix]) - if err != nil { - return nil, err - } - if status != nil && status.State.Running { - result = append(result, status) - } - } - return result, nil -} - // Get a docker endpoint, either from the string passed in, or $DOCKER_HOST environment variables func getDockerEndpoint(dockerEndpoint string) string { var endpoint string @@ -833,6 +534,47 @@ type ContainerCommandRunner interface { PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error } +func milliCPUToShares(milliCPU int64) int64 { + if milliCPU == 0 { + // zero milliCPU means unset. Use kernel default. + return 0 + } + // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. + shares := (milliCPU * sharesPerCPU) / milliCPUToCPU + if shares < minShares { + return minShares + } + return shares +} + +// GetKubeletDockerContainers lists all container or just the running ones. +// Returns a map of docker containers that we manage, keyed by container ID. +// TODO: Move this function with dockerCache to DockerManager. +func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) { + result := make(DockerContainers) + containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers}) + if err != nil { + return nil, err + } + for i := range containers { + container := &containers[i] + if len(container.Names) == 0 { + continue + } + // Skip containers that we didn't create to allow users to manually + // spin up their own containers if they want. + // TODO(dchen1107): Remove the old separator "--" by end of Oct + if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") && + !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { + glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0]) + continue + } + result[DockerID(container.ID)] = container + } + return result, nil +} + +// TODO: Move this function with dockerCache to DockerManager. func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) { pods := make(map[types.UID]*kubecontainer.Pod) var result []*kubecontainer.Pod @@ -881,64 +623,3 @@ func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) { } return result, nil } - -func milliCPUToShares(milliCPU int64) int64 { - if milliCPU == 0 { - // zero milliCPU means unset. Use kernel default. - return 0 - } - // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. - shares := (milliCPU * sharesPerCPU) / milliCPUToCPU - if shares < minShares { - return minShares - } - return shares -} - -func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { - exposedPorts := map[docker.Port]struct{}{} - portBindings := map[docker.Port][]docker.PortBinding{} - for _, port := range container.Ports { - exteriorPort := port.HostPort - if exteriorPort == 0 { - // No need to do port binding when HostPort is not specified - continue - } - interiorPort := port.ContainerPort - // Some of this port stuff is under-documented voodoo. - // See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api - var protocol string - switch strings.ToUpper(string(port.Protocol)) { - case "UDP": - protocol = "/udp" - case "TCP": - protocol = "/tcp" - default: - glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol) - protocol = "/tcp" - } - dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol) - exposedPorts[dockerPort] = struct{}{} - portBindings[dockerPort] = []docker.PortBinding{ - { - HostPort: strconv.Itoa(exteriorPort), - HostIP: port.HostIP, - }, - } - } - return exposedPorts, portBindings -} - -func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) { - var ( - addCaps []string - dropCaps []string - ) - for _, cap := range capAdd { - addCaps = append(addCaps, string(cap)) - } - for _, cap := range capDrop { - dropCaps = append(dropCaps, string(cap)) - } - return addCaps, dropCaps -} diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 65963bd30ff8c..c40fdf11e3fe8 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -394,6 +395,8 @@ func TestIsImagePresent(t *testing.T) { func TestGetRunningContainers(t *testing.T) { fakeDocker := &FakeDockerClient{} + fakeRecorder := &record.FakeRecorder{} + containerManager := NewDockerManager(fakeDocker, fakeRecorder) tests := []struct { containers map[string]*docker.Container inputIDs []string @@ -476,7 +479,7 @@ func TestGetRunningContainers(t *testing.T) { for _, test := range tests { fakeDocker.ContainerMap = test.containers fakeDocker.Err = test.err - if results, err := GetRunningContainers(fakeDocker, test.inputIDs); err == nil { + if results, err := containerManager.GetRunningContainers(test.inputIDs); err == nil { resultIDs := []string{} for _, result := range results { resultIDs = append(resultIDs, result.ID) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go new file mode 100644 index 0000000000000..3723253722d53 --- /dev/null +++ b/pkg/kubelet/dockertools/manager.go @@ -0,0 +1,489 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockertools + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strconv" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/fsouza/go-dockerclient" + "github.com/golang/glog" +) + +// Implements kubecontainer.ContainerRunner. +// TODO: Eventually DockerManager should implement kubecontainer.Runtime +// interface, and it should also add a cache to replace dockerCache. +type DockerManager struct { + client DockerInterface + recorder record.EventRecorder +} + +// Ensures DockerManager implements ConatinerRunner. +var _ kubecontainer.ContainerRunner = new(DockerManager) + +func NewDockerManager(client DockerInterface, recorder record.EventRecorder) *DockerManager { + return &DockerManager{client: client, recorder: recorder} +} + +// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which matches the name +// and uid given. +func (self *DockerManager) GetRecentDockerContainersWithNameAndUUID(podFullName string, uid types.UID, + containerName string) ([]*docker.Container, error) { + var result []*docker.Container + containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true}) + if err != nil { + return nil, err + } + for _, dockerContainer := range containers { + if len(dockerContainer.Names) == 0 { + continue + } + dockerName, _, err := ParseDockerName(dockerContainer.Names[0]) + if err != nil { + continue + } + if dockerName.PodFullName != podFullName { + continue + } + if uid != "" && dockerName.PodUID != uid { + continue + } + if dockerName.ContainerName != containerName { + continue + } + inspectResult, _ := self.client.InspectContainer(dockerContainer.ID) + if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused { + result = append(result, inspectResult) + } + } + return result, nil +} + +// GetKubeletDockerContainerLogs returns logs of a specific container. By +// default, it returns a snapshot of the container log. Set |follow| to true to +// stream the log. Set |follow| to false and specify the number of lines (e.g. +// "100" or "all") to tail the log. +// TODO: Make 'RawTerminal' option flagable. +func (self *DockerManager) GetKubeletDockerContainerLogs(containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) { + opts := docker.LogsOptions{ + Container: containerID, + Stdout: true, + Stderr: true, + OutputStream: stdout, + ErrorStream: stderr, + Timestamps: true, + RawTerminal: false, + Follow: follow, + } + + if !follow { + opts.Tail = tail + } + + err = self.client.Logs(opts) + return +} + +var ( + // ErrNoContainersInPod is returned when there are no containers for a given pod + ErrNoContainersInPod = errors.New("no containers exist for this pod") + + // ErrNoPodInfraContainerInPod is returned when there is no pod infra container for a given pod + ErrNoPodInfraContainerInPod = errors.New("No pod infra container exists for this pod") + + // ErrContainerCannotRun is returned when a container is created, but cannot run properly + ErrContainerCannotRun = errors.New("Container cannot run") +) + +// Internal information kept for containers from inspection +type containerStatusResult struct { + status api.ContainerStatus + ip string + err error +} + +func (self *DockerManager) inspectContainer(dockerID, containerName, tPath string) *containerStatusResult { + result := containerStatusResult{api.ContainerStatus{}, "", nil} + + inspectResult, err := self.client.InspectContainer(dockerID) + + if err != nil { + result.err = err + return &result + } + if inspectResult == nil { + // Why did we not get an error? + return &result + } + + glog.V(3).Infof("Container inspect result: %+v", *inspectResult) + result.status = api.ContainerStatus{ + Name: containerName, + Image: inspectResult.Config.Image, + ImageID: DockerPrefix + inspectResult.Image, + ContainerID: DockerPrefix + dockerID, + } + + waiting := true + if inspectResult.State.Running { + result.status.State.Running = &api.ContainerStateRunning{ + StartedAt: util.NewTime(inspectResult.State.StartedAt), + } + if containerName == PodInfraContainerName && inspectResult.NetworkSettings != nil { + result.ip = inspectResult.NetworkSettings.IPAddress + } + waiting = false + } else if !inspectResult.State.FinishedAt.IsZero() { + reason := "" + // Note: An application might handle OOMKilled gracefully. + // In that case, the container is oom killed, but the exit + // code could be 0. + if inspectResult.State.OOMKilled { + reason = "OOM Killed" + } else { + reason = inspectResult.State.Error + } + result.status.State.Termination = &api.ContainerStateTerminated{ + ExitCode: inspectResult.State.ExitCode, + Reason: reason, + StartedAt: util.NewTime(inspectResult.State.StartedAt), + FinishedAt: util.NewTime(inspectResult.State.FinishedAt), + } + if tPath != "" { + path, found := inspectResult.Volumes[tPath] + if found { + data, err := ioutil.ReadFile(path) + if err != nil { + glog.Errorf("Error on reading termination-log %s: %v", path, err) + } else { + result.status.State.Termination.Message = string(data) + } + } + } + waiting = false + } + + if waiting { + // TODO(dchen1107): Separate issue docker/docker#8294 was filed + // TODO(dchen1107): Need to figure out why we are still waiting + // Check any issue to run container + result.status.State.Waiting = &api.ContainerStateWaiting{ + Reason: ErrContainerCannotRun.Error(), + } + } + + return &result +} + +// GetPodStatus returns docker related status for all containers in the pod as +// well as the infrastructure container. +func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { + podFullName := kubecontainer.GetPodFullName(pod) + uid := pod.UID + manifest := pod.Spec + + var podStatus api.PodStatus + statuses := make(map[string]api.ContainerStatus) + + expectedContainers := make(map[string]api.Container) + for _, container := range manifest.Containers { + expectedContainers[container.Name] = container + } + expectedContainers[PodInfraContainerName] = api.Container{} + + containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true}) + if err != nil { + return nil, err + } + + for _, value := range containers { + if len(value.Names) == 0 { + continue + } + dockerName, _, err := ParseDockerName(value.Names[0]) + if err != nil { + continue + } + if dockerName.PodFullName != podFullName { + continue + } + if uid != "" && dockerName.PodUID != uid { + continue + } + dockerContainerName := dockerName.ContainerName + c, found := expectedContainers[dockerContainerName] + terminationMessagePath := "" + if !found { + // TODO(dchen1107): should figure out why not continue here + // continue + } else { + terminationMessagePath = c.TerminationMessagePath + } + // We assume docker return us a list of containers in time order + if containerStatus, found := statuses[dockerContainerName]; found { + containerStatus.RestartCount += 1 + statuses[dockerContainerName] = containerStatus + continue + } + + result := self.inspectContainer(value.ID, dockerContainerName, terminationMessagePath) + if result.err != nil { + return nil, err + } + + // Add user container information + if dockerContainerName == PodInfraContainerName && + result.status.State.Running != nil { + // Found network container + podStatus.PodIP = result.ip + } else { + statuses[dockerContainerName] = result.status + } + } + + if len(statuses) == 0 && podStatus.PodIP == "" { + return nil, ErrNoContainersInPod + } + + // Not all containers expected are created, check if there are + // image related issues + if len(statuses) < len(manifest.Containers) { + var containerStatus api.ContainerStatus + for _, container := range manifest.Containers { + if _, found := statuses[container.Name]; found { + continue + } + + image := container.Image + // Check image is ready on the node or not + // TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists + _, err := self.client.InspectImage(image) + if err == nil { + containerStatus.State.Waiting = &api.ContainerStateWaiting{ + Reason: fmt.Sprintf("Image: %s is ready, container is creating", image), + } + } else if err == docker.ErrNoSuchImage { + containerStatus.State.Waiting = &api.ContainerStateWaiting{ + Reason: fmt.Sprintf("Image: %s is not ready on the node", image), + } + } else { + containerStatus.State.Waiting = &api.ContainerStateWaiting{ + Reason: "", + } + } + + statuses[container.Name] = containerStatus + } + } + + podStatus.ContainerStatuses = make([]api.ContainerStatus, 0) + for _, status := range statuses { + podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, status) + } + + return &podStatus, nil +} + +func (self *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) { + result := []*docker.Container{} + if self.client == nil { + return nil, fmt.Errorf("unexpected nil docker client.") + } + for ix := range ids { + status, err := self.client.InspectContainer(ids[ix]) + if err != nil { + return nil, err + } + if status != nil && status.State.Running { + result = append(result, status) + } + } + return result, nil +} + +func (self *DockerManager) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) { + ref, err := kubecontainer.GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) + } + + dockerName := KubeletContainerName{ + PodFullName: kubecontainer.GetPodFullName(pod), + PodUID: pod.UID, + ContainerName: container.Name, + } + exposedPorts, portBindings := makePortsAndBindings(container) + + // TODO(vmarmol): Handle better. + // Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char). + const hostnameMaxLen = 63 + containerHostname := pod.Name + if len(containerHostname) > hostnameMaxLen { + containerHostname = containerHostname[:hostnameMaxLen] + } + dockerOpts := docker.CreateContainerOptions{ + Name: BuildDockerName(dockerName, container), + Config: &docker.Config{ + Env: opts.Envs, + ExposedPorts: exposedPorts, + Hostname: containerHostname, + Image: container.Image, + Memory: container.Resources.Limits.Memory().Value(), + CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()), + WorkingDir: container.WorkingDir, + }, + } + + setEntrypointAndCommand(container, &dockerOpts) + + glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd) + + dockerContainer, err := self.client.CreateContainer(dockerOpts) + if err != nil { + if ref != nil { + self.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) + } + return "", err + } + + if ref != nil { + self.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) + } + + // The reason we create and mount the log file in here (not in kubelet) is because + // the file's location depends on the ID of the container, and we need to create and + // mount the file before actually starting the container. + // TODO(yifan): Consider to pull this logic out since we might need to reuse it in + // other container runtime. + if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 { + containerLogPath := path.Join(opts.PodContainerDir, dockerContainer.ID) + fs, err := os.Create(containerLogPath) + if err != nil { + // TODO: Clean up the previouly created dir? return the error? + glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err) + } else { + fs.Close() // Close immediately; we're just doing a `touch` here + b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath) + opts.Binds = append(opts.Binds, b) + } + } + + privileged := false + if capabilities.Get().AllowPrivileged { + privileged = container.Privileged + } else if container.Privileged { + return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.") + } + + capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop) + hc := &docker.HostConfig{ + PortBindings: portBindings, + Binds: opts.Binds, + NetworkMode: opts.NetMode, + IpcMode: opts.IpcMode, + Privileged: privileged, + CapAdd: capAdd, + CapDrop: capDrop, + } + if len(opts.DNS) > 0 { + hc.DNS = opts.DNS + } + if len(opts.DNSSearch) > 0 { + hc.DNSSearch = opts.DNSSearch + } + + if err = self.client.StartContainer(dockerContainer.ID, hc); err != nil { + if ref != nil { + self.recorder.Eventf(ref, "failed", + "Failed to start with docker id %v with error: %v", dockerContainer.ID, err) + } + return "", err + } + if ref != nil { + self.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) + } + return dockerContainer.ID, nil +} + +func setEntrypointAndCommand(container *api.Container, opts *docker.CreateContainerOptions) { + if len(container.Command) != 0 { + opts.Config.Entrypoint = container.Command + } + if len(container.Args) != 0 { + opts.Config.Cmd = container.Args + } +} + +func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { + exposedPorts := map[docker.Port]struct{}{} + portBindings := map[docker.Port][]docker.PortBinding{} + for _, port := range container.Ports { + exteriorPort := port.HostPort + if exteriorPort == 0 { + // No need to do port binding when HostPort is not specified + continue + } + interiorPort := port.ContainerPort + // Some of this port stuff is under-documented voodoo. + // See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api + var protocol string + switch strings.ToUpper(string(port.Protocol)) { + case "UDP": + protocol = "/udp" + case "TCP": + protocol = "/tcp" + default: + glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol) + protocol = "/tcp" + } + dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol) + exposedPorts[dockerPort] = struct{}{} + portBindings[dockerPort] = []docker.PortBinding{ + { + HostPort: strconv.Itoa(exteriorPort), + HostIP: port.HostIP, + }, + } + } + return exposedPorts, portBindings +} + +func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) { + var ( + addCaps []string + dropCaps []string + ) + for _, cap := range capAdd { + addCaps = append(addCaps, string(cap)) + } + for _, cap := range capDrop { + dropCaps = append(dropCaps, string(cap)) + } + return addCaps, dropCaps +} diff --git a/pkg/kubelet/dockertools/runner_test.go b/pkg/kubelet/dockertools/manager_test.go similarity index 100% rename from pkg/kubelet/dockertools/runner_test.go rename to pkg/kubelet/dockertools/manager_test.go diff --git a/pkg/kubelet/dockertools/runner.go b/pkg/kubelet/dockertools/runner.go deleted file mode 100644 index 4e833a557e85e..0000000000000 --- a/pkg/kubelet/dockertools/runner.go +++ /dev/null @@ -1,148 +0,0 @@ -/* -Copyright 2015 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dockertools - -import ( - "fmt" - "os" - "path" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" - "github.com/fsouza/go-dockerclient" - "github.com/golang/glog" -) - -type DockerContainerRunner struct { - Client DockerInterface - Recorder record.EventRecorder -} - -func (r *DockerContainerRunner) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) { - ref, err := kubecontainer.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) - } - - dockerName := KubeletContainerName{ - PodFullName: kubecontainer.GetPodFullName(pod), - PodUID: pod.UID, - ContainerName: container.Name, - } - exposedPorts, portBindings := makePortsAndBindings(container) - - // TODO(vmarmol): Handle better. - // Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char). - const hostnameMaxLen = 63 - containerHostname := pod.Name - if len(containerHostname) > hostnameMaxLen { - containerHostname = containerHostname[:hostnameMaxLen] - } - dockerOpts := docker.CreateContainerOptions{ - Name: BuildDockerName(dockerName, container), - Config: &docker.Config{ - Env: opts.Envs, - ExposedPorts: exposedPorts, - Hostname: containerHostname, - Image: container.Image, - Memory: container.Resources.Limits.Memory().Value(), - CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()), - WorkingDir: container.WorkingDir, - }, - } - - setEntrypointAndCommand(container, &dockerOpts) - - glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd) - - dockerContainer, err := r.Client.CreateContainer(dockerOpts) - if err != nil { - if ref != nil { - r.Recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err) - } - return "", err - } - - if ref != nil { - r.Recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID) - } - - // The reason we create and mount the log file in here (not in kubelet) is because - // the file's location depends on the ID of the container, and we need to create and - // mount the file before actually starting the container. - // TODO(yifan): Consider to pull this logic out since we might need to reuse it in - // other container runtime. - if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 { - containerLogPath := path.Join(opts.PodContainerDir, dockerContainer.ID) - fs, err := os.Create(containerLogPath) - if err != nil { - // TODO: Clean up the previouly created dir? return the error? - glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err) - } else { - fs.Close() // Close immediately; we're just doing a `touch` here - b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath) - opts.Binds = append(opts.Binds, b) - } - } - - privileged := false - if capabilities.Get().AllowPrivileged { - privileged = container.Privileged - } else if container.Privileged { - return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.") - } - - capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop) - hc := &docker.HostConfig{ - PortBindings: portBindings, - Binds: opts.Binds, - NetworkMode: opts.NetMode, - IpcMode: opts.IpcMode, - Privileged: privileged, - CapAdd: capAdd, - CapDrop: capDrop, - } - if len(opts.DNS) > 0 { - hc.DNS = opts.DNS - } - if len(opts.DNSSearch) > 0 { - hc.DNSSearch = opts.DNSSearch - } - - if err = r.Client.StartContainer(dockerContainer.ID, hc); err != nil { - if ref != nil { - r.Recorder.Eventf(ref, "failed", - "Failed to start with docker id %v with error: %v", dockerContainer.ID, err) - } - return "", err - } - if ref != nil { - r.Recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID) - } - return dockerContainer.ID, nil -} - -func setEntrypointAndCommand(container *api.Container, opts *docker.CreateContainerOptions) { - if len(container.Command) != 0 { - opts.Config.Entrypoint = container.Command - } - if len(container.Args) != 0 { - opts.Config.Cmd = container.Args - } -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 160d00b34ee2d..1ba993aadab2a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -212,10 +212,7 @@ func NewMainKubelet( return nil, fmt.Errorf("failed to initialize image manager: %v", err) } statusManager := newStatusManager(kubeClient) - containerRunner := &dockertools.DockerContainerRunner{ - Client: dockerClient, - Recorder: recorder, - } + containerManager := dockertools.NewDockerManager(dockerClient, recorder) klet := &Kubelet{ hostname: hostname, @@ -245,7 +242,7 @@ func NewMainKubelet( statusManager: statusManager, cloud: cloud, nodeRef: nodeRef, - containerRunner: containerRunner, + containerManager: containerManager, } klet.podManager = newBasicPodManager(klet.kubeClient) @@ -362,14 +359,14 @@ type Kubelet struct { // Syncs pods statuses with apiserver; also used as a cache of statuses. statusManager *statusManager - // Knows how to run a container in a pod - containerRunner kubecontainer.ContainerRunner - //Cloud provider interface cloud cloudprovider.Interface // Reference to this node. nodeRef *api.ObjectReference + + // Manage containers. + containerManager *dockertools.DockerManager } // getRootDir returns the full path to the directory under which kubelet can @@ -665,7 +662,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum return "", err } - id, err := kl.containerRunner.RunContainer(pod, container, opts) + id, err := kl.containerManager.RunContainer(pod, container, opts) if err != nil { return "", err } @@ -1002,7 +999,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.Pod) bool { podFullName := kubecontainer.GetPodFullName(pod) // Check RestartPolicy for dead container - recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, pod.UID, container.Name) + recentContainers, err := kl.containerManager.GetRecentDockerContainersWithNameAndUUID(podFullName, pod.UID, container.Name) if err != nil { glog.Errorf("Error listing recent containers for pod %q: %v", podFullName, err) // TODO(dawnchen): error handling here? @@ -1497,7 +1494,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } } - running, err := dockertools.GetRunningContainers(kl.dockerClient, killed) + running, err := kl.containerManager.GetRunningContainers(killed) if err != nil { glog.Errorf("Failed to poll container state: %v", err) return err @@ -1697,7 +1694,7 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri if err != nil { return err } - return dockertools.GetKubeletDockerContainerLogs(kl.dockerClient, dockerContainerID, tail, follow, stdout, stderr) + return kl.containerManager.GetKubeletDockerContainerLogs(dockerContainerID, tail, follow, stdout, stderr) } // GetHostname Returns the hostname as the kubelet sees it. @@ -1946,7 +1943,7 @@ func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) { glog.V(3).Infof("Generating status for %q", podFullName) spec := &pod.Spec - podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID) + podStatus, err := kl.containerManager.GetPodStatus(pod) if err != nil { // Error handling diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index de64c99cf6954..5d36da0e961c9 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -113,7 +113,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerRunner = &dockertools.DockerContainerRunner{fakeDocker, fakeRecorder} + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder) return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 92125f13b10e3..d00fba3aa9011 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -140,7 +140,7 @@ func TestRunOnce(t *testing.T) { t: t, } kb.dockerPuller = &dockertools.FakeDockerPuller{} - kb.containerRunner = &dockertools.DockerContainerRunner{kb.dockerClient, kb.recorder} + kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder) results, err := kb.runOnce([]api.Pod{ { ObjectMeta: api.ObjectMeta{