Skip to content

Commit

Permalink
Merge pull request kubernetes#469 from smarterclayton/unify_container…
Browse files Browse the repository at this point in the history
…_lookup

Make container lookup in the Kubelet cleaner
  • Loading branch information
lavalamp committed Jul 16, 2014
2 parents 0e1636e + 185a97b commit 7127eef
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 182 deletions.
72 changes: 72 additions & 0 deletions pkg/kubelet/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,78 @@ func (p dockerPuller) Pull(image string) error {
return p.client.PullImage(opts, docker.AuthConfiguration{})
}

// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers

func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == manifestID && dockerContainerName == containerName {
return dockerContainer, true
}
}
return nil, false
}

func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)

for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
if dockerManifestID == manifestID {
containers[dockerContainerName] = dockerContainer
}
}
return containers
}

// GetKubeletDockerContainers returns a map of docker containers that we manage. The map key is the docker container ID
func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error) {
result := make(DockerContainers)
containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}
for i := range containers {
container := &containers[i]
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
continue
}
result[DockerID(container.ID)] = container
}
return result, nil
}

// GetDockerPodInfo returns docker info for all containers in the pod/manifest.
func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
info := api.PodInfo{}

containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}

for _, value := range containers {
dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
if dockerManifestID != manifestID {
continue
}
inspectResult, err := client.InspectContainer(value.ID)
if err != nil {
return nil, err
}
if inspectResult == nil {
// Why did we not get an error?
info[dockerContainerName] = docker.Container{}
} else {
info[dockerContainerName] = *inspectResult
}
}
return info, nil
}

// Converts "-" to "_-_" and "_" to "___" so that we can use "--" to meaningfully separate parts of a docker name.
func escapeDash(in string) (out string) {
out = strings.Replace(in, "_", "___", -1)
Expand Down
215 changes: 54 additions & 161 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,52 +170,6 @@ func (kl *Kubelet) LogEvent(event *api.Event) error {
return err
}

// Return a map of docker containers that we manage. The map key is the docker container ID
func (kl *Kubelet) getDockerContainers() (map[DockerID]docker.APIContainers, error) {
result := map[DockerID]docker.APIContainers{}
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}
for _, value := range containerList {
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
if !strings.HasPrefix(value.Names[0], "/"+containerNamePrefix+"--") {
continue
}
result[DockerID(value.ID)] = value
}
return result, nil
}

// Return Docker's container ID for a manifest's container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *api.Container) (DockerID, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return "", err
}
for id, dockerContainer := range dockerContainers {
manifestID, containerName := parseDockerName(dockerContainer.Names[0])
if manifestID == manifest.ID && containerName == container.Name {
return DockerID(id), nil
}
}
return "", nil
}

func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
dockerContainers, err := kl.getDockerContainers()
if err != nil {
return nil, err
}
for dockerID, dockerContainer := range dockerContainers {
if dockerID == ID {
return &dockerContainer, nil
}
}
return nil, nil
}

func makeEnvironmentVariables(container *api.Container) []string {
var result []string
for _, value := range container.Env {
Expand Down Expand Up @@ -571,11 +525,6 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel c

const networkContainerName = "net"

// Return the docker ID for a manifest's network container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getNetworkContainerID(manifest *api.ContainerManifest) (DockerID, error) {
return kl.getContainerID(manifest, &api.Container{Name: networkContainerName})
}

// Create a network container for a manifest. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
var ports []api.Port
Expand All @@ -594,65 +543,56 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
return kl.runContainer(manifest, container, "")
}

func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerID) error {
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
// Make sure we have a network container
netID, err := kl.getNetworkContainerID(manifest)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
return err
}
if netID == "" {
glog.Infof("Network container doesn't exist, creating")
netID, err = kl.createNetworkContainer(manifest)
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
} else {
dockerNetworkID, err := kl.createNetworkContainer(manifest)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
return err
}
netID = dockerNetworkID
}
keepChannel <- netID

