Skip to content

Commit

Permalink
Refactor pkg/kubelet/kubelet.go: syncPod().
Browse files Browse the repository at this point in the history
Makes the syncPod() takes only the containers that belongs to the pod.
  • Loading branch information
Yifan Gu committed Mar 6, 2015
1 parent ca9d243 commit ed1823e
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 62 deletions.
18 changes: 12 additions & 6 deletions pkg/kubelet/dockertools/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,17 +413,23 @@ func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, co
return nil, false, 0
}

// Note, this might return containers belong to a different Pod instance with the same name
func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)
// RemoveContainerWithID removes the container with the given containerID.
func (c DockerContainers) RemoveContainerWithID(containerID DockerID) {
delete(c, containerID)
}

// FindContainersByPod returns the containers that belong to the pod.
func (c DockerContainers) FindContainersByPod(podUID types.UID, podFullName string) DockerContainers {
containers := make(DockerContainers)

for _, dockerContainer := range c {
if len(dockerContainer.Names) == 0 {
continue
}
dockerManifestID, _, dockerContainerName, _ := ParseDockerName(dockerContainer.Names[0])
if dockerManifestID == podFullName {
containers[dockerContainerName] = dockerContainer
dockerPodName, uuid, _, _ := ParseDockerName(dockerContainer.Names[0])
if podUID == uuid ||
(podUID == "" && podFullName == dockerPodName) {
containers[DockerID(dockerContainer.ID)] = dockerContainer
}
}
return containers
Expand Down
143 changes: 143 additions & 0 deletions pkg/kubelet/dockertools/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,146 @@ func TestGetRunningContainers(t *testing.T) {
}
}
}

func TestFindContainersByPod(t *testing.T) {
tests := []struct {
testContainers DockerContainers
inputPodID types.UID
inputPodFullName string
expectedContainers DockerContainers
}{
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("1234"),
"",
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_2343_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("1234"),
"",
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_2343_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("5678"),
"",
DockerContainers{},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: nil,
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
types.UID("5678"),
"",
DockerContainers{
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_abc_5678_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
"",
"abc",
DockerContainers{
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_abc_5678_42"},
},
},
},
}
for _, test := range tests {
result := test.testContainers.FindContainersByPod(test.inputPodID, test.inputPodFullName)
if !reflect.DeepEqual(result, test.expectedContainers) {
t.Errorf("expected: %v, saw: %v", test.expectedContainers, result)
}
}
}
93 changes: 43 additions & 50 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
kl.killContainerByID(dockerContainer.ID)
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
Expand Down Expand Up @@ -873,25 +873,21 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,

// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0])
return kl.killContainerByID(dockerContainer.ID)
}

func (kl *Kubelet) killContainerByID(ID, name string) error {
glog.V(2).Infof("Killing container with id %q and name %q", ID, name)
func (kl *Kubelet) killContainerByID(ID string) error {
glog.V(2).Infof("Killing container with id %q", ID)
kl.readiness.remove(ID)
err := kl.dockerClient.StopContainer(ID, 10)
if len(name) == 0 {
return err
}

ref, ok := kl.getRef(dockertools.DockerID(ID))
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", ID, name)
glog.Warningf("No ref for pod '%v'", ID)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
kl.recorder.Eventf(ref, "killing", "Killing %v - %v", ID, name)
kl.recorder.Eventf(ref, "killing", "Killing %v", ID)
}

return err
}

Expand Down Expand Up @@ -1007,36 +1003,42 @@ func (kl *Kubelet) killContainersInPod(pod *api.BoundPod, dockerContainers docke

type empty struct{}

func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error {
// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *api.BoundPod) error {
uid := pod.UID
if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
return nil
}

func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID
containersToKeep := make(map[dockertools.DockerID]empty)
killedContainers := make(map[dockertools.DockerID]empty)
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)

ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}

// Make data dirs.
if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
if err = kl.makePodDataDirs(pod); err != nil {
return err
}

// Make sure we have a pod infra container
var podInfraContainerID dockertools.DockerID
if podInfraDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
if podInfraDockerContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
podInfraContainerID = dockertools.DockerID(podInfraDockerContainer.ID)
} else {
glog.V(2).Infof("Pod infra container doesn't exist for pod %q, killing and re-creating the pod", podFullName)
count, err := kl.killContainersInPod(pod, dockerContainers)
count, err := kl.killContainersInPod(pod, containersInPod)
if err != nil {
return err
}
Expand All @@ -1047,14 +1049,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
}
if count > 0 {
// Re-list everything, otherwise we'll think we're ok.
dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
containersInPod, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
glog.Errorf("Error listing containers %#v", containersInPod)
return err
}
}
}
containersToKeep[podInfraContainerID] = empty{}
containersInPod.RemoveContainerWithID(podInfraContainerID)

podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
Expand All @@ -1074,7 +1076,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
for _, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
dockerContainerName := dockertools.BuildDockerName(uid, podFullName, &container)
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uid, container.Name); found {
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)

Expand All @@ -1092,7 +1094,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
}
if err != nil {
glog.V(1).Infof("liveness/readiness probe errored: %v", err)
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
continue
}
if ready == probe.Success {
Expand All @@ -1101,7 +1103,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
kl.readiness.set(dockerContainer.ID, false)
}
if live == probe.Success {
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
continue
}
ref, ok := kl.getRef(containerID)
Expand All @@ -1118,16 +1120,15 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.V(1).Infof("Failed to kill container %q: %v", dockerContainer.ID, err)
continue
}
killedContainers[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)

if podChanged {
// Also kill associated pod infra container if the pod changed.
if podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
if err := kl.killContainer(podInfraContainer); err != nil {
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
continue
}
if err := kl.killContainerByID(string(podInfraContainerID)); err != nil {
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainerID, err)
continue
}
containersInPod.RemoveContainerWithID(containerID)
}
}

Expand Down Expand Up @@ -1187,23 +1188,15 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
continue
}
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
}

// Kill any containers in this pod which were not identified above (guards against duplicates).
for id, container := range dockerContainers {
curPodFullName, curUUID, _, _ := dockertools.ParseDockerName(container.Names[0])
if curPodFullName == podFullName && curUUID == uid {
// Don't kill containers we want to keep or those we already killed.
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", curUUID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}
// Kill any remaining containers in this pod which were not identified above (guards against duplicates).
for _, container := range containersInPod {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", pod.UID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}

Expand Down
5 changes: 0 additions & 5 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,11 +878,6 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
Names: []string{"/k8s_foo_bar.new.test_12345678_3333"},
ID: "4567",
},
"2304": &docker.APIContainers{
// Container for another pod, untouched.
Names: []string{"/k8s_baz_fiz.new.test_6_42"},
ID: "2304",
},
}
bound := api.BoundPod{
ObjectMeta: api.ObjectMeta{
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
glog.Errorf("Error listing containers while syncing pod: %v", err)
return
}
err = p.syncPodFn(newWork.pod, containers)

err = p.syncPodFn(newWork.pod, containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
Expand Down

0 comments on commit ed1823e

Please sign in to comment.