Change PodWorkers to have desired cache.
wojtek-t committed Mar 11, 2015
1 parent 6d465c4 commit 80576df
Showing 3 changed files with 65 additions and 27 deletions.
18 changes: 18 additions & 0 deletions pkg/kubelet/dockertools/docker_cache.go
@@ -23,6 +23,7 @@ import (

type DockerCache interface {
RunningContainers() (DockerContainers, error)
ForceUpdateIfOlder(time.Time) error
}

func NewDockerCache(client DockerInterface) (DockerCache, error) {
@@ -49,6 +50,9 @@ type dockerCache struct {
updatingThreadStopTime time.Time
}

// Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache)

func (d *dockerCache) RunningContainers() (DockerContainers, error) {
d.lock.Lock()
defer d.lock.Unlock()
@@ -69,6 +73,20 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
return d.containers, nil
}

func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) {
containers, err := GetKubeletDockerContainers(d.client, false)
if err != nil {
return err
}
d.containers = containers
d.cacheTime = time.Now()
}
return nil
}

func (d *dockerCache) startUpdatingCache() {
run := true
for run {
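To make the freshness guarantee concrete, here is a minimal, self-contained sketch of the refresh-if-older-than pattern that the new ForceUpdateIfOlder method implements; the names staleCache, list, and Running are illustrative and are not part of the kubelet code. The cache remembers when its snapshot was taken and re-fetches only if that timestamp predates the caller-supplied lower bound.

package main

import (
	"fmt"
	"sync"
	"time"
)

// staleCache is a hypothetical, stripped-down analogue of dockerCache above:
// a snapshot plus the time it was taken, refreshed on demand.
type staleCache struct {
	lock      sync.Mutex
	items     []string
	cacheTime time.Time
	// list stands in for GetKubeletDockerContainers: it fetches fresh state.
	list func() ([]string, error)
}

// ForceUpdateIfOlder refreshes the snapshot only if it predates minExpected,
// letting callers demand "state at least as new as when I last finished".
func (c *staleCache) ForceUpdateIfOlder(minExpected time.Time) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.cacheTime.Before(minExpected) {
		items, err := c.list()
		if err != nil {
			return err
		}
		c.items = items
		c.cacheTime = time.Now()
	}
	return nil
}

// Running returns the cached snapshot without refreshing it.
func (c *staleCache) Running() []string {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.items
}

func main() {
	c := &staleCache{list: func() ([]string, error) {
		return []string{"container-" + time.Now().Format("15:04:05.000000")}, nil
	}}
	lastSync := time.Now()
	// Before syncing again, insist the snapshot is no older than the last sync.
	if err := c.ForceUpdateIfOlder(lastSync); err != nil {
		fmt.Println("refresh failed:", err)
		return
	}
	fmt.Println(c.Running())
}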
5 changes: 5 additions & 0 deletions pkg/kubelet/dockertools/fake_docker_client.go
@@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
@@ -246,3 +247,7 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) {
return GetKubeletDockerContainers(f.client, false)
}

func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
return nil
}
69 changes: 42 additions & 27 deletions pkg/kubelet/pod_workers.go
@@ -18,6 +18,7 @@ package kubelet

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@@ -41,6 +42,9 @@ type podWorkers struct {
// Currently all update requests for a given pod that come in while another
// update of this pod is being processed are ignored.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache

@@ -63,22 +67,26 @@ type workUpdate struct {

func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minDockerCacheTime time.Time
for newWork := range podUpdates {
// Since we use docker cache, getting current state shouldn't cause
// performance overhead on Docker. Moreover, as long as we run syncPod
// no matter if it changes anything, having an old version of "containers"
// can cause starting unneeded containers.
func() {
defer p.setIsWorking(newWork.pod.UID, false)
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
// We would like to have the state of Docker from at least the moment
// when we finished the previous processing of that pod.
if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
glog.Errorf("Error updating docker cache: %v", err)
return
}
containers, err := p.dockerCache.RunningContainers()
if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err)
@@ -91,6 +99,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return
}
minDockerCacheTime = time.Now()

newWork.updateCompleteFn()
}()
@@ -106,33 +115,28 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// Currently all update requests for a given pod that come in while another
// update of this pod is being processed are ignored.
// We need a buffer here, because the checkForUpdates() method that
// puts an update into the channel is called from the same goroutine
// that consumes the channel. However, it is guaranteed that in such a case
// the channel is empty, so a buffer of size 1 is enough.
podUpdates = make(chan workUpdate, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer util.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// TODO(wojtek-t): Consider changing to the following model:
// - add a cache of "desired" pod state
// - whenever an update of a pod comes, we update the "desired" cache
// - if the per-pod goroutine is currently idle, we send the update immediately
// to the per-pod goroutine and clear the cache;
// - when the per-pod goroutine finishes processing an update it checks the
// desired cache for the next update to process
// - the crucial thing in this approach is that we don't accumulate multiple
// updates for a given pod (at any point in time there will be at most
// one update queued for a given pod, plus potentially one currently being
// processed) and additionally we don't rely on the fact that an update will
// be resent (because we don't drop it)
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- workUpdate{
pod: pod,
updateCompleteFn: updateComplete,
}
} else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod,
updateCompleteFn: updateComplete,
}
}
}

@@ -143,12 +147,23 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
// If there is an undelivered work update for this pod we need to remove it,
// since the per-pod goroutine won't be able to put it on the already closed
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
delete(p.lastUndeliveredWorkUpdate, key)
}
}
}
}

func (p *podWorkers) setIsWorking(uid types.UID, isWorking bool) {
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
p.podLock.Lock()
p.isWorking[uid] = isWorking
p.podLock.Unlock()
defer p.podLock.Unlock()
if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
p.podUpdates[uid] <- workUpdate
delete(p.lastUndeliveredWorkUpdate, uid)
} else {
p.isWorking[uid] = false
}
}
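
As an illustration of the delivery model this commit introduces, the sketch below (hypothetical names such as workers and workItem; not the kubelet code itself) keeps one goroutine per key fed by a channel with buffer size 1, records an update that arrives while the worker is busy in a single per-key undelivered slot, and re-delivers that slot when the worker finishes, so at most one update is ever queued per key and the latest update is never dropped.

package main

import (
	"fmt"
	"sync"
	"time"
)

// workItem is a hypothetical stand-in for workUpdate.
type workItem struct {
	payload string
}

// workers mirrors the commit's model: per-key worker goroutines, an isWorking
// flag, and at most one cached undelivered item per key.
type workers struct {
	lock            sync.Mutex
	updates         map[string]chan workItem
	isWorking       map[string]bool
	lastUndelivered map[string]workItem
}

func newWorkers() *workers {
	return &workers{
		updates:         map[string]chan workItem{},
		isWorking:       map[string]bool{},
		lastUndelivered: map[string]workItem{},
	}
}

// Update either delivers the item immediately (worker idle) or overwrites the
// single undelivered slot for the key (worker busy).
func (w *workers) Update(key string, item workItem) {
	w.lock.Lock()
	defer w.lock.Unlock()
	ch, exists := w.updates[key]
	if !exists {
		// Buffer of 1: checkForUpdates pushes into this channel from the same
		// goroutine that drains it, but only when the channel is empty.
		ch = make(chan workItem, 1)
		w.updates[key] = ch
		go w.loop(key, ch)
	}
	if !w.isWorking[key] {
		w.isWorking[key] = true
		ch <- item
	} else {
		w.lastUndelivered[key] = item
	}
}

func (w *workers) loop(key string, ch <-chan workItem) {
	for item := range ch {
		func() {
			defer w.checkForUpdates(key)
			fmt.Printf("syncing %s with %q\n", key, item.payload)
			time.Sleep(10 * time.Millisecond) // stand-in for syncPod work
		}()
	}
}

// checkForUpdates re-queues the cached item if one arrived while working,
// otherwise marks the key idle again.
func (w *workers) checkForUpdates(key string) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if item, exists := w.lastUndelivered[key]; exists {
		w.updates[key] <- item
		delete(w.lastUndelivered, key)
	} else {
		w.isWorking[key] = false
	}
}

func main() {
	w := newWorkers()
	// Three rapid updates: the first is processed, the second is overwritten
	// by the third, and only the third is processed afterwards.
	for i := 1; i <= 3; i++ {
		w.Update("pod-a", workItem{payload: fmt.Sprintf("update-%d", i)})
	}
	time.Sleep(100 * time.Millisecond)
}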
