Skip to content

Commit

Permalink
Merge pull request kubernetes#4804 from wojtek-t/separate_pod_workers
Browse files Browse the repository at this point in the history
Thread-per-pod model in Kubelet.
  • Loading branch information
vmarmol committed Feb 26, 2015
2 parents f4bee7e + 7191c5c commit e41d0bd
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 119 deletions.
50 changes: 6 additions & 44 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func NewMainKubelet(
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{},
Expand All @@ -134,6 +133,7 @@ func NewMainKubelet(
return nil, err
}
klet.dockerCache = dockerCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod)

metrics.Register(dockerCache)

Expand Down Expand Up @@ -453,43 +453,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}

// Per-pod workers.
type podWorkers struct {
lock sync.Mutex

// Set of pods with existing workers.
workers util.StringSet
}

func newPodWorkers() *podWorkers {
return &podWorkers{
workers: util.NewStringSet(),
}
}

// Runs a worker for "podFullName" asynchronously with the specified "action".
// If the worker for the "podFullName" is already running, functions as a no-op.
func (self *podWorkers) Run(podFullName string, action func()) {
self.lock.Lock()
defer self.lock.Unlock()

// This worker is already running, let it finish.
if self.workers.Has(podFullName) {
return
}
self.workers.Insert(podFullName)

// Run worker async.
go func() {
defer util.HandleCrash()
action()

self.lock.Lock()
defer self.lock.Unlock()
self.workers.Delete(podFullName)
}()
}

func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
Expand Down Expand Up @@ -1333,13 +1296,12 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
}

// Run the sync in an async manifest worker.
kl.podWorkers.Run(podFullName, func() {
if err := kl.syncPod(pod, dockerContainers); err != nil {
glog.Errorf("Error syncing pod, skipping: %v", err)
record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err)
}
})
kl.podWorkers.UpdatePod(*pod)
}

// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)

// Kill any containers we don't need.
killed := []string{}
for ix := range dockerContainers {
Expand Down
Loading

0 comments on commit e41d0bd

Please sign in to comment.