Kubelet: per-pod workers should avoid grabbing the pod array lock
The per-pod worker syncs the pod and container status and writes the pod status
to the pod status cache. Since it already owns a copy of the pod, it can bypass
the additional pod lookup step entirely. This change adds a new
generatePodStatusByPod() method to achieve this. In general, per-pod workers
should avoid accessing the internal pod array altogether, as doing so may lead
to high lock contention.

This change also changes the return type of GetPodByFullName() from *api.PodSpec
to *api.Pod so that it matches the function name, and consolidates
GetPodByFullName() and GetPodByName().
yujuhong committed Mar 20, 2015
1 parent 1cbde2c commit 0d0fb5f
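
To make the contention argument above concrete, the following is a minimal, self-contained Go sketch of the pattern; the Pod, PodStatus, and fakeKubelet types are simplified stand-ins invented for this illustration, not the real kubelet types. A per-pod worker that already holds a copy of its pod can call a by-pod status method that never touches the shared pod-array lock, whereas the name-based lookup path must take a read lock on every call.

// sketch.go: illustrative only; simplified stand-ins for the real kubelet types.
package main

import (
	"fmt"
	"sync"
)

type Pod struct {
	Namespace string
	Name      string
	UID       string
}

type PodStatus struct {
	Message string
}

type fakeKubelet struct {
	podLock sync.RWMutex // guards pods
	pods    []Pod
}

// generatePodStatus looks the pod up by full name, which requires the read
// lock on the shared pod slice. Many concurrent per-pod workers calling this
// contend on podLock.
func (kl *fakeKubelet) generatePodStatus(podFullName string) (PodStatus, error) {
	kl.podLock.RLock()
	defer kl.podLock.RUnlock()
	for i := range kl.pods {
		p := &kl.pods[i]
		if p.Name+"_"+p.Namespace == podFullName {
			return kl.generatePodStatusByPod(p)
		}
	}
	return PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
}

// generatePodStatusByPod skips the lookup entirely: the caller (a per-pod
// worker) already owns a copy of the pod, so no lock is needed here.
func (kl *fakeKubelet) generatePodStatusByPod(pod *Pod) (PodStatus, error) {
	return PodStatus{Message: fmt.Sprintf("synced %s/%s", pod.Namespace, pod.Name)}, nil
}

func main() {
	kl := &fakeKubelet{pods: []Pod{{Namespace: "default", Name: "nginx", UID: "123"}}}

	// Per-pod worker path: the worker was handed its own copy of the pod,
	// so it calls the by-pod variant directly and never touches podLock.
	podCopy := kl.pods[0]
	status, _ := kl.generatePodStatusByPod(&podCopy)
	fmt.Println("worker path:", status.Message)

	// Lookup path: still available when only the full name is known,
	// but it has to take the read lock on the shared pod slice.
	status, err := kl.generatePodStatus("nginx_default")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("lookup path:", status.Message)
}
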
Showing 2 changed files with 34 additions and 28 deletions.
61 changes: 33 additions & 28 deletions pkg/kubelet/kubelet.go
@@ -1245,12 +1245,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.Infof("probe no-error: %s", container.Name)
glog.Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.Infof("probe success: %s", container.Name)
glog.Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index
continue
}
@@ -1314,7 +1314,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock

// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(podFullName, uid)
status, err := kl.generatePodStatusByPod(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
@@ -1467,7 +1467,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
if runningSet.Has(parts[0]) {
glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0])
glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
continue
}
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
@@ -1767,9 +1767,9 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
}
_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
if err != nil {
glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod)
glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
} else {
glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
}
}
t.Stop()
@@ -1887,7 +1887,16 @@ func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
}

// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
}

// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
@@ -1917,10 +1926,10 @@ func (kl *Kubelet) updateNodeStatus() error {
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname)
return fmt.Errorf("no node instance returned for %q", kl.hostname)
}

// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
@@ -2042,41 +2051,37 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
return ready
}

func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for _, pod := range kl.pods {
if GetPodFullName(&pod) == podFullName {
return &pod.Spec, true
}
}
return nil, false
}

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
glog.V(3).Infof("Returning cached status for %s", podFullName)
glog.V(3).Infof("Returning cached status for %q", podFullName)
return cachedPodStatus, nil
}
return kl.generatePodStatus(podFullName, uid)
}

func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %s", podFullName)

spec, found := kl.GetPodByFullName(podFullName)
pod, found := kl.GetPodByFullName(podFullName)
if !found {
return api.PodStatus{}, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
}
return kl.generatePodStatusByPod(pod)
}

// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
podFullName := GetPodFullName(pod)
glog.V(3).Infof("Generating status for %q", podFullName)

podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, uid)
spec := &pod.Spec
podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)

if err != nil {
// Error handling
glog.Infof("Query docker container info for pod %s failed with error (%v)", podFullName, err)
glog.Infof("Query docker container info for pod %q failed with error (%v)", podFullName, err)
if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do
return api.PodStatus{}, err
@@ -2153,7 +2158,7 @@ func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, s
}
podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
if !found {
return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid)
return fmt.Errorf("Unable to find pod infra container for pod %q, uid %v", podFullName, uid)
}
return kl.runner.PortForward(podInfraContainer.ID, port, stream)
}
1 change: 1 addition & 0 deletions pkg/kubelet/runonce_test.go
@@ -77,6 +77,7 @@ func TestRunOnce(t *testing.T) {
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
podStatuses: make(map[string]api.PodStatus),
}

kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
