diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 845dbb1678a29..3a6e83e2e257c 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -216,7 +216,7 @@ func NewKubeletServer() *KubeletServer {
 		RootDirectory:                  defaultRootDir,
 		SerializeImagePulls:            true,
 		StreamingConnectionIdleTimeout: 5 * time.Minute,
-		SyncFrequency:                  10 * time.Second,
+		SyncFrequency:                  1 * time.Minute,
 		SystemContainer:                "",
 		ReconcileCIDR:                  true,
 		KubeAPIQPS:                     5.0,
diff --git a/docs/admin/kubelet.md b/docs/admin/kubelet.md
index 69b6650186cee..9d746f48d6efd 100644
--- a/docs/admin/kubelet.md
+++ b/docs/admin/kubelet.md
@@ -131,13 +131,13 @@ kubelet
       --runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server
       --serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]
       --streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'
-      --sync-frequency=10s: Max period between synchronizing running containers and config
+      --sync-frequency=1m0s: Max period between synchronizing running containers and config
       --system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: "").
       --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to the directory passed to --cert-dir.
       --tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
 ```
 
-###### Auto generated by spf13/cobra on 10-Nov-2015
+###### Auto generated by spf13/cobra on 11-Nov-2015
diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go
index bb2bc72804fc5..de8853698021e 100644
--- a/pkg/kubelet/container/fake_runtime.go
+++ b/pkg/kubelet/container/fake_runtime.go
@@ -33,6 +33,7 @@ type FakeRuntime struct {
 	sync.Mutex
 	CalledFunctions []string
 	PodList         []*Pod
+	AllPodList      []*Pod
 	ImageList       []Image
 	PodStatus       api.PodStatus
 	StartedPods     []string
@@ -89,6 +90,7 @@ func (f *FakeRuntime) ClearCalls() {
 	f.CalledFunctions = []string{}
 	f.PodList = []*Pod{}
+	f.AllPodList = []*Pod{}
 	f.PodStatus = api.PodStatus{}
 	f.StartedPods = []string{}
 	f.KilledPods = []string{}
@@ -155,6 +157,9 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
 	defer f.Unlock()
 
 	f.CalledFunctions = append(f.CalledFunctions, "GetPods")
+	if all {
+		return f.AllPodList, f.Err
+	}
 	return f.PodList, f.Err
 }
diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go
index 9a1590a91e048..5f58e1a4e36e7 100644
--- a/pkg/kubelet/container/runtime.go
+++ b/pkg/kubelet/container/runtime.go
@@ -198,6 +198,15 @@ func (c *ContainerID) UnmarshalJSON(data []byte) error {
 	return c.ParseString(string(data))
}
 
+type ContainerStatus string
+
+const (
+	ContainerStatusRunning ContainerStatus = "running"
+	ContainerStatusExited  ContainerStatus = "exited"
+	// This unknown status encompasses all the statuses that we don't currently care about.
+	ContainerStatusUnknown ContainerStatus = "unknown"
+)
+
 // Container provides the runtime information for a container, such as ID, hash,
 // status of the container.
 type Container struct {
@@ -215,6 +224,8 @@ type Container struct {
 	// The timestamp of the creation time of the container.
 	// TODO(yifan): Consider to move it to api.ContainerStatus.
 	Created int64
+	// Status is the status of the container.
+	Status ContainerStatus
 }
 
 // Basic information about a container image.
diff --git a/pkg/kubelet/dockertools/convert.go b/pkg/kubelet/dockertools/convert.go
index 7f743f5bd35a1..b55366cf16547 100644
--- a/pkg/kubelet/dockertools/convert.go
+++ b/pkg/kubelet/dockertools/convert.go
@@ -18,6 +18,7 @@ package dockertools
 
 import (
 	"fmt"
+	"strings"
 
 	docker "github.com/fsouza/go-dockerclient"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -27,6 +28,19 @@ import (
 // This file contains helper functions to convert docker API types to runtime
 // (kubecontainer) types.
 
+func mapStatus(status string) kubecontainer.ContainerStatus {
+	// Parse the status string in docker.APIContainers. This could break when
+	// we upgrade docker.
+	switch {
+	case strings.HasPrefix(status, "Up"):
+		return kubecontainer.ContainerStatusRunning
+	case strings.HasPrefix(status, "Exited"):
+		return kubecontainer.ContainerStatusExited
+	default:
+		return kubecontainer.ContainerStatusUnknown
+	}
+}
+
 // Converts docker.APIContainers to kubecontainer.Container.
 func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, error) {
 	if c == nil {
@@ -37,12 +51,14 @@ func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, erro
 	if err != nil {
 		return nil, err
 	}
+
 	return &kubecontainer.Container{
 		ID:      kubetypes.DockerID(c.ID).ContainerID(),
 		Name:    dockerName.ContainerName,
 		Image:   c.Image,
 		Hash:    hash,
 		Created: c.Created,
+		Status:  mapStatus(c.Status),
 	}, nil
 }
diff --git a/pkg/kubelet/dockertools/convert_test.go b/pkg/kubelet/dockertools/convert_test.go
index acc99f660f287..439100ae41abb 100644
--- a/pkg/kubelet/dockertools/convert_test.go
+++ b/pkg/kubelet/dockertools/convert_test.go
@@ -24,12 +24,31 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
+func TestMapStatus(t *testing.T) {
+	testCases := []struct {
+		input    string
+		expected kubecontainer.ContainerStatus
+	}{
+		{input: "Up 5 hours", expected: kubecontainer.ContainerStatusRunning},
+		{input: "Exited (0) 2 hours ago", expected: kubecontainer.ContainerStatusExited},
+		{input: "Created", expected: kubecontainer.ContainerStatusUnknown},
+		{input: "Random string", expected: kubecontainer.ContainerStatusUnknown},
+	}
+
+	for i, test := range testCases {
+		if actual := mapStatus(test.input); actual != test.expected {
+			t.Errorf("Test[%d]: expected %q, got %q", i, test.expected, actual)
+		}
+	}
+}
+
 func TestToRuntimeContainer(t *testing.T) {
 	original := &docker.APIContainers{
 		ID:      "ab2cdf",
 		Image:   "bar_image",
 		Created: 12345,
 		Names:   []string{"/k8s_bar.5678_foo_ns_1234_42"},
+		Status:  "Up 5 hours",
 	}
 	expected := &kubecontainer.Container{
 		ID:      kubecontainer.ContainerID{"docker", "ab2cdf"},
@@ -37,6 +56,7 @@ func TestToRuntimeContainer(t *testing.T) {
 		Image:   "bar_image",
 		Hash:    0x5678,
 		Created: 12345,
+		Status:  kubecontainer.ContainerStatusRunning,
 	}
 
 	actual, err := toRuntimeContainer(original)
diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go
index 82cf3cd3ac537..fb7ecd88ad438 100644
--- a/pkg/kubelet/dockertools/docker_test.go
+++ b/pkg/kubelet/dockertools/docker_test.go
@@ -580,14 +580,16 @@ func TestFindContainersByPod(t *testing.T) {
 			Namespace: "ns",
 			Containers: []*kubecontainer.Container{
 				{
-					ID:   kubetypes.DockerID("foobar").ContainerID(),
-					Name: "foobar",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("foobar").ContainerID(),
+					Name:   "foobar",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 				{
-					ID:   kubetypes.DockerID("baz").ContainerID(),
-					Name: "baz",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("baz").ContainerID(),
+					Name:   "baz",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 			},
 		},
@@ -597,9 +599,10 @@ func TestFindContainersByPod(t *testing.T) {
 			Namespace: "ns",
 			Containers: []*kubecontainer.Container{
 				{
-					ID:   kubetypes.DockerID("barbar").ContainerID(),
-					Name: "barbar",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("barbar").ContainerID(),
+					Name:   "barbar",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 			},
 		},
@@ -638,19 +641,22 @@ func TestFindContainersByPod(t *testing.T) {
 			Namespace: "ns",
 			Containers: []*kubecontainer.Container{
 				{
-					ID:   kubetypes.DockerID("foobar").ContainerID(),
-					Name: "foobar",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("foobar").ContainerID(),
+					Name:   "foobar",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 				{
-					ID:   kubetypes.DockerID("barfoo").ContainerID(),
-					Name: "barfoo",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("barfoo").ContainerID(),
+					Name:   "barfoo",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 				{
-					ID:   kubetypes.DockerID("baz").ContainerID(),
-					Name: "baz",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("baz").ContainerID(),
+					Name:   "baz",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 			},
 		},
@@ -660,9 +666,10 @@ func TestFindContainersByPod(t *testing.T) {
 			Namespace: "ns",
 			Containers: []*kubecontainer.Container{
 				{
-					ID:   kubetypes.DockerID("barbar").ContainerID(),
-					Name: "barbar",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("barbar").ContainerID(),
+					Name:   "barbar",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 			},
 		},
@@ -672,9 +679,10 @@ func TestFindContainersByPod(t *testing.T) {
 			Namespace: "ns",
 			Containers: []*kubecontainer.Container{
 				{
-					ID:   kubetypes.DockerID("bazbaz").ContainerID(),
-					Name: "bazbaz",
-					Hash: 0x1234,
+					ID:     kubetypes.DockerID("bazbaz").ContainerID(),
+					Name:   "bazbaz",
+					Hash:   0x1234,
+					Status: kubecontainer.ContainerStatusUnknown,
 				},
 			},
 		},
diff --git a/pkg/kubelet/image_manager_test.go b/pkg/kubelet/image_manager_test.go
index 906323c7508f2..73a196785467f 100644
--- a/pkg/kubelet/image_manager_test.go
+++ b/pkg/kubelet/image_manager_test.go
@@ -84,7 +84,7 @@ func TestDetectImagesInitialDetect(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -114,7 +114,7 @@ func TestDetectImagesWithNewImage(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -159,7 +159,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -175,7 +175,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
 	require.True(t, ok)
 
 	// Simulate container being stopped.
-	fakeRuntime.PodList = []*container.Pod{}
+	fakeRuntime.AllPodList = []*container.Pod{}
 	err = manager.detectImages(time.Now())
 	require.NoError(t, err)
 	assert.Equal(manager.imageRecordsLen(), 2)
@@ -195,7 +195,7 @@ func TestDetectImagesWithRemovedImages(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -221,7 +221,7 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -242,7 +242,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 		makeImage(0, 1024),
 		makeImage(1, 2048),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(0),
@@ -253,7 +253,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 
 	// Make 1 be more recently used than 0.
 	require.NoError(t, manager.detectImages(zero))
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(1),
@@ -261,7 +261,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 		},
 	}
 	require.NoError(t, manager.detectImages(time.Now()))
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{},
 		},
@@ -281,7 +281,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
 	fakeRuntime.ImageList = []container.Image{
 		makeImage(0, 1024),
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				makeContainer(0),
@@ -296,7 +296,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
 		makeImage(1, 2048),
 	}
 	require.NoError(t, manager.detectImages(time.Now()))
-	fakeRuntime.PodList = []*container.Pod{}
+	fakeRuntime.AllPodList = []*container.Pod{}
 	require.NoError(t, manager.detectImages(time.Now()))
 	require.Equal(t, manager.imageRecordsLen(), 2)
 
@@ -317,7 +317,7 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) {
 			Size: 2048,
 		},
 	}
-	fakeRuntime.PodList = []*container.Pod{
+	fakeRuntime.AllPodList = []*container.Pod{
 		{
 			Containers: []*container.Container{
 				{
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 0ad13cb675202..67ee295b430f6 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -55,6 +55,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/envvars"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/network"
+	"k8s.io/kubernetes/pkg/kubelet/pleg"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@@ -111,6 +112,19 @@ const (
 	housekeepingPeriod = time.Second * 2
 
 	etcHostsPath = "/etc/hosts"
+
+	// Capacity of the channel for receiving pod lifecycle events. This number
+	// is a bit arbitrary and may be adjusted in the future.
+	plegChannelCapacity = 1000
+
+	// Generic PLEG relies on relisting for discovering container events.
+	// The period directly affects the response time of kubelet.
+	plegRelistPeriod = time.Second * 3
+
+	// backOffPeriod is the period to back off when pod syncing results in an
+	// error. It is also used as the base period for the exponential backoff
+	// of container restarts and image pulls.
+	backOffPeriod = time.Second * 10
 )
 
 var (
@@ -351,6 +365,7 @@ func NewMainKubelet(
 			serializeImagePulls,
 		)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -372,6 +387,7 @@ func NewMainKubelet(
 		}
 		klet.containerRuntime = rktRuntime
 		klet.imageManager = rkt.NewImageManager(rktRuntime)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
 
 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
@@ -425,11 +441,9 @@ func NewMainKubelet(
 	}
 	klet.runtimeCache = runtimeCache
 	klet.workQueue = queue.NewBasicWorkQueue()
-	// TODO(yujuhong): backoff and resync interval should be set differently
-	// once we switch to using pod event generator.
-	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
+	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
 
-	klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff)
+	klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
 	klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
 	klet.sourcesSeen = sets.NewString()
 	return klet, nil
@@ -558,6 +572,9 @@ type Kubelet struct {
 	// as it takes time to gather all necessary node information.
 	nodeStatusUpdateFrequency time.Duration
 
+	// Generates pod events.
+	pleg pleg.PodLifecycleEventGenerator
+
 	// The name of the resource-only container to run the Kubelet in (empty for no container).
 	// Name must be absolute.
 	resourceContainer string
@@ -871,6 +888,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 
 	// Run the system oom watcher forever.
 	kl.statusManager.Start()
+	// Start the pod lifecycle event generator.
+	kl.pleg.Start()
 	kl.syncLoop(updates, kl)
 }
 
@@ -2124,20 +2143,21 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 	// sync interval is defaulted to 10s.
 	syncTicker := time.NewTicker(time.Second)
 	housekeepingTicker := time.NewTicker(housekeepingPeriod)
+	plegCh := kl.pleg.Watch()
 	for {
 		if rs := kl.runtimeState.errors(); len(rs) != 0 {
 			glog.Infof("skipping pod synchronization - %v", rs)
 			time.Sleep(5 * time.Second)
 			continue
 		}
-		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
+		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
 			break
 		}
 	}
 }
 
 func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
-	syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool {
+	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 	kl.syncLoopMonitor.Store(time.Now())
 	select {
 	case u, open := <-updates:
@@ -2146,6 +2166,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			return false
 		}
 		kl.addSource(u.Source)
+
 		switch u.Op {
 		case kubetypes.ADD:
 			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, kubeletutil.FormatPodNames(u.Pods))
@@ -2160,6 +2181,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 			// TODO: Do we want to support this?
 			glog.Errorf("Kubelet does not support snapshot update")
 		}
+	case e := <-plegCh:
+		// Filter out started events since we don't use them now.
+		if e.Type == pleg.ContainerStarted {
+			break
+		}
+		pod, ok := kl.podManager.GetPodByUID(e.ID)
+		if !ok {
+			// If the pod no longer exists, ignore the event.
+			glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
+			break
+		}
+		glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", kubeletutil.FormatPodName(pod), e)
+		// Force the container runtime cache to update.
+		if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil {
+			glog.Errorf("SyncLoop: unable to update runtime cache")
+			// TODO (yujuhong): should we delay the sync until container
+			// runtime can be updated?
+		}
+		handler.HandlePodSyncs([]*api.Pod{pod})
 	case <-syncCh:
 		podsToSync := kl.getPodsToSync()
 		if len(podsToSync) == 0 {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index e17f972a93547..0f62153de45b6 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -46,6 +46,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/network"
+	"k8s.io/kubernetes/pkg/kubelet/pleg"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@@ -147,6 +148,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
 	kubelet.resyncInterval = 10 * time.Second
 	kubelet.workQueue = queue.NewBasicWorkQueue()
+	// Relist period does not affect the tests.
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }
 
@@ -338,15 +341,16 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
 	// Start sync ticker.
 	syncCh := make(chan time.Time, 1)
 	housekeepingCh := make(chan time.Time, 1)
+	plegCh := make(chan *pleg.PodLifecycleEvent)
 	syncCh <- time.Now()
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
 	loopTime2 := kubelet.LatestLoopEntryTime()
 	if loopTime2.IsZero() {
 		t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
 	}
 
 	syncCh <- time.Now()
-	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
+	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
 	loopTime3 := kubelet.LatestLoopEntryTime()
 	if !loopTime3.After(loopTime1) {
 		t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
@@ -366,7 +370,7 @@ func TestSyncLoopAbort(t *testing.T) {
 	close(ch)
 
 	// sanity check (also prevent this test from hanging in the next step)
-	ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time))
+	ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
 	if ok {
 		t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
 	}
diff --git a/pkg/kubelet/pleg/doc.go b/pkg/kubelet/pleg/doc.go
new file mode 100644
index 0000000000000..c8782ee898357
--- /dev/null
+++ b/pkg/kubelet/pleg/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package pleg contains types and a generic implementation of the pod
+// lifecycle event generator.
+package pleg
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
new file mode 100644
index 0000000000000..dd94b3a99884d
--- /dev/null
+++ b/pkg/kubelet/pleg/generic.go
@@ -0,0 +1,141 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pleg
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+// GenericPLEG is an extremely simple generic PLEG that relies solely on
+// periodic listing to discover container changes. It should be used as a
+// temporary replacement for container runtimes that do not support a proper
+// event generator yet.
+//
+// Note that GenericPLEG assumes that a container would not be created,
+// terminated, and garbage collected within one relist period. If such an
+// incident happens, GenericPLEG would miss all events regarding this
+// container. In the case of relisting failure, the window may become longer.
+// Note that this assumption is not unique -- many kubelet internal components
+// rely on terminated containers as tombstones for bookkeeping purposes. The
+// garbage collector is implemented to work with such situations. However, to
+// guarantee that kubelet can handle missing container events, it is
+// recommended to set the relist period short and have an auxiliary, longer
+// periodic sync in kubelet as the safety net.
+type GenericPLEG struct {
+	// The period for relisting.
+	relistPeriod time.Duration
+	// The container runtime.
+	runtime kubecontainer.Runtime
+	// The channel from which the subscriber listens for events.
+	eventChannel chan *PodLifecycleEvent
+	// The internal cache for container information.
+	containers map[string]containerInfo
+}
+
+type containerInfo struct {
+	podID  types.UID
+	status kubecontainer.ContainerStatus
+}
+
+func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
+	relistPeriod time.Duration) PodLifecycleEventGenerator {
+	return &GenericPLEG{
+		relistPeriod: relistPeriod,
+		runtime:      runtime,
+		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
+		containers:   make(map[string]containerInfo),
+	}
+}
+
+// Returns a channel from which the subscriber can receive PodLifecycleEvent
+// events.
+// TODO: support multiple subscribers.
+func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
+	return g.eventChannel
+}
+
+// Start spawns a goroutine to relist periodically.
+func (g *GenericPLEG) Start() {
+	go util.Until(g.relist, g.relistPeriod, util.NeverStop)
+}
+
+func generateEvent(podID types.UID, cid string, oldStatus, newStatus kubecontainer.ContainerStatus) *PodLifecycleEvent {
+	if newStatus == oldStatus {
+		return nil
+	}
+	switch newStatus {
+	case kubecontainer.ContainerStatusRunning:
+		return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
+	case kubecontainer.ContainerStatusExited:
+		return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
+	case kubecontainer.ContainerStatusUnknown:
+		// Don't generate any event if the status is unknown.
+		return nil
+	default:
+		panic(fmt.Sprintf("unrecognized container status: %v", newStatus))
+	}
+	return nil
+}
+
+// relist queries the container runtime for the list of pods/containers,
+// compares them with the internal pods/containers, and generates events
+// accordingly.
+func (g *GenericPLEG) relist() {
+	glog.V(5).Infof("GenericPLEG: Relisting")
+	// Get all the pods.
+	pods, err := g.runtime.GetPods(true)
+	if err != nil {
+		glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
+		return
+	}
+
+	events := []*PodLifecycleEvent{}
+	containers := make(map[string]containerInfo, len(g.containers))
+	// Create a new containers map, compare container statuses, and generate
+	// corresponding events.
+	for _, p := range pods {
+		for _, c := range p.Containers {
+			cid := c.ID.ID
+			// Get the status of the existing container; default to unknown.
+			oldStatus := kubecontainer.ContainerStatusUnknown
+			if info, ok := g.containers[cid]; ok {
+				oldStatus = info.status
+			}
+			// Generate an event if required.
+			glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldStatus, c.Status)
+			if e := generateEvent(p.ID, cid, oldStatus, c.Status); e != nil {
+				events = append(events, e)
+			}
+			// Write to the new cache.
+			containers[cid] = containerInfo{podID: p.ID, status: c.Status}
+		}
+	}
+
+	// Swap the container info cache. This is purely to avoid the need for
+	// garbage collection.
+	g.containers = containers
+
+	// Send out the events.
+	for i := range events {
+		g.eventChannel <- events[i]
+	}
+}
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
new file mode 100644
index 0000000000000..2a974aaef90b8
--- /dev/null
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -0,0 +1,148 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pleg
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+const (
+	testContainerRuntimeType = "fooRuntime"
+)
+
+type TestGenericPLEG struct {
+	pleg    *GenericPLEG
+	runtime *kubecontainer.FakeRuntime
+}
+
+func newTestGenericPLEG() *TestGenericPLEG {
+	fakeRuntime := &kubecontainer.FakeRuntime{}
+	// The channel capacity should be large enough to hold all events in a
+	// single test.
+	pleg := &GenericPLEG{
+		relistPeriod: time.Hour,
+		runtime:      fakeRuntime,
+		eventChannel: make(chan *PodLifecycleEvent, 100),
+		containers:   make(map[string]containerInfo),
+	}
+	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
+}
+
+func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
+	events := []*PodLifecycleEvent{}
+	for len(ch) > 0 {
+		e := <-ch
+		events = append(events, e)
+	}
+	return events
+}
+
+func createTestContainer(ID string, status kubecontainer.ContainerStatus) *kubecontainer.Container {
+	return &kubecontainer.Container{
+		ID:     kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
+		Status: status,
+	}
+}
+
+type sortableEvents []*PodLifecycleEvent
+
+func (a sortableEvents) Len() int      { return len(a) }
+func (a sortableEvents) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a sortableEvents) Less(i, j int) bool {
+	if a[i].ID != a[j].ID {
+		return a[i].ID < a[j].ID
+	}
+	return a[i].Data.(string) < a[j].Data.(string)
+}
+
+func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
+	sort.Sort(sortableEvents(expected))
+	sort.Sort(sortableEvents(actual))
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual))
+	}
+}
+
+func TestRelisting(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, runtime := testPleg.pleg, testPleg.runtime
+	ch := pleg.Watch()
+
+	// The first relist should report every running/exited container seen for
+	// the first time.
+	runtime.AllPodList = []*kubecontainer.Pod{
+		{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStatusExited),
+				createTestContainer("c2", kubecontainer.ContainerStatusRunning),
+				createTestContainer("c3", kubecontainer.ContainerStatusUnknown),
+			},
+		},
+		{
+			ID: "4567",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStatusExited),
+			},
+		},
+	}
+	pleg.relist()
+	// Report every running/exited container if we see them for the first time.
+	expected := []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerStarted, Data: "c2"},
+		{ID: "4567", Type: ContainerDied, Data: "c1"},
+		{ID: "1234", Type: ContainerDied, Data: "c1"},
+	}
+	actual := getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+
+	// The second relist should not send out any event because no container
+	// has changed.
+	pleg.relist()
+	verifyEvents(t, expected, actual)
+
+	runtime.AllPodList = []*kubecontainer.Pod{
+		{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c2", kubecontainer.ContainerStatusExited),
+				createTestContainer("c3", kubecontainer.ContainerStatusRunning),
+			},
+		},
+		{
+			ID: "4567",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c4", kubecontainer.ContainerStatusRunning),
+			},
+		},
+	}
+	pleg.relist()
+	// Only report containers that transitioned to running or exited status.
+	expected = []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerDied, Data: "c2"},
+		{ID: "1234", Type: ContainerStarted, Data: "c3"},
+		{ID: "4567", Type: ContainerStarted, Data: "c4"},
+	}
+
+	actual = getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+}
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
new file mode 100644
index 0000000000000..57a80f4481ac2
--- /dev/null
+++ b/pkg/kubelet/pleg/pleg.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pleg
+
+import (
+	"k8s.io/kubernetes/pkg/types"
+)
+
+type PodLifeCycleEventType string
+
+const (
+	ContainerStarted      PodLifeCycleEventType = "ContainerStarted"
+	ContainerDied         PodLifeCycleEventType = "ContainerDied"
+	NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted"
+	NetworkFailed         PodLifeCycleEventType = "NetworkFailed"
+	// PodSync is used to trigger syncing of a pod when the observed change of
+	// the state of the pod cannot be captured by any single event above.
+	PodSync PodLifeCycleEventType = "PodSync"
+)
+
+// PodLifecycleEvent is an event that reflects the change of the pod state.
+type PodLifecycleEvent struct {
+	// The pod ID.
+	ID types.UID
+	// The type of the event.
+	Type PodLifeCycleEventType
+	// The accompanying data which varies based on the event type.
+	//   - ContainerStarted/ContainerDied: the container ID (string).
+	//   - All other event types: unused.
+	Data interface{}
+}
+
+type PodLifecycleEventGenerator interface {
+	Start()
+	Watch() chan *PodLifecycleEvent
+}
diff --git a/pkg/kubelet/pod/manager.go b/pkg/kubelet/pod/manager.go
index 292ec99afcb64..c57c13f4638ee 100644
--- a/pkg/kubelet/pod/manager.go
+++ b/pkg/kubelet/pod/manager.go
@@ -171,15 +171,8 @@ func (pm *basicManager) getAllPods() []*api.Pod {
 	return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
 }
 
-// GetPodByName provides the (non-mirror) pod that matches namespace and name,
-// as well as whether the pod was found.
-func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
-	podFullName := kubecontainer.BuildPodFullName(name, namespace)
-	return pm.GetPodByFullName(podFullName)
-}
-
-// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as
-// whether the pod was found.
+// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
+// whether the pod was found.
 func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
 	pm.lock.RLock()
 	defer pm.lock.RUnlock()
@@ -187,6 +180,13 @@ func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
 	return pod, ok
 }
 
+// GetPodByName provides the (non-mirror) pod that matches namespace and name,
+// as well as whether the pod was found.
+func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
+	podFullName := kubecontainer.BuildPodFullName(name, namespace)
+	return pm.GetPodByFullName(podFullName)
+}
+
 // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
 // whether the pod was found.
 func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go
index 3b30e6924c089..a302e6a34e0e7 100644
--- a/pkg/kubelet/rkt/rkt.go
+++ b/pkg/kubelet/rkt/rkt.go
@@ -793,7 +793,14 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
 	var pods []*kubecontainer.Pod
 	for _, u := range units {
 		if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
-			if !all && u.SubState != "running" {
+			var status kubecontainer.ContainerStatus
+			switch {
+			case u.SubState == "running":
+				status = kubecontainer.ContainerStatusRunning
+			default:
+				status = kubecontainer.ContainerStatusExited
+			}
+			if !all && status != kubecontainer.ContainerStatusRunning {
 				continue
 			}
 			pod, _, err := r.readServiceFile(u.Name)
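
The core technique this PR introduces is the relist-and-compare loop in `GenericPLEG.relist`: on every relist, snapshot the status of each container, diff it against the previous snapshot (treating unseen containers as "unknown"), and emit a `ContainerStarted` or `ContainerDied` event for each transition. The following self-contained sketch illustrates that pattern with simplified stand-in types; the names (`diffSnapshots`, `statusRunning`, and so on) are illustrative only and are not part of this change.

```go
package main

import "fmt"

type containerStatus string

const (
	statusRunning containerStatus = "running"
	statusExited  containerStatus = "exited"
	statusUnknown containerStatus = "unknown"
)

type event struct {
	PodID, ContainerID, Type string
}

// diffSnapshots compares the previous status snapshot against the current
// one (both keyed by container ID) and emits one event per transition. As
// in GenericPLEG.relist, a container not present in the previous snapshot
// defaults to "unknown", so its first observed status still produces an
// event; transitions to "unknown" produce nothing.
func diffSnapshots(prev, curr map[string]containerStatus, podID string) []event {
	var events []event
	for cid, newStatus := range curr {
		oldStatus, ok := prev[cid]
		if !ok {
			oldStatus = statusUnknown
		}
		if newStatus == oldStatus {
			continue
		}
		switch newStatus {
		case statusRunning:
			events = append(events, event{podID, cid, "ContainerStarted"})
		case statusExited:
			events = append(events, event{podID, cid, "ContainerDied"})
		}
	}
	return events
}

func main() {
	prev := map[string]containerStatus{"c1": statusRunning}
	curr := map[string]containerStatus{"c1": statusExited, "c2": statusRunning}
	for _, e := range diffSnapshots(prev, curr, "pod-1234") {
		fmt.Printf("%s/%s: %s\n", e.PodID, e.ContainerID, e.Type)
	}
}
```

Note that, as in the PR, a container that appears and disappears entirely between two relists generates no event, which is why the kubelet keeps the periodic sync (`syncCh`) as a safety net alongside the PLEG channel.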