diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go new file mode 100644 index 0000000000000..ffdf4c8c92dd2 --- /dev/null +++ b/pkg/kubelet/container/fake_runtime.go @@ -0,0 +1,203 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" +) + +// FakeRuntime is a fake container runtime for testing. +type FakeRuntime struct { + sync.Mutex + CalledFunctions []string + Podlist []*Pod + ContainerList []*Container + PodStatus api.PodStatus + StartedPods []string + KilledPods []string + StartedContainers []string + KilledContainers []string + VersionInfo map[string]string + Err error +} + +type FakeRuntimeCache struct { + runtime Runtime +} + +func NewFakeRuntimeCache(runtime Runtime) RuntimeCache { + return &FakeRuntimeCache{runtime} +} + +func (f *FakeRuntimeCache) GetPods() ([]*Pod, error) { + return f.runtime.GetPods(false) +} + +func (f *FakeRuntimeCache) ForceUpdateIfOlder(time.Time) error { + return nil +} + +// ClearCalls resets the FakeRuntime to the initial state. +func (f *FakeRuntime) ClearCalls() { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = []string{} + f.Podlist = []*Pod{} + f.ContainerList = []*Container{} + f.PodStatus = api.PodStatus{} + f.StartedPods = []string{} + f.KilledPods = []string{} + f.StartedContainers = []string{} + f.KilledContainers = []string{} + f.VersionInfo = map[string]string{} + f.Err = nil +} + +func (f *FakeRuntime) assertList(expect []string, test []string) error { + if !reflect.DeepEqual(expect, test) { + return fmt.Errorf("expected %#v, got %#v", expect, test) + } + return nil +} + +// AssertCalls test if the invoked functions are as expected. +func (f *FakeRuntime) AssertCalls(calls []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(calls, f.CalledFunctions) +} + +func (f *FakeRuntime) AssertStartedPods(pods []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(pods, f.StartedPods) +} + +func (f *FakeRuntime) AssertKilledPods(pods []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(pods, f.KilledPods) +} + +func (f *FakeRuntime) AssertStartedContainers(containers []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(containers, f.StartedContainers) +} + +func (f *FakeRuntime) AssertKilledContainers(containers []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(containers, f.KilledContainers) +} + +func (f *FakeRuntime) Version() (map[string]string, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "Version") + return f.VersionInfo, f.Err +} + +func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetPods") + return f.Podlist, f.Err +} + +func (f *FakeRuntime) RunPod(pod *api.Pod, volumeMap map[string]volume.Interface) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "RunPod") + f.StartedPods = append(f.StartedPods, string(pod.UID)) + for _, c := range pod.Spec.Containers { + f.StartedContainers = append(f.StartedContainers, c.Name) + } + return f.Err +} + +func (f *FakeRuntime) KillPod(pod *api.Pod) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "KillPod") + f.KilledPods = append(f.KilledPods, string(pod.UID)) + for _, c := range pod.Spec.Containers { + f.KilledContainers = append(f.KilledContainers, c.Name) + } + return f.Err +} + +func (f *FakeRuntime) RunContainerInPod(container api.Container, pod *api.Pod, volumeMap map[string]volume.Interface) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "RunContainerInPod") + f.StartedContainers = append(f.StartedContainers, container.Name) + + pod.Spec.Containers = append(pod.Spec.Containers, container) + for _, c := range pod.Spec.Containers { + if c.Name == container.Name { // Container already in the pod. + return f.Err + } + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + return f.Err +} + +func (f *FakeRuntime) KillContainerInPod(container api.Container, pod *api.Pod) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "KillContainerInPod") + f.KilledContainers = append(f.KilledContainers, container.Name) + + var containers []api.Container + for _, c := range pod.Spec.Containers { + if c.Name == container.Name { + continue + } + containers = append(containers, c) + } + return f.Err +} + +func (f *FakeRuntime) GetPodStatus(pod *Pod) (api.PodStatus, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetPodStatus") + return f.PodStatus, f.Err +} + +func (f *FakeRuntime) GetContainers(all bool) ([]*Container, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetContainers") + return f.ContainerList, f.Err +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go new file mode 100644 index 0000000000000..c62a5a52e7959 --- /dev/null +++ b/pkg/kubelet/container/runtime.go @@ -0,0 +1,84 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" +) + +// Runtime interface defines the interfaces that should be implemented +// by a container runtime. +type Runtime interface { + // Version returns a map of version information of the container runtime. + Version() (map[string]string, error) + // GetPods returns a list containers group by pods. The boolean parameter + // specifies whether the runtime returns all containers including those already + // exited and dead containers (used for garbage collection). + GetPods(all bool) ([]*Pod, error) + // RunPod starts all the containers of a pod within a namespace. + RunPod(*api.Pod, map[string]volume.Interface) error + // KillPod kills all the containers of a pod. + KillPod(*api.Pod) error + // RunContainerInPod starts a container within the same namespace of a pod. + RunContainerInPod(api.Container, *api.Pod, map[string]volume.Interface) error + // KillContainerInPod kills a container in the pod. + KillContainerInPod(api.Container, *api.Pod) error + // GetPodStatus retrieves the status of the pod, including the information of + // all containers in the pod. + GetPodStatus(*Pod) (api.PodStatus, error) + // GetContainers returns all containers on the node, including those are + // not managed by kubelet. If 'all' is false, then only running containers + // are returned. + GetContainers(all bool) ([]*Container, error) + // TODO(yifan): Pull/Remove images +} + +// Pod is a group of containers, with the status of the pod. +type Pod struct { + // The ID of the pod, which can be used to retrieve a particular pod + // from the pod list returned by GetPods(). + ID types.UID + // The name and namespace of the pod, which is readable by human. + Name string + Namespace string + // List of containers that belongs to this pod. It may contain only + // running containers, or mixed with dead ones (when GetPods(true)). + Containers []*Container + // The status of the pod. + // TODO(yifan): Inspect and get the statuses for all pods can be expensive, + // maybe we want to get one pod's status at a time (e.g. GetPodStatus() + // for the particular pod after we GetPods()). + Status api.PodStatus +} + +// Container provides the runtime information for a container, such as ID, hash, +// status of the container. +type Container struct { + // The ID of the container, used by the container runtime to identify + // a container. + ID types.UID + // The name of the container, which should be the same as specified by + // api.Container. + Name string + // The image name of the container. + Image string + // Hash of the container, used for comparison. Optional for containers + // not managed by kubelet. + Hash uint64 +} diff --git a/pkg/kubelet/container/runtime_cache.go b/pkg/kubelet/container/runtime_cache.go new file mode 100644 index 0000000000000..6bda534df563e --- /dev/null +++ b/pkg/kubelet/container/runtime_cache.go @@ -0,0 +1,123 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "sync" + "time" +) + +var ( + // TODO(yifan): Maybe set the them as parameters for NewCache(). + defaultCachePeriod = time.Second * 2 + defaultUpdateInterval = time.Millisecond * 100 +) + +type RuntimeCache interface { + GetPods() ([]*Pod, error) + ForceUpdateIfOlder(time.Time) error +} + +// NewRuntimeCache creates a container runtime cache. +func NewRuntimeCache(runtime Runtime) (RuntimeCache, error) { + pods, err := runtime.GetPods(false) + if err != nil { + return nil, err + } + return &runtimeCache{ + runtime: runtime, + cacheTime: time.Now(), + pods: pods, + updating: false, + }, nil +} + +type runtimeCache struct { + sync.Mutex + // The underlying container runtime used to update the cache. + runtime Runtime + // Last time when cache was updated. + cacheTime time.Time + // The content of the cache. + pods []*Pod + // Whether the background thread updating the cache is running. + updating bool + // Time when the background thread should be stopped. + updatingThreadStopTime time.Time +} + +// GetPods returns the cached result for ListPods if the result is not +// outdated, otherwise it will retrieve the newest result. +// If the cache updating loop has stopped, this function will restart it. +func (r *runtimeCache) GetPods() ([]*Pod, error) { + r.Lock() + defer r.Unlock() + if time.Since(r.cacheTime) > defaultCachePeriod { + if err := r.updateCache(); err != nil { + return nil, err + } + } + // Stop refreshing thread if there were no requests within the default cache period + r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod) + if !r.updating { + r.updating = true + go r.startUpdatingCache() + } + return r.pods, nil +} + +func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { + r.Lock() + defer r.Unlock() + if r.cacheTime.Before(minExpectedCacheTime) { + return r.updateCache() + } + return nil +} + +func (r *runtimeCache) updateCache() error { + pods, err := r.runtime.GetPods(false) + if err != nil { + return err + } + r.pods = pods + r.cacheTime = time.Now() + return nil +} + +// startUpdateingCache continues to invoke GetPods to get the newest result until +// there is no requests within the default cache period. +func (r *runtimeCache) startUpdatingCache() { + run := true + for run { + time.Sleep(defaultUpdateInterval) + pods, err := r.runtime.GetPods(false) + cacheTime := time.Now() + if err != nil { + continue + } + + r.Lock() + if time.Now().After(r.updatingThreadStopTime) { + r.updating = false + run = false + } + r.pods = pods + r.cacheTime = cacheTime + r.Unlock() + } +}