Skip to content

Commit

Permalink
Make container lookup in the Kubelet cleaner
Browse files Browse the repository at this point in the history
Reduce duplicate calls to list lookups for parallel go routines.
  • Loading branch information
smarterclayton committed Jul 16, 2014
1 parent 8ddc339 commit 185a97b
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 182 deletions.
72 changes: 72 additions & 0 deletions pkg/kubelet/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,78 @@ func (p dockerPuller) Pull(image string) error {
return p.client.PullImage(opts, docker.AuthConfiguration{})
}

// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers

func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == manifestID && dockerContainerName == containerName {
return dockerContainer, true
}
}
return nil, false
}

func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)

for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == manifestID {
containers[dockerContainerName] = dockerContainer
}
}
return containers
}

// GetKubeletDockerContainers returns a map of docker containers that we manage. The map key is the docker container ID
func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error) {
result := make(DockerContainers)
containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}
for i := range containers {
container := &containers[i]
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
continue
}
result[DockerID(container.ID)] = container
}
return result, nil
}

// GetDockerPodInfo returns docker info for all containers in the pod/manifest.
func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
info := api.PodInfo{}

containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}

for _, value := range containers {
dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
if dockerManifestID != manifestID {
continue
}
inspectResult, err := client.InspectContainer(value.ID)
if err != nil {
return nil, err
}
if inspectResult == nil {
// Why did we not get an error?
info[dockerContainerName] = docker.Container{}
} else {
info[dockerContainerName] = *inspectResult
}
}
return info, nil
}

// Converts "-" to "_-_" and "_" to "___" so that we can use "--" to meaningfully separate parts of a docker name.
func escapeDash(in string) (out string) {
out = strings.Replace(in, "_", "___", -1)
Expand Down
215 changes: 54 additions & 161 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,52 +170,6 @@ func (kl *Kubelet) LogEvent(event *api.Event) error {
return err
}

// Return a map of docker containers that we manage. The map key is the docker container ID
func (kl *Kubelet) getDockerContainers() (map[DockerID]docker.APIContainers, error) {
result := map[DockerID]docker.APIContainers{}
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}
for _, value := range containerList {
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
if !strings.HasPrefix(value.Names[0], "/"+containerNamePrefix+"--") {
continue
}
result[DockerID(value.ID)] = value
}
return result, nil
}

// Return Docker's container ID for a manifest's container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *api.Container) (DockerID, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return "", err
}
for id, dockerContainer := range dockerContainers {
manifestID, containerName := parseDockerName(dockerContainer.Names[0])
if manifestID == manifest.ID && containerName == container.Name {
return DockerID(id), nil
}
}
return "", nil
}

func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return nil, err
}
for dockerID, dockerContainer := range dockerContainers {
if dockerID == ID {
return &dockerContainer, nil
}
}
return nil, nil
}

func makeEnvironmentVariables(container *api.Container) []string {
var result []string
for _, value := range container.Env {
Expand Down Expand Up @@ -571,11 +525,6 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel c

const networkContainerName = "net"

// Return the docker ID for a manifest's network container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getNetworkContainerID(manifest *api.ContainerManifest) (DockerID, error) {
return kl.getContainerID(manifest, &api.Container{Name: networkContainerName})
}

// Create a network container for a manifest. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
var ports []api.Port
Expand All @@ -594,65 +543,56 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
return kl.runContainer(manifest, container, "")
}

func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerID) error {
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
// Make sure we have a network container
netID, err := kl.getNetworkContainerID(manifest)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
return err
}
if netID == "" {
glog.Infof("Network container doesn't exist, creating")
netID, err = kl.createNetworkContainer(manifest)
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
} else {
dockerNetworkID, err := kl.createNetworkContainer(manifest)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
return err
}
netID = dockerNetworkID
}
keepChannel <- netID

