Skip to content

Commit

Permalink
Merge pull request #5685 from yujuhong/pod_status
Browse files Browse the repository at this point in the history
Kubelet: per-pod workers should avoid grabbing the pod array lock
  • Loading branch information
vmarmol committed Mar 20, 2015
2 parents 699dc96 + 0d0fb5f commit b6d2cf4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 28 deletions.
61 changes: 33 additions & 28 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,12 +1245,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.Infof("probe no-error: %s", container.Name)
glog.Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.Infof("probe success: %s", container.Name)
glog.Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index
continue
}
Expand Down Expand Up @@ -1314,7 +1314,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock

// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(podFullName, uid)
status, err := kl.generatePodStatusByPod(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
Expand Down Expand Up @@ -1467,7 +1467,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
if runningSet.Has(parts[0]) {
glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0])
glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
continue
}
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
Expand Down Expand Up @@ -1767,9 +1767,9 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
}
_, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
if err != nil {
glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod)
glog.Warningf("Error updating status for pod %q: %v (full pod: %q)", pod.Name, err, pod)
} else {
glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
glog.V(3).Infof("Status for pod %q updated successfully: %q", pod.Name, pod)
}
}
t.Stop()
Expand Down Expand Up @@ -1887,7 +1887,16 @@ func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
}

// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
}

// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
Expand Down Expand Up @@ -1917,10 +1926,10 @@ func (kl *Kubelet) updateNodeStatus() error {
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname)
return fmt.Errorf("no node instance returned for %q", kl.hostname)
}

// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
Expand Down Expand Up @@ -2042,41 +2051,37 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
return ready
}

func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for _, pod := range kl.pods {
if GetPodFullName(&pod) == podFullName {
return &pod.Spec, true
}
}
return nil, false
}

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
glog.V(3).Infof("Returning cached status for %s", podFullName)
glog.V(3).Infof("Returning cached status for %q", podFullName)
return cachedPodStatus, nil
}
return kl.generatePodStatus(podFullName, uid)
}

func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %s", podFullName)

spec, found := kl.GetPodByFullName(podFullName)
pod, found := kl.GetPodByFullName(podFullName)
if !found {
return api.PodStatus{}, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
}
return kl.generatePodStatusByPod(pod)
}

// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
podFullName := GetPodFullName(pod)
glog.V(3).Infof("Generating status for %q", podFullName)

podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, uid)
spec := &pod.Spec
podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)

if err != nil {
// Error handling
glog.Infof("Query docker container info for pod %s failed with error (%v)", podFullName, err)
glog.Infof("Query docker container info for pod %q failed with error (%v)", podFullName, err)
if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do
return api.PodStatus{}, err
Expand Down Expand Up @@ -2153,7 +2158,7 @@ func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, s
}
podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
if !found {
return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid)
return fmt.Errorf("Unable to find pod infra container for pod %q, uid %v", podFullName, uid)
}
return kl.runner.PortForward(podInfraContainer.ID, port, stream)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/runonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestRunOnce(t *testing.T) {
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
podStatuses: make(map[string]api.PodStatus),
}

kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
Expand Down

0 comments on commit b6d2cf4

Please sign in to comment.