From 185a97b0374a80e7b02ac6f97225b63bc26342d9 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 15 Jul 2014 13:26:56 -0400 Subject: [PATCH] Make container lookup in the Kubelet cleaner Reduce duplicate calls to list lookups for parallel go routines. --- pkg/kubelet/docker.go | 72 ++++++++++++ pkg/kubelet/kubelet.go | 215 +++++++++--------------------------- pkg/kubelet/kubelet_test.go | 42 +++---- 3 files changed, 147 insertions(+), 182 deletions(-) diff --git a/pkg/kubelet/docker.go b/pkg/kubelet/docker.go index 33b26e6b7c16b..c16e982e68d4e 100644 --- a/pkg/kubelet/docker.go +++ b/pkg/kubelet/docker.go @@ -71,6 +71,78 @@ func (p dockerPuller) Pull(image string) error { return p.client.PullImage(opts, docker.AuthConfiguration{}) } +// DockerContainers is a map of containers +type DockerContainers map[DockerID]*docker.APIContainers + +func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) { + for _, dockerContainer := range c { + dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0]) + if dockerManifestID == manifestID && dockerContainerName == containerName { + return dockerContainer, true + } + } + return nil, false +} + +func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers { + containers := make(map[string]*docker.APIContainers) + + for _, dockerContainer := range c { + dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0]) + if dockerManifestID == manifestID { + containers[dockerContainerName] = dockerContainer + } + } + return containers +} + +// GetKubeletDockerContainers returns a map of docker containers that we manage. The map key is the docker container ID +func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error) { + result := make(DockerContainers) + containers, err := client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + return nil, err + } + for i := range containers { + container := &containers[i] + // Skip containers that we didn't create to allow users to manually + // spin up their own containers if they want. + if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { + continue + } + result[DockerID(container.ID)] = container + } + return result, nil +} + +// GetDockerPodInfo returns docker info for all containers in the pod/manifest. +func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) { + info := api.PodInfo{} + + containers, err := client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + return nil, err + } + + for _, value := range containers { + dockerManifestID, dockerContainerName := parseDockerName(value.Names[0]) + if dockerManifestID != manifestID { + continue + } + inspectResult, err := client.InspectContainer(value.ID) + if err != nil { + return nil, err + } + if inspectResult == nil { + // Why did we not get an error? + info[dockerContainerName] = docker.Container{} + } else { + info[dockerContainerName] = *inspectResult + } + } + return info, nil +} + // Converts "-" to "_-_" and "_" to "___" so that we can use "--" to meaningfully separate parts of a docker name. func escapeDash(in string) (out string) { out = strings.Replace(in, "_", "___", -1) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6e5bb93f3289f..6909411121ee2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -170,52 +170,6 @@ func (kl *Kubelet) LogEvent(event *api.Event) error { return err } -// Return a map of docker containers that we manage. The map key is the docker container ID -func (kl *Kubelet) getDockerContainers() (map[DockerID]docker.APIContainers, error) { - result := map[DockerID]docker.APIContainers{} - containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) - if err != nil { - return nil, err - } - for _, value := range containerList { - // Skip containers that we didn't create to allow users to manually - // spin up their own containers if they want. - if !strings.HasPrefix(value.Names[0], "/"+containerNamePrefix+"--") { - continue - } - result[DockerID(value.ID)] = value - } - return result, nil -} - -// Return Docker's container ID for a manifest's container. Returns an empty string if it doesn't exist. -func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *api.Container) (DockerID, error) { - dockerContainers, err := kl.getDockerContainers() - if err != nil { - return "", err - } - for id, dockerContainer := range dockerContainers { - manifestID, containerName := parseDockerName(dockerContainer.Names[0]) - if manifestID == manifest.ID && containerName == container.Name { - return DockerID(id), nil - } - } - return "", nil -} - -func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) { - dockerContainers, err := kl.getDockerContainers() - if err != nil { - return nil, err - } - for dockerID, dockerContainer := range dockerContainers { - if dockerID == ID { - return &dockerContainer, nil - } - } - return nil, nil -} - func makeEnvironmentVariables(container *api.Container) []string { var result []string for _, value := range container.Env { @@ -571,11 +525,6 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel c const networkContainerName = "net" -// Return the docker ID for a manifest's network container. Returns an empty string if it doesn't exist. -func (kl *Kubelet) getNetworkContainerID(manifest *api.ContainerManifest) (DockerID, error) { - return kl.getContainerID(manifest, &api.Container{Name: networkContainerName}) -} - // Create a network container for a manifest. Returns the docker container ID of the newly created container. func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) { var ports []api.Port @@ -594,65 +543,56 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock return kl.runContainer(manifest, container, "") } -func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerID) error { +func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error { // Make sure we have a network container - netID, err := kl.getNetworkContainerID(manifest) - if err != nil { - glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID) - return err - } - if netID == "" { - glog.Infof("Network container doesn't exist, creating") - netID, err = kl.createNetworkContainer(manifest) + var netID DockerID + if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found { + netID = DockerID(networkDockerContainer.ID) + } else { + dockerNetworkID, err := kl.createNetworkContainer(manifest) if err != nil { glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID) return err } + netID = dockerNetworkID } keepChannel <- netID + for _, container := range manifest.Containers { - containerID, err := kl.getContainerID(manifest, &container) - if err != nil { - glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name) - continue - } - if containerID == "" { - glog.Infof("%+v doesn't exist, creating", container) - kl.DockerPuller.Pull(container.Image) - if err != nil { - glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name) - continue - } - containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID)) - if err != nil { - // TODO(bburns) : Perhaps blacklist a container after N failures? - glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err) - continue - } - } else { + if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found { + containerID := DockerID(dockerContainer.ID) glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) - dockerContainer, err := kl.getContainer(containerID) + // TODO: This should probably be separated out into a separate goroutine. healthy, err := kl.healthy(container, dockerContainer) if err != nil { glog.V(1).Infof("health check errored: %v", err) continue } - if healthy != health.Healthy { - glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name) - if err != nil { - glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID) - continue - } - err = kl.killContainer(*dockerContainer) - if err != nil { - glog.V(1).Infof("Failed to kill container %s: %v", containerID, err) - continue - } - containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID)) + if healthy == health.Healthy { + keepChannel <- containerID + continue + } + + glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy) + if err := kl.killContainer(*dockerContainer); err != nil { + glog.V(1).Infof("Failed to kill container %s: %v", containerID, err) + continue } } + + glog.Infof("%+v doesn't exist, creating", container) + if err := kl.DockerPuller.Pull(container.Image); err != nil { + glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name) + continue + } + containerID, err := kl.runContainer(manifest, &container, "container:"+string(netID)) + if err != nil { + // TODO(bburns) : Perhaps blacklist a container after N failures? + glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err) + continue + } keepChannel <- containerID } return nil @@ -663,11 +603,16 @@ type empty struct{} // SyncManifests synchronizes the configured list of containers (desired state) with the host current state. func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { glog.Infof("Desired: %+v", config) - var err error dockerIdsToKeep := map[DockerID]empty{} keepChannel := make(chan DockerID, defaultChanSize) waitGroup := sync.WaitGroup{} + dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) + if err != nil { + glog.Errorf("Error listing containers %#v", dockerContainers) + return err + } + // Check for any containers that need starting for ix := range config { waitGroup.Add(1) @@ -676,7 +621,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { defer waitGroup.Done() // necessary to dereference by index here b/c otherwise the shared value // in the for each is re-used. - err := kl.syncManifest(&config[index], keepChannel) + err := kl.syncManifest(&config[index], dockerContainers, keepChannel) if err != nil { glog.Errorf("Error syncing manifest: %v skipping.", err) } @@ -696,7 +641,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { <-ch // Kill any containers we don't need - existingContainers, err := kl.getDockerContainers() + existingContainers, err := getKubeletDockerContainers(kl.DockerClient) if err != nil { glog.Errorf("Error listing containers: %v", err) return err @@ -704,7 +649,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { for id, container := range existingContainers { if _, ok := dockerIdsToKeep[id]; !ok { glog.Infof("Killing: %s", id) - err = kl.killContainer(container) + err = kl.killContainer(*container) if err != nil { glog.Errorf("Error killing container: %v", err) } @@ -782,67 +727,6 @@ func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHan } } -// getContainerIDFromName looks at the list of containers on the machine and returns the ID of the container whose name -// matches 'name'. It returns the name of the container, or empty string, if the container isn't found. -// it returns true if the container is found, false otherwise, and any error that occurs. -// TODO: This functions exists to support GetContainerInfo and GetContainerStats -// It should be removed once those two functions start taking proper pod.IDs -func (kl *Kubelet) getContainerIDFromName(name string) (DockerID, bool, error) { - containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) - if err != nil { - return "", false, err - } - for _, value := range containerList { - if strings.Contains(value.Names[0], name) { - return DockerID(value.ID), true, nil - } - } - return "", false, nil -} - -// GetPodInfo returns docker info for all containers in the pod/manifest. -func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) { - info := api.PodInfo{} - - containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) - if err != nil { - return nil, err - } - - for _, value := range containerList { - manifestID, containerName := parseDockerName(value.Names[0]) - if manifestID != podID { - continue - } - inspectResult, err := kl.DockerClient.InspectContainer(value.ID) - if err != nil { - return nil, err - } - if inspectResult == nil { - // Why did we not get an error? - info[containerName] = docker.Container{} - } else { - info[containerName] = *inspectResult - } - } - return info, nil -} - -// Returns the docker id corresponding to pod-id-container-name pair. -func (kl *Kubelet) getDockerIDFromPodIDAndContainerName(podID, containerName string) (DockerID, error) { - containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) - if err != nil { - return "", err - } - for _, value := range containerList { - manifestID, cName := parseDockerName(value.Names[0]) - if manifestID == podID && cName == containerName { - return DockerID(value.ID), nil - } - } - return "", errors.New("couldn't find container") -} - func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest { ret := &info.ContainerInfoRequest{ NumStats: req.NumStats, @@ -864,16 +748,25 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai return cinfo, nil } +// GetPodInfo returns information from Docker about the containers in a pod +func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) { + return getDockerPodInfo(kl.DockerClient, manifestID) +} + // GetContainerInfo returns stats (from Cadvisor) for a container. -func (kl *Kubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { +func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { if kl.CadvisorClient == nil { return nil, nil } - dockerID, err := kl.getDockerIDFromPodIDAndContainerName(podID, containerName) - if err != nil || len(dockerID) == 0 { + dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) + if err != nil { return nil, err } - return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", string(dockerID)), req) + dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName) + if !found { + return nil, errors.New("couldn't find container") + } + return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", dockerContainer.ID), req) } // GetMachineStats returns stats (from Cadvisor) of current machine. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index dbfb884a3234e..1777e6fec4c39 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -145,13 +145,7 @@ func TestContainerManifestNaming(t *testing.T) { } func TestGetContainerID(t *testing.T) { - kubelet, _, fakeDocker := makeTestKubelet(t) - manifest := api.ContainerManifest{ - ID: "qux", - } - container := api.Container{ - Name: "foo", - } + _, _, fakeDocker := makeTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{ { ID: "foobar", @@ -166,21 +160,24 @@ func TestGetContainerID(t *testing.T) { ID: "foobar", } - id, err := kubelet.getContainerID(&manifest, &container) - verifyCalls(t, fakeDocker, []string{"list"}) - if id == "" { - t.Errorf("Failed to find container %#v", container) - } + dockerContainers, err := getKubeletDockerContainers(fakeDocker) if err != nil { - t.Errorf("Unexpected error: %#v", err) + t.Errorf("Expected no error, Got %#v", err) + } + if len(dockerContainers) != 2 { + t.Errorf("Expected %#v, Got %#v", fakeDocker.containerList, dockerContainers) + } + verifyCalls(t, fakeDocker, []string{"list"}) + dockerContainer, found := dockerContainers.FindPodContainer("qux", "foo") + if dockerContainer == nil || !found { + t.Errorf("Failed to find container %#v", dockerContainer) } fakeDocker.clearCalls() - missingManifest := api.ContainerManifest{ID: "foobar"} - id, err = kubelet.getContainerID(&missingManifest, &container) - verifyCalls(t, fakeDocker, []string{"list"}) - if id != "" { - t.Errorf("Failed to not find container %#v", missingManifest) + dockerContainer, found = dockerContainers.FindPodContainer("foobar", "foo") + verifyCalls(t, fakeDocker, []string{}) + if dockerContainer != nil || found { + t.Errorf("Should not have found container %#v", dockerContainer) } } @@ -383,7 +380,7 @@ func TestSyncManifestsDoesNothing(t *testing.T) { }, }) expectNoError(t, err) - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "list"}) + verifyCalls(t, fakeDocker, []string{"list", "list"}) } func TestSyncManifestsDeletes(t *testing.T) { @@ -406,7 +403,7 @@ func TestSyncManifestsDeletes(t *testing.T) { } err := kubelet.SyncManifests([]api.ContainerManifest{}) expectNoError(t, err) - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -455,7 +452,7 @@ func TestSyncManifestsUnhealthy(t *testing.T) { }, }}) expectNoError(t, err) - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "stop", "create", "start", "list"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -993,6 +990,9 @@ func TestGetContainerStats(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } + if stats == nil { + t.Fatalf("stats should not be nil") + } if stats.StatsPercentiles.MaxMemoryUsage != containerInfo.StatsPercentiles.MaxMemoryUsage { t.Errorf("wrong max memory usage") }