Skip to content

Commit

Permalink
Merge pull request kubernetes#5748 from yujuhong/refactor
Browse files Browse the repository at this point in the history
Kubelet: add podManager for managing internal pod storage
  • Loading branch information
vmarmol committed Mar 23, 2015
2 parents 5c71e2b + f440989 commit fe65aa5
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 165 deletions.
148 changes: 17 additions & 131 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func NewMainKubelet(
imageManager: imageManager,
}

klet.podManager = newBasicPodManager(klet.kubeClient)

dockerCache, err := dockertools.NewDockerCache(dockerClient)
if err != nil {
return nil, err
Expand All @@ -253,8 +255,6 @@ func NewMainKubelet(

klet.podStatuses = make(map[string]api.PodStatus)

klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)

return klet, nil
}

Expand Down Expand Up @@ -285,20 +285,10 @@ type Kubelet struct {
podStatusUpdateFrequency time.Duration
sourcesReady SourcesReadyFn

// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
// pods are immutable
podLock sync.RWMutex
pods []api.Pod
// Record the set of mirror pods (see mirror_manager.go for more details);
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods mirrorPods

podManager podManager
// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
// holding podLock, and vice versa. If you intend to change this usage
// accessing podManager, and vice versa. If you intend to change this usage
// pattern, please explicitly impose an acquiring order to avoid deadlocks
// and document such an order in the comment.
podStatusesLock sync.RWMutex
Expand Down Expand Up @@ -353,9 +343,6 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder

// A mirror pod manager which provides helper functions.
mirrorManager mirrorManager

// Policy for handling garbage collection of dead containers.
containerGC containerGC

Expand Down Expand Up @@ -1445,7 +1432,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock

if !hasMirrorPod && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
}
}
Expand Down Expand Up @@ -1572,7 +1559,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})

Expand Down Expand Up @@ -1641,33 +1628,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)

return err
}

func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
updated := []api.Pod{}
m := map[types.UID]*api.Pod{}
for i := range changed {
pod := &changed[i]
m[pod.UID] = pod
}

for i := range current {
pod := &current[i]
if m[pod.UID] != nil {
updated = append(updated, *m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, *pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
}
}

return updated
}

type podsByCreationTime []api.Pod

func (s podsByCreationTime) Len() int {
Expand Down Expand Up @@ -1771,7 +1736,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
Expand All @@ -1782,7 +1747,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
Expand Down Expand Up @@ -1830,52 +1795,6 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
t.Stop()
}

// Update the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
kl.podLock.Lock()
defer kl.podLock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)

// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for _, pod := range newPods {
if _, ok := existingPods[pod.UID]; !ok {
podSyncTypes[pod.UID] = metrics.SyncPodCreate
}
}
// Actually update the pods.
kl.pods = newPods
kl.mirrorPods = newMirrorPods
case UPDATE:
glog.V(3).Infof("Update: Containers changed")

// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
allPods := updatePods(u.Pods, kl.pods)
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
default:
panic("syncLoop does not support incremental changes")
}

// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}

// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {
Expand Down Expand Up @@ -1937,31 +1856,17 @@ func (kl *Kubelet) GetHostname() string {
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
return kl.podManager.GetPods()
}

func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
return kl.podManager.GetPodByFullName(podFullName)
}

// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for i := range kl.pods {
pod := kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return &pod, true
}
}
return nil, false
return kl.podManager.GetPodByName(namespace, name)
}

// updateNodeStatus updates node status to master with retries.
Expand Down Expand Up @@ -2108,7 +2013,7 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
Expand Down Expand Up @@ -2172,7 +2077,7 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {

// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
Expand All @@ -2191,7 +2096,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
Expand All @@ -2210,7 +2115,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
Expand Down Expand Up @@ -2244,29 +2149,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}

kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)

dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
Expand Down
Loading

0 comments on commit fe65aa5

Please sign in to comment.