Skip to content

Commit

Permalink
Kubelet: add podManager for managing internal pod storage
Browse files Browse the repository at this point in the history
This change moves pod array and mirrorPods into podManager, along with all
methods accessing these internal pod storages. This is the first step of the
refactoring, and no function change is involved.
  • Loading branch information
yujuhong committed Mar 23, 2015
1 parent 737af02 commit f440989
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 165 deletions.
148 changes: 17 additions & 131 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func NewMainKubelet(
imageManager: imageManager,
}

klet.podManager = newBasicPodManager(klet.kubeClient)

dockerCache, err := dockertools.NewDockerCache(dockerClient)
if err != nil {
return nil, err
Expand All @@ -253,8 +255,6 @@ func NewMainKubelet(

klet.podStatuses = make(map[string]api.PodStatus)

klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)

return klet, nil
}

Expand Down Expand Up @@ -285,20 +285,10 @@ type Kubelet struct {
podStatusUpdateFrequency time.Duration
sourcesReady SourcesReadyFn

// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
// pods are immutable
podLock sync.RWMutex
pods []api.Pod
// Record the set of mirror pods (see mirror_manager.go for more details);
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods mirrorPods

podManager podManager
// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
// holding podLock, and vice versa. If you intend to change this usage
// accessing podManager, and vice versa. If you intend to change this usage
// pattern, please explicitly impose an acquiring order to avoid deadlocks
// and document such an order in the comment.
podStatusesLock sync.RWMutex
Expand Down Expand Up @@ -353,9 +343,6 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder

// A mirror pod manager which provides helper functions.
mirrorManager mirrorManager

// Policy for handling garbage collection of dead containers.
containerGC containerGC

Expand Down Expand Up @@ -1445,7 +1432,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock

if !hasMirrorPod && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
}
}
Expand Down Expand Up @@ -1572,7 +1559,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})

Expand Down Expand Up @@ -1641,33 +1628,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)

return err
}

func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
updated := []api.Pod{}
m := map[types.UID]*api.Pod{}
for i := range changed {
pod := &changed[i]
m[pod.UID] = pod
}

for i := range current {
pod := &current[i]
if m[pod.UID] != nil {
updated = append(updated, *m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, *pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
}
}

return updated
}

type podsByCreationTime []api.Pod

func (s podsByCreationTime) Len() int {
Expand Down Expand Up @@ -1771,7 +1736,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
Expand All @@ -1782,7 +1747,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
Expand Down Expand Up @@ -1830,52 +1795,6 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
t.Stop()
}

// Update the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
kl.podLock.Lock()
defer kl.podLock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)

// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for _, pod := range newPods {
if _, ok := existingPods[pod.UID]; !ok {
podSyncTypes[pod.UID] = metrics.SyncPodCreate
}
}
// Actually update the pods.
kl.pods = newPods
kl.mirrorPods = newMirrorPods
case UPDATE:
glog.V(3).Infof("Update: Containers changed")

// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
allPods := updatePods(u.Pods, kl.pods)
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
default:
panic("syncLoop does not support incremental changes")
}

// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}

// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {
Expand Down Expand Up @@ -1937,31 +1856,17 @@ func (kl *Kubelet) GetHostname() string {
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
return kl.podManager.GetPods()
}

func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
return kl.podManager.GetPodByFullName(podFullName)
}

// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for i := range kl.pods {
pod := kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return &pod, true
}
}
return nil, false
return kl.podManager.GetPodByName(namespace, name)
}

// updateNodeStatus updates node status to master with retries.
Expand Down Expand Up @@ -2108,7 +2013,7 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
Expand Down Expand Up @@ -2172,7 +2077,7 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {

// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
Expand All @@ -2191,7 +2096,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
Expand All @@ -2210,7 +2115,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
Expand Down Expand Up @@ -2244,29 +2149,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}

kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
Expand Down
Loading

0 comments on commit f440989

Please sign in to comment.