diff --git a/pkg/kubelet/dockertools/docker_cache.go b/pkg/kubelet/dockertools/docker_cache.go new file mode 100644 index 0000000000000..43a10e65ccc84 --- /dev/null +++ b/pkg/kubelet/dockertools/docker_cache.go @@ -0,0 +1,97 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockertools + +import ( + "sync" + "time" +) + +type DockerCache interface { + RunningContainers() (DockerContainers, error) +} + +func NewDockerCache(client DockerInterface) (DockerCache, error) { + containers, err := GetKubeletDockerContainers(client, false) + if err != nil { + return nil, err + } + return &dockerCache{ + client: client, + cacheTime: time.Now(), + containers: containers, + updatingCache: false, + }, nil +} + +// dockerCache is a default implementation of DockerCache interface +type dockerCache struct { + // The underlying docker client used to update the cache. + client DockerInterface + + // Mutex protecting all of the following fields. + lock sync.Mutex + // Last time when cache was updated. + cacheTime time.Time + // The content of the cache. + containers DockerContainers + // Whether the background thread updating the cache is running. + updatingCache bool + // Time when the background thread should be stopped. + updatingThreadStopTime time.Time +} + +func (d *dockerCache) RunningContainers() (DockerContainers, error) { + d.lock.Lock() + defer d.lock.Unlock() + if time.Since(d.cacheTime) > 2*time.Second { + containers, err := GetKubeletDockerContainers(d.client, false) + if err != nil { + return containers, err + } + d.containers = containers + d.cacheTime = time.Now() + } + // Stop refreshing thread if there were no requests within last 2 seconds. + d.updatingThreadStopTime = time.Now().Add(time.Duration(2) * time.Second) + if !d.updatingCache { + d.updatingCache = true + go d.startUpdatingCache() + } + return d.containers, nil +} + +func (d *dockerCache) startUpdatingCache() { + run := true + for run { + time.Sleep(100 * time.Millisecond) + containers, err := GetKubeletDockerContainers(d.client, false) + cacheTime := time.Now() + if err != nil { + continue + } + + d.lock.Lock() + if time.Now().After(d.updatingThreadStopTime) { + d.updatingCache = false + run = false + } + d.containers = containers + d.cacheTime = cacheTime + d.lock.Unlock() + } +} diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 28c05eb34b72e..d65074920ca79 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -232,3 +232,17 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { } return false, nil } + +type FakeDockerCache struct { + client DockerInterface +} + +func NewFakeDockerCache(client DockerInterface) DockerCache { + return &FakeDockerCache{ + client: client, + } +} + +func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) { + return GetKubeletDockerContainers(f.client, false) +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index dd60841bd1660..e4f7aedccf8d2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -128,10 +128,16 @@ func NewMainKubelet( streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, } - if err := klet.setupDataDirs(); err != nil { + dockerCache, err := dockertools.NewDockerCache(dockerClient) + if err != nil { return nil, err } - if err := klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil { + klet.dockerCache = dockerCache + + if err = klet.setupDataDirs(); err != nil { + return nil, err + } + if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil { return nil, err } @@ -150,6 +156,7 @@ type serviceLister interface { type Kubelet struct { hostname string dockerClient dockertools.DockerInterface + dockerCache dockertools.DockerCache kubeClient *client.Client rootDirectory string podInfraContainerImage string @@ -1300,7 +1307,7 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { desiredContainers := make(map[podContainer]empty) desiredPods := make(map[types.UID]empty) - dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) + dockerContainers, err := kl.dockerCache.RunningContainers() if err != nil { glog.Errorf("Error listing containers: %#v", dockerContainers) return err diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b1e45591e3ee7..c1a792f9f5129 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -55,6 +55,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker + kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker) kubelet.dockerPuller = &dockertools.FakeDockerPuller{} if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { t.Fatalf("can't make a temp rootdir: %v", err)