From 3489d1ae01c3a030068f1970f5ffa8aa6dc6c559 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Tue, 10 Mar 2015 15:09:55 +0100
Subject: [PATCH] Refactor kubelet syncPod method

---
 pkg/kubelet/kubelet.go      | 237 ++++++++++++++++++++++++------------
 pkg/kubelet/kubelet_test.go |  23 ++--
 2 files changed, 173 insertions(+), 87 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index e15ba406dbdc4..a5c413d404701 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1183,122 +1183,199 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.BoundPod, container *api.Co
 	return containerID, nil
 }
 
-func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
+// Structure keeping information on changes that need to happen for a pod. The semantics are as follows:
+// - startInfraContainer is true if a new Infra Container has to be started and the old one (if running) killed.
+//   Additionally, if it is true then containersToKeep has to be empty.
+// - infraContainerId has to be set iff startInfraContainer is false. It stores the dockerID of the running Infra Container.
+// - containersToStart keeps the indices of the Specs of containers that have to be started.
+// - containersToKeep stores a mapping from dockerIDs of running containers to the indices of their Specs for containers that
+//   should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
+//   It shouldn't be the case that containersToStart is empty and containersToKeep contains only infraContainerId; in such a case
+//   the Infra Container should be killed, hence it is removed from this map.
+// - all running containers which are NOT contained in containersToKeep should be killed.
+type podContainerChangesSpec struct {
+	startInfraContainer bool
+	infraContainerId    dockertools.DockerID
+	containersToStart   map[int]empty
+	containersToKeep    map[dockertools.DockerID]int
+}
+
+func (kl *Kubelet) computePodContainerChanges(pod *api.BoundPod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
 	podFullName := GetPodFullName(pod)
 	uid := pod.UID
-	glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)
+	glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
 	err := kl.makePodDataDirs(pod)
 	if err != nil {
-		return err
+		return podContainerChangesSpec{}, err
 	}
+	containersToStart := make(map[int]empty)
+	containersToKeep := make(map[dockertools.DockerID]int)
+	createPodInfraContainer := false
 	var podStatus api.PodStatus
 	podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
-	if !found {
-		glog.V(2).Infof("Pod infra container doesn't exist for pod %q, killing and re-creating the pod", podFullName)
-		var count int
-		count, err = kl.killContainersInPod(pod, containersInPod)
-		if err != nil {
-			return err
-		}
-		podInfraContainerID, err = kl.createPodInfraContainer(pod)
-		if err != nil {
-			glog.Errorf("Failed to introspect pod infra container: %v; Skipping pod %q", err, podFullName)
-			return err
-		}
-		if count > 0 {
-			// Re-list everything, otherwise we'll think we're ok.
-			containersInPod, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
-			if err != nil {
-				glog.Errorf("Error listing containers %#v", containersInPod)
-				return err
-			}
-		}
+	if found {
+		glog.V(4).Infof("Found infra pod for %q", podFullName)
+		containersToKeep[podInfraContainerID] = -1
 		podStatus, err = kl.GetPodStatus(podFullName, uid)
 		if err != nil {
 			glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 		}
 	} else {
-		podStatus, err = kl.GetPodStatus(podFullName, uid)
-		if err != nil {
-			glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
-		}
+		glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName)
+		createPodInfraContainer = true
 	}
-	containersInPod.RemoveContainerWithID(podInfraContainerID)
-	ref, err := api.GetReference(pod)
-	if err != nil {
-		glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
-	}
-
-	podVolumes, err := kl.mountExternalVolumes(pod)
-	if err != nil {
-		if ref != nil {
-			kl.recorder.Eventf(ref, "failedMount",
-				"Unable to mount volumes for pod %q: %v", podFullName, err)
-		}
-		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
-		return err
-	}
-
-	for _, container := range pod.Spec.Containers {
+	for index, container := range pod.Spec.Containers {
 		expectedHash := dockertools.HashContainer(&container)
-		dockerContainerName := dockertools.BuildDockerName(uid, podFullName, &container)
 		if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
 			containerID := dockertools.DockerID(dockerContainer.ID)
 			glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
-			// look for changes in the container.
-			containerChanged := hash != 0 && hash != expectedHash
-			if !containerChanged {
-				result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
-				if err != nil {
-					containersInPod.RemoveContainerWithID(containerID)
-					continue
-				}
-				if result == probe.Success {
-					containersInPod.RemoveContainerWithID(containerID)
-					continue
+			if !createPodInfraContainer {
+				// look for changes in the container.
+				containerChanged := hash != 0 && hash != expectedHash
+				if !containerChanged {
+					result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
+					if err != nil {
+						// TODO(vmarmol): examine this logic.
+						glog.Infof("probe no-error: %s", container.Name)
+						containersToKeep[containerID] = index
+						continue
+					}
+					if result == probe.Success {
+						glog.Infof("probe success: %s", container.Name)
+						containersToKeep[containerID] = index
+						continue
+					}
+					glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
+					containersToStart[index] = empty{}
+				} else {
+					glog.Infof("pod %q container %q hash changed (%d vs %d). Pod will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
+					createPodInfraContainer = true
+					delete(containersToKeep, podInfraContainerID)
+					// If we are to restart Infra Container then we move containersToKeep into containersToStart
+					// if RestartPolicy allows restarting failed containers.
+					if pod.Spec.RestartPolicy.Never == nil {
+						for _, v := range containersToKeep {
+							containersToStart[v] = empty{}
+						}
+					}
+					containersToStart[index] = empty{}
+					containersToKeep = make(map[dockertools.DockerID]int)
-				glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
-			} else {
-				glog.Infof("pod %q container %q hash changed (%d vs %d). Container will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
-				// Also kill associated pod infra container if the container changed.
-				if err := kl.killContainerByID(string(podInfraContainerID)); err != nil {
-					glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainerID, err)
-					continue
+			} else { // createPodInfraContainer == true and Container exists
+				// If we're creating the infra container, everything will be killed anyway.
+				// If RestartPolicy is Always or OnFailure we restart containers that were running before we
+				// killed them when restarting the Infra Container.
+				if pod.Spec.RestartPolicy.Never == nil {
+					glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
+					containersToStart[index] = empty{}
 				}
-				containersInPod.RemoveContainerWithID(containerID)
-			}
-			containersInPod.RemoveContainerWithID(containerID)
-			if err := kl.killContainer(dockerContainer); err != nil {
-				glog.V(1).Infof("Failed to kill container %q: %v", dockerContainer.ID, err)
 				continue
 			}
+		} else {
+			if kl.shouldContainerBeRestarted(&container, pod) {
+				// If we are here it means that the container is dead and should be restarted, or never existed and should
+				// be created. We may be inserting this ID again if the container has changed and it has
+				// RestartPolicy::Always, but it's not a big deal.
+				glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
+				containersToStart[index] = empty{}
+			}
 		}
+	}
-		if !kl.shouldContainerBeRestarted(&container, pod) {
-			continue
-		}
+	// After the loop one of the following should be true:
+	// - createPodInfraContainer is true and containersToKeep is empty
+	// - createPodInfraContainer is false and containersToKeep contains at least the ID of the Infra Container
-		glog.V(3).Infof("Container with name %s doesn't exist, creating", dockerContainerName)
+	// If the Infra Container is the last running one, we don't want to keep it.
+	if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
+		containersToKeep = make(map[dockertools.DockerID]int)
+	}
-		containerID, err := kl.pullImageAndRunContainer(pod, &container, &podVolumes, podInfraContainerID)
-		if err == nil {
-			containersInPod.RemoveContainerWithID(containerID)
+	return podContainerChangesSpec{
+		startInfraContainer: createPodInfraContainer,
+		infraContainerId:    podInfraContainerID,
+		containersToStart:   containersToStart,
+		containersToKeep:    containersToKeep,
+	}, nil
+}
+
+func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
+	podFullName := GetPodFullName(pod)
+	uid := pod.UID
+	containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
+	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
+	if err != nil {
+		return err
+	}
+
+	if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) {
+		if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 {
+			glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
+		} else {
+			glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
+		}
+		// Killing phase: if we want to start a new infra container, or nothing is running, kill everything (including the infra container)
+		if podInfraContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
+			if err := kl.killContainer(podInfraContainer); err != nil {
+				glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
+			}
+		}
+		_, err = kl.killContainersInPod(pod, containersInPod)
+		if err != nil {
+			return err
+		}
+	} else {
+		// Otherwise kill any containers in this pod which are not specified as ones to keep.
+		for id, container := range containersInPod {
+			_, keep := containerChanges.containersToKeep[id]
+			if !keep {
+				glog.V(3).Infof("Killing unwanted container %+v", container)
+				err = kl.killContainer(container)
+				if err != nil {
+					glog.Errorf("Error killing container: %v", err)
+				}
+			}
 		}
 	}
-	// Kill any remaining containers in this pod which were not identified above (guards against duplicates).
-	for _, container := range containersInPod {
-		glog.V(1).Infof("Killing unwanted container in pod %q: %+v", pod.UID, container)
-		err = kl.killContainer(container)
+	// Starting phase: if we should create infra container then we do it first
+	var ref *api.ObjectReference
+	var podVolumes volumeMap
+	podInfraContainerID := containerChanges.infraContainerId
+	if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
+		ref, err = api.GetReference(pod)
 		if err != nil {
-			glog.Errorf("Error killing container: %v", err)
+			glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
+		}
+		glog.Infof("Creating pod infra container for %q", podFullName)
+		podInfraContainerID, err = kl.createPodInfraContainer(pod)
+		if err != nil {
+			glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
+			return err
 		}
 	}
+	// Mount volumes
+	podVolumes, err = kl.mountExternalVolumes(pod)
+	if err != nil {
+		if ref != nil {
+			kl.recorder.Eventf(ref, "failedMount",
+				"Unable to mount volumes for pod %q: %v", podFullName, err)
+		}
+		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
+		return err
+	}
+
+	// Start everything
+	for container := range containerChanges.containersToStart {
+		glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
+		kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
+	}
+
 	return nil
 }
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 63dda32a03f43..293cb34d4503b 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -459,7 +459,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 	}
 	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
+		"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 
 	fakeDocker.Lock()
 	parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@@ -506,7 +506,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
 	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
+		"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 
 	fakeDocker.Lock()
@@ -556,7 +556,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
+		"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 
 	fakeDocker.Lock()
@@ -701,7 +701,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
 	waitGroup.Wait()
 	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
+		"list", "list", "stop", "create", "start", "inspect_container", "create", "start"})
 
 	// A map iteration is used to delete containers, so must not depend on
 	// order here.
@@ -873,7 +873,7 @@ func TestSyncPodBadHash(t *testing.T) {
 	}
 
 	//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
-	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list", "create", "start"})
+	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})
 
 	// A map interation is used to delete containers, so must not depend on
 	// order here.
@@ -924,7 +924,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
 		t.Errorf("unexpected error: %v", err)
 	}
 
-	verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start"})
+	verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
 
 	// A map interation is used to delete containers, so must not depend on
 	// order here.
@@ -1987,7 +1987,16 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
 	fakeDocker.Lock()
 
-	if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", "pull_always_image", "pull_if_not_present_image"}) {
+	pulledImageSet := make(map[string]empty)
+	for v := range puller.ImagesPulled {
+		pulledImageSet[puller.ImagesPulled[v]] = empty{}
+	}
+
+	if !reflect.DeepEqual(pulledImageSet, map[string]empty{
+		"custom_image_name":          {},
+		"pull_always_image":          {},
+		"pull_if_not_present_image":  {},
+	}) {
 		t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
 	}
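
Illustrative sketch (not part of the patch): the short, self-contained Go program below mirrors the decision rules described in the podContainerChangesSpec comment above: when to recreate the Infra Container, which containers end up in containersToStart, and why a lone Infra Container is not kept. All names and types here (containerState, changesSpec, computeChanges, the policyNever flag) are simplified stand-ins invented for illustration, not the kubelet's actual api or dockertools types.

// Illustrative stand-ins only; not the kubelet's real types.
package main

import "fmt"

type containerState struct {
	name     string
	running  bool   // a container for this spec entry is currently running
	changed  bool   // the running container's hash differs from its spec
	healthy  bool   // the liveness probe succeeded
	dockerID string // ID of the running container, if any
}

type changesSpec struct {
	startInfraContainer bool
	containersToStart   map[int]struct{}
	containersToKeep    map[string]int // dockerID -> spec index; the infra container maps to -1
}

// computeChanges mirrors the rules documented for podContainerChangesSpec:
// recreate everything if the Infra Container is missing or a container's spec changed,
// restart dead or unhealthy containers, and never keep the Infra Container alone.
func computeChanges(infraRunning bool, infraID string, policyNever bool, containers []containerState) changesSpec {
	start := map[int]struct{}{}
	keep := map[string]int{}
	createInfra := !infraRunning
	if infraRunning {
		keep[infraID] = -1
	}
	for i, c := range containers {
		switch {
		case !c.running:
			start[i] = struct{}{} // dead or never created: schedule a start
		case c.changed:
			createInfra = true // spec changed: the whole pod, including infra, is recreated
			start[i] = struct{}{}
		case !c.healthy:
			start[i] = struct{}{} // unhealthy: kill and re-create just this container
		default:
			keep[c.dockerID] = i
		}
	}
	if createInfra {
		if !policyNever {
			// Containers we would have kept must be restarted once the Infra Container is recreated.
			for _, idx := range keep {
				if idx >= 0 {
					start[idx] = struct{}{}
				}
			}
		}
		keep = map[string]int{} // containersToKeep has to be empty when startInfraContainer is true
	}
	// If the Infra Container would be the only survivor, do not keep it.
	if !createInfra && len(start) == 0 && len(keep) == 1 {
		keep = map[string]int{}
	}
	return changesSpec{startInfraContainer: createInfra, containersToStart: start, containersToKeep: keep}
}

func main() {
	spec := computeChanges(true, "infra123", false, []containerState{
		{name: "app", running: true, healthy: true, dockerID: "abc"},
		{name: "sidecar", running: false},
	})
	fmt.Printf("%+v\n", spec) // keeps "app" and the infra container, schedules only "sidecar" for a start
}

For the example in main, the result keeps the running healthy container and the Infra Container in containersToKeep and puts only the dead sidecar into containersToStart, which is the shape of decision the refactored syncPod then acts on.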