-
Notifications
You must be signed in to change notification settings - Fork 40k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
kubelet: add container runtime cache and fake runtime.
- Loading branch information
Yifan Gu
committed
Mar 20, 2015
1 parent
319d537
commit 487d34e
Showing
2 changed files
with
326 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
/* | ||
Copyright 2015 Google Inc. All rights reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package container | ||
|
||
import ( | ||
"fmt" | ||
"reflect" | ||
"sync" | ||
"time" | ||
|
||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" | ||
) | ||
|
||
// FakeRuntime is a fake container runtime for testing. | ||
type FakeRuntime struct { | ||
sync.Mutex | ||
CalledFunctions []string | ||
Podlist []*Pod | ||
ContainerList []*Container | ||
PodStatus api.PodStatus | ||
StartedPods []string | ||
KilledPods []string | ||
StartedContainers []string | ||
KilledContainers []string | ||
VersionInfo map[string]string | ||
Err error | ||
} | ||
|
||
type FakeRuntimeCache struct { | ||
runtime Runtime | ||
} | ||
|
||
func NewFakeRuntimeCache(runtime Runtime) RuntimeCache { | ||
return &FakeRuntimeCache{runtime} | ||
} | ||
|
||
func (f *FakeRuntimeCache) GetPods() ([]*Pod, error) { | ||
return f.runtime.GetPods(false) | ||
} | ||
|
||
func (f *FakeRuntimeCache) ForceUpdateIfOlder(time.Time) error { | ||
return nil | ||
} | ||
|
||
// ClearCalls resets the FakeRuntime to the initial state. | ||
func (f *FakeRuntime) ClearCalls() { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = []string{} | ||
f.Podlist = []*Pod{} | ||
f.ContainerList = []*Container{} | ||
f.PodStatus = api.PodStatus{} | ||
f.StartedPods = []string{} | ||
f.KilledPods = []string{} | ||
f.StartedContainers = []string{} | ||
f.KilledContainers = []string{} | ||
f.VersionInfo = map[string]string{} | ||
f.Err = nil | ||
} | ||
|
||
func (f *FakeRuntime) assertList(expect []string, test []string) error { | ||
if !reflect.DeepEqual(expect, test) { | ||
return fmt.Errorf("expected %#v, got %#v", expect, test) | ||
} | ||
return nil | ||
} | ||
|
||
// AssertCalls test if the invoked functions are as expected. | ||
func (f *FakeRuntime) AssertCalls(calls []string) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.assertList(calls, f.CalledFunctions) | ||
} | ||
|
||
func (f *FakeRuntime) AssertStartedPods(pods []string) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.assertList(pods, f.StartedPods) | ||
} | ||
|
||
func (f *FakeRuntime) AssertKilledPods(pods []string) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.assertList(pods, f.KilledPods) | ||
} | ||
|
||
func (f *FakeRuntime) AssertStartedContainers(containers []string) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.assertList(containers, f.StartedContainers) | ||
} | ||
|
||
func (f *FakeRuntime) AssertKilledContainers(containers []string) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.assertList(containers, f.KilledContainers) | ||
} | ||
|
||
func (f *FakeRuntime) Version() (map[string]string, error) { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "Version") | ||
return f.VersionInfo, f.Err | ||
} | ||
|
||
func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "GetPods") | ||
return f.Podlist, f.Err | ||
} | ||
|
||
func (f *FakeRuntime) RunPod(pod *api.Pod, volumeMap map[string]volume.Interface) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "RunPod") | ||
f.StartedPods = append(f.StartedPods, string(pod.UID)) | ||
for _, c := range pod.Spec.Containers { | ||
f.StartedContainers = append(f.StartedContainers, c.Name) | ||
} | ||
return f.Err | ||
} | ||
|
||
func (f *FakeRuntime) KillPod(pod *api.Pod) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "KillPod") | ||
f.KilledPods = append(f.KilledPods, string(pod.UID)) | ||
for _, c := range pod.Spec.Containers { | ||
f.KilledContainers = append(f.KilledContainers, c.Name) | ||
} | ||
return f.Err | ||
} | ||
|
||
func (f *FakeRuntime) RunContainerInPod(container api.Container, pod *api.Pod, volumeMap map[string]volume.Interface) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "RunContainerInPod") | ||
f.StartedContainers = append(f.StartedContainers, container.Name) | ||
|
||
pod.Spec.Containers = append(pod.Spec.Containers, container) | ||
for _, c := range pod.Spec.Containers { | ||
if c.Name == container.Name { // Container already in the pod. | ||
return f.Err | ||
} | ||
} | ||
pod.Spec.Containers = append(pod.Spec.Containers, container) | ||
return f.Err | ||
} | ||
|
||
func (f *FakeRuntime) KillContainerInPod(container api.Container, pod *api.Pod) error { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "KillContainerInPod") | ||
f.KilledContainers = append(f.KilledContainers, container.Name) | ||
|
||
var containers []api.Container | ||
for _, c := range pod.Spec.Containers { | ||
if c.Name == container.Name { | ||
continue | ||
} | ||
containers = append(containers, c) | ||
} | ||
return f.Err | ||
} | ||
|
||
func (f *FakeRuntime) GetPodStatus(pod *Pod) (api.PodStatus, error) { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "GetPodStatus") | ||
return f.PodStatus, f.Err | ||
} | ||
|
||
func (f *FakeRuntime) GetContainers(all bool) ([]*Container, error) { | ||
f.Lock() | ||
defer f.Unlock() | ||
|
||
f.CalledFunctions = append(f.CalledFunctions, "GetContainers") | ||
return f.ContainerList, f.Err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
/* | ||
Copyright 2015 Google Inc. All rights reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package container | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
var ( | ||
// TODO(yifan): Maybe set the them as parameters for NewCache(). | ||
defaultCachePeriod = time.Second * 2 | ||
defaultUpdateInterval = time.Millisecond * 100 | ||
) | ||
|
||
type RuntimeCache interface { | ||
GetPods() ([]*Pod, error) | ||
ForceUpdateIfOlder(time.Time) error | ||
} | ||
|
||
// NewRuntimeCache creates a container runtime cache. | ||
func NewRuntimeCache(runtime Runtime) (RuntimeCache, error) { | ||
pods, err := runtime.GetPods(false) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &runtimeCache{ | ||
runtime: runtime, | ||
cacheTime: time.Now(), | ||
pods: pods, | ||
updating: false, | ||
}, nil | ||
} | ||
|
||
type runtimeCache struct { | ||
sync.Mutex | ||
// The underlying container runtime used to update the cache. | ||
runtime Runtime | ||
// Last time when cache was updated. | ||
cacheTime time.Time | ||
// The content of the cache. | ||
pods []*Pod | ||
// Whether the background thread updating the cache is running. | ||
updating bool | ||
// Time when the background thread should be stopped. | ||
updatingThreadStopTime time.Time | ||
} | ||
|
||
// GetPods returns the cached result for ListPods if the result is not | ||
// outdated, otherwise it will retrieve the newest result. | ||
// If the cache updating loop has stopped, this function will restart it. | ||
func (r *runtimeCache) GetPods() ([]*Pod, error) { | ||
r.Lock() | ||
defer r.Unlock() | ||
if time.Since(r.cacheTime) > defaultCachePeriod { | ||
if err := r.updateCache(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
// Stop refreshing thread if there were no requests within the default cache period | ||
r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod) | ||
if !r.updating { | ||
r.updating = true | ||
go r.startUpdatingCache() | ||
} | ||
return r.pods, nil | ||
} | ||
|
||
func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { | ||
r.Lock() | ||
defer r.Unlock() | ||
if r.cacheTime.Before(minExpectedCacheTime) { | ||
return r.updateCache() | ||
} | ||
return nil | ||
} | ||
|
||
func (r *runtimeCache) updateCache() error { | ||
pods, err := r.runtime.GetPods(false) | ||
if err != nil { | ||
return err | ||
} | ||
r.pods = pods | ||
r.cacheTime = time.Now() | ||
return nil | ||
} | ||
|
||
// startUpdateingCache continues to invoke GetPods to get the newest result until | ||
// there is no requests within the default cache period. | ||
func (r *runtimeCache) startUpdatingCache() { | ||
run := true | ||
for run { | ||
time.Sleep(defaultUpdateInterval) | ||
pods, err := r.runtime.GetPods(false) | ||
cacheTime := time.Now() | ||
if err != nil { | ||
continue | ||
} | ||
|
||
r.Lock() | ||
if time.Now().After(r.updatingThreadStopTime) { | ||
r.updating = false | ||
run = false | ||
} | ||
r.pods = pods | ||
r.cacheTime = cacheTime | ||
r.Unlock() | ||
} | ||
} |