for _, container := range manifest.Containers {
containerID, err := kl.getContainerID(manifest, &container)
if err != nil {
glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
if containerID == "" {
glog.Infof("%+v doesn't exist, creating", container)
kl.DockerPuller.Pull(container.Image)
if err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
continue
}
} else {
if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found {
containerID := DockerID(dockerContainer.ID)
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
dockerContainer, err := kl.getContainer(containerID)

// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
continue
}
if healthy != health.Healthy {
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
if err != nil {
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
continue
}
err = kl.killContainer(*dockerContainer)
if err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
if healthy == health.Healthy {
keepChannel <- containerID
continue
}

glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy)
if err := kl.killContainer(*dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
continue
}
}

glog.Infof("%+v doesn't exist, creating", container)
if err := kl.DockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
continue
}
containerID, err := kl.runContainer(manifest, &container, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
continue
}
keepChannel <- containerID
}
return nil
Expand All @@ -663,11 +603,16 @@ type empty struct{}
// SyncManifests synchronizes the configured list of containers (desired state) with the host current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config)
var err error
dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID, defaultChanSize)
waitGroup := sync.WaitGroup{}

dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
return err
}

// Check for any containers that need starting
for ix := range config {
waitGroup.Add(1)
Expand All @@ -676,7 +621,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
defer waitGroup.Done()
// necessary to dereference by index here b/c otherwise the shared value
// in the for each is re-used.
err := kl.syncManifest(&config[index], keepChannel)
err := kl.syncManifest(&config[index], dockerContainers, keepChannel)
if err != nil {
glog.Errorf("Error syncing manifest: %v skipping.", err)
}
Expand All @@ -696,15 +641,15 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
<-ch

// Kill any containers we don't need
existingContainers, err := kl.getDockerContainers()
existingContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
glog.Errorf("Error listing containers: %v", err)
return err
}
for id, container := range existingContainers {
if _, ok := dockerIdsToKeep[id]; !ok {
glog.Infof("Killing: %s", id)
err = kl.killContainer(container)
err = kl.killContainer(*container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
Expand Down Expand Up @@ -782,67 +727,6 @@ func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHan
}
}

// getContainerIDFromName looks at the list of containers on the machine and returns the ID of the container whose name
// matches 'name'. It returns the name of the container, or empty string, if the container isn't found.
// it returns true if the container is found, false otherwise, and any error that occurs.
// TODO: This functions exists to support GetContainerInfo and GetContainerStats
// It should be removed once those two functions start taking proper pod.IDs
func (kl *Kubelet) getContainerIDFromName(name string) (DockerID, bool, error) {
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return "", false, err
}
for _, value := range containerList {
if strings.Contains(value.Names[0], name) {
return DockerID(value.ID), true, nil
}
}
return "", false, nil
}

// GetPodInfo returns docker info for all containers in the pod/manifest.
func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) {
info := api.PodInfo{}

containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}

for _, value := range containerList {
manifestID, containerName := parseDockerName(value.Names[0])
if manifestID != podID {
continue
}
inspectResult, err := kl.DockerClient.InspectContainer(value.ID)
if err != nil {
return nil, err
}
if inspectResult == nil {
// Why did we not get an error?
info[containerName] = docker.Container{}
} else {
info[containerName] = *inspectResult
}
}
return info, nil
}

// Returns the docker id corresponding to pod-id-container-name pair.
func (kl *Kubelet) getDockerIDFromPodIDAndContainerName(podID, containerName string) (DockerID, error) {
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
if err != nil {
return "", err
}
for _, value := range containerList {
manifestID, cName := parseDockerName(value.Names[0])
if manifestID == podID && cName == containerName {
return DockerID(value.ID), nil
}
}
return "", errors.New("couldn't find container")
}

func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest {
ret := &info.ContainerInfoRequest{
NumStats: req.NumStats,
Expand All @@ -864,16 +748,25 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
return cinfo, nil
}

// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, manifestID)
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if kl.CadvisorClient == nil {
return nil, nil
}
dockerID, err := kl.getDockerIDFromPodIDAndContainerName(podID, containerName)
if err != nil || len(dockerID) == 0 {
dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
if err != nil {
return nil, err
}
return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", string(dockerID)), req)
dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName)
if !found {
return nil, errors.New("couldn't find container")
}
return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", dockerContainer.ID), req)
}

// GetMachineStats returns stats (from Cadvisor) of current machine.
Expand Down
Loading

0 comments on commit 7127eef

Please sign in to comment.