From 0d0fb5f07b113c4ca382631dbf7165663c56a003 Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Thu, 19 Mar 2015 16:51:34 -0700
Subject: [PATCH] Kubelet: per-pod workers should avoid grabbing the pod array
 lock

The per-pod worker syncs the pod and container status, and writes the pod
status to the pod status cache. Given that the worker already owns a copy of
the pod, it can bypass the additional pod lookup step entirely. This change
adds a new generatePodStatusByPod() method to achieve this.

In general, per-pod workers should avoid accessing the internal pod array
entirely, as doing so may lead to high lock contention.

This change also updates the return type of GetPodByFullName() from
*api.PodSpec to *api.Pod so that it matches the function name, and
consolidates GetPodByFullName() and GetPodByName().
---
 pkg/kubelet/kubelet.go      | 61 ++++++++++++++++++++-----------
 pkg/kubelet/runonce_test.go |  1 +
 2 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 32b251df6e135..64366317708ba 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1245,12 +1245,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
 		result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
 		if err != nil {
 			// TODO(vmarmol): examine this logic.
-			glog.Infof("probe no-error: %s", container.Name)
+			glog.Infof("probe no-error: %q", container.Name)
 			containersToKeep[containerID] = index
 			continue
 		}
 		if result == probe.Success {
-			glog.Infof("probe success: %s", container.Name)
+			glog.Infof("probe success: %q", container.Name)
 			containersToKeep[containerID] = index
 			continue
 		}
@@ -1314,7 +1314,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
 
 	// Before returning, regenerate status and store it in the cache.
 	defer func() {
-		status, err := kl.generatePodStatus(podFullName, uid)
+		status, err := kl.generatePodStatusByPod(pod)
 		if err != nil {
 			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 		} else {
@@ -1467,7 +1467,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
 		if _, ok := desiredVolumes[name]; !ok {
 			parts := strings.Split(name, "/")
 			if runningSet.Has(parts[0]) {
-				glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0])
+				glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
 				continue
 			}
 			//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
@@ -1767,9 +1767,9 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
 		}
 		_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
 		if err != nil {
-			glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod)
+			glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
 		} else {
-			glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
+			glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
 		}
 	}
 	t.Stop()
@@ -1887,7 +1887,16 @@ func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
 	return append([]api.Pod{}, kl.pods...), kl.mirrorPods
 }
 
-// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
+func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
+	name, namespace, err := ParsePodFullName(podFullName)
+	if err != nil {
+		return nil, false
+	}
+	return kl.GetPodByName(namespace, name)
+}
+
+// GetPodByName provides the first pod that matches namespace and name, as well
+// as whether the pod was found.
 func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
 	kl.podLock.RLock()
 	defer kl.podLock.RUnlock()
@@ -1917,10 +1926,10 @@ func (kl *Kubelet) updateNodeStatus() error {
 func (kl *Kubelet) tryUpdateNodeStatus() error {
 	node, err := kl.kubeClient.Nodes().Get(kl.hostname)
 	if err != nil {
-		return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
+		return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
 	}
 	if node == nil {
-		return fmt.Errorf("no node instance returned for %v", kl.hostname)
+		return fmt.Errorf("no node instance returned for %q", kl.hostname)
 	}
 
 	// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
@@ -2042,41 +2051,37 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
 	return ready
 }
 
-func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
-	kl.podLock.RLock()
-	defer kl.podLock.RUnlock()
-	for _, pod := range kl.pods {
-		if GetPodFullName(&pod) == podFullName {
-			return &pod.Spec, true
-		}
-	}
-	return nil, false
-}
-
 // GetPodStatus returns information from Docker about the containers in a pod
 func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 	// Check to see if we have a cached version of the status.
 	cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
 	if found {
-		glog.V(3).Infof("Returning cached status for %s", podFullName)
+		glog.V(3).Infof("Returning cached status for %q", podFullName)
 		return cachedPodStatus, nil
 	}
 	return kl.generatePodStatus(podFullName, uid)
 }
 
 func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
-	glog.V(3).Infof("Generating status for %s", podFullName)
-
-	spec, found := kl.GetPodByFullName(podFullName)
+	pod, found := kl.GetPodByFullName(podFullName)
 	if !found {
-		return api.PodStatus{}, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
+		return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
 	}
+	return kl.generatePodStatusByPod(pod)
+}
+
+// By passing the pod directly, this method avoids pod lookup, which requires
+// grabbing a lock.
+func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
+	podFullName := GetPodFullName(pod)
+	glog.V(3).Infof("Generating status for %q", podFullName)
 
-	podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, uid)
+	spec := &pod.Spec
+	podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)
 	if err != nil {
 		// Error handling
-		glog.Infof("Query docker container info for pod %s failed with error (%v)", podFullName, err)
+		glog.Infof("Query docker container info for pod %q failed with error (%v)", podFullName, err)
 		if strings.Contains(err.Error(), "resource temporarily unavailable") {
 			// Leave upstream layer to decide what to do
 			return api.PodStatus{}, err
@@ -2153,7 +2158,7 @@ func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, s
 	}
 	podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
 	if !found {
-		return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid)
+		return fmt.Errorf("Unable to find pod infra container for pod %q, uid %v", podFullName, uid)
 	}
 	return kl.runner.PortForward(podInfraContainer.ID, port, stream)
 }
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 55b3189d782ac..b7a6941525d7f 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -77,6 +77,7 @@ func TestRunOnce(t *testing.T) {
 		rootDirectory: "/tmp/kubelet",
 		recorder:      &record.FakeRecorder{},
 		cadvisor:      cadvisor,
+		podStatuses:   make(map[string]api.PodStatus),
 	}
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
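
Editor's note: the locking pattern this patch adopts is easy to miss inside
the diff, so here is a minimal standalone Go sketch of it. This is not
kubelet code: Manager, Pod, Status, and generateStatusByPod are hypothetical
stand-ins, and the sketch only assumes what the commit message states, namely
that pod lookup requires a read lock on a shared pod array, while a worker
that is handed its pod up front can generate and cache status without
touching that lock.

package main

import (
	"fmt"
	"sync"
)

type Pod struct {
	Name      string
	Namespace string
}

type Status struct{ Phase string }

// Manager owns the shared pod array (guarded by podLock) and a status cache
// (guarded by statusMu), loosely mirroring the kubelet's pods and
// podStatuses fields from the diff above.
type Manager struct {
	podLock  sync.RWMutex
	pods     []Pod
	statusMu sync.Mutex
	statuses map[string]Status
}

// getPodByName is the lookup path the old generatePodStatus went through:
// every call takes the read lock on the shared array.
func (m *Manager) getPodByName(namespace, name string) (*Pod, bool) {
	m.podLock.RLock()
	defer m.podLock.RUnlock()
	for i := range m.pods {
		if m.pods[i].Namespace == namespace && m.pods[i].Name == name {
			return &m.pods[i], true
		}
	}
	return nil, false
}

// generateStatusByPod mirrors generatePodStatusByPod: the caller already
// holds the pod, so no lookup (and no podLock acquisition) happens here.
func (m *Manager) generateStatusByPod(pod *Pod) Status {
	return Status{Phase: "Running"} // stand-in for querying the runtime
}

func main() {
	m := &Manager{
		pods:     []Pod{{Name: "web", Namespace: "default"}, {Name: "db", Namespace: "default"}},
		statuses: make(map[string]Status),
	}

	var wg sync.WaitGroup
	for i := range m.pods {
		pod := m.pods[i] // each worker owns its own copy of the pod
		wg.Add(1)
		go func() {
			defer wg.Done()
			// No m.getPodByName call on this path: the worker syncs and
			// caches status using only the copy it was given.
			status := m.generateStatusByPod(&pod)
			m.statusMu.Lock()
			m.statuses[pod.Namespace+"/"+pod.Name] = status
			m.statusMu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(m.statuses)
}

With many per-pod workers running concurrently, keeping getPodByName off the
per-worker path means that podLock read acquisitions no longer scale with
worker activity, which is the contention the commit message describes.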