for _, container := range manifest.Containers {
containerID, err := kl.getContainerID(manifest, &container)
if err != nil {
glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
if containerID == "" {
glog.Infof("%+v doesn't exist, creating", container)
kl.DockerPuller.Pull(container.Image)
if err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
continue
}
} else {
if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found {
containerID := DockerID(dockerContainer.ID)
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
dockerContainer, err := kl.getContainer(containerID)

// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
}
if healthy != health.Healthy {
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
if err != nil {
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
continue
}
err = kl.killContainer(*dockerContainer)
if err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
if healthy == health.Healthy {
keepChannel <- containerID
continue
}

glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy)
if err := kl.killContainer(*dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
}

glog.Infof("%+v doesn't exist, creating", container)
if err := kl.DockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
containerID, err := kl.runContainer(manifest, &container, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
continue
}
keepChannel <- containerID
}
return nil
Expand All @@ -663,11 +603,16 @@ type empty struct{}
// SyncManifests synchronizes the configured list of containers (desired state) with the host current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config)
var err error
dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID, defaultChanSize)
waitGroup := sync.WaitGroup{}

dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
return err
}

// Check for any containers that need starting
for ix := range config {
waitGroup.Add(1)
Expand All @@ -676,7 +621,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
defer waitGroup.Done()
// necessary to dereference by index here b/c otherwise the shared value
// in the for each is re-used.
err := kl.syncManifest(&config[index], keepChannel)
err := kl.syncManifest(&config[index], dockerContainers, keepChannel)
if err != nil {
glog.Errorf("Error syncing manifest: %v skipping.", err)
}
Expand All @@ -696,15 +641,15 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
<-ch

// Kill any containers we don't need
existingContainers, err := kl.getDockerContainers()
existingContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
glog.Errorf("Error listing containers: %v", err)
return err
}
for id, container := range existingContainers {
if _, ok := dockerIdsToKeep[id]; !ok {
glog.Infof("Killing: %s", id)
err = kl.killContainer(container)
err = kl.killContainer(*container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
Expand Down Expand Up @@ -782,67 +727,6 @@ func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHan
}
}

// getContainerIDFromName looks at the list of containers on the machine and returns the ID of the container whose name
// matches 'name'. It returns the name of the container, or empty string, if the container isn't found.
// it returns true if the container is found, false otherwise, and any error that occurs.
// TODO: This functions exists to support GetContainerInfo and GetContainerStats
// It should be removed once those two functions start taking proper pod.IDs
func (kl *Kubelet) getContainerIDFromName(name string) (DockerID, bool, error) {
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return "", false, err
}
for _, value := range containerList {
if strings.Contains(value.Names[0], name) {
return DockerID(value.ID), true, nil
}
}
return "", false, nil
}

// GetPodInfo returns docker info for all containers in the pod/manifest.
func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) {
info := api.PodInfo{}

containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}

for _, value := range containerList {
manifestID, containerName := parseDockerName(value.Names[0])
if manifestID != podID {
continue
}
inspectResult, err := kl.DockerClient.InspectContainer(value.ID)
if err != nil {
return nil, err
}
if inspectResult == nil {
// Why did we not get an error?
info[containerName] = docker.Container{}
} else {
info[containerName] = *inspectResult
}
}
return info, nil
}

// Returns the docker id corresponding to pod-id-container-name pair.
func (kl *Kubelet) getDockerIDFromPodIDAndContainerName(podID, containerName string) (DockerID, error) {
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return "", err
}
for _, value := range containerList {
manifestID, cName := parseDockerName(value.Names[0])
if manifestID == podID && cName == containerName {
return DockerID(value.ID), nil
}
}
return "", errors.New("couldn't find container")
}

func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest {
ret := &info.ContainerInfoRequest{
NumStats: req.NumStats,
Expand All @@ -864,16 +748,25 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
return cinfo, nil
}

// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, manifestID)
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if kl.CadvisorClient == nil {
return nil, nil
}
dockerID, err := kl.getDockerIDFromPodIDAndContainerName(podID, containerName)
if err != nil || len(dockerID) == 0 {
dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
return nil, err
}
return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", string(dockerID)), req)
dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName)
if !found {
return nil, errors.New("couldn't find container")
}
return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", dockerContainer.ID), req)
}

// GetMachineStats returns stats (from Cadvisor) of current machine.
Expand Down
Loading

0 comments on commit 185a97b

Please sign in to comment.