From 32fd331e73981641bb421f4b5d29e005c3519e28 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Thu, 5 Mar 2015 10:49:36 -0800 Subject: [PATCH] Revert "Split up kubelet "source seen" logic" We want to sync pods from file/http/etcd sources to the apiserver, hence differentiating sources is no longer desired. This reverts commit 110ab6f1bd54e10d9e323fdb4534638d0c1c637c. --- cmd/kubelet/app/server.go | 2 +- pkg/kubelet/config/config.go | 18 +++++++--- pkg/kubelet/kubelet.go | 22 ++++++------ pkg/kubelet/kubelet_test.go | 65 ++---------------------------------- pkg/kubelet/types.go | 19 ----------- pkg/kubelet/types_test.go | 44 ------------------------ 6 files changed, 26 insertions(+), 144 deletions(-) delete mode 100644 pkg/kubelet/types_test.go diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 3fcec121b9275..fc7c20e1b30b1 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -402,7 +402,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.RegistryBurst, kc.MinimumGCAge, kc.MaxContainerCount, - pc.IsSourceSeen, + pc.SeenAllSources, kc.ClusterDomain, net.IP(kc.ClusterDNS), kc.MasterServiceNamespace, diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 2254bee198d42..8c1e5ac59d9a0 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -55,6 +55,10 @@ type PodConfig struct { // the channel of denormalized changes passed to listeners updates chan kubelet.PodUpdate + + // contains the list of all configured sources + sourcesLock sync.Mutex + sources util.StringSet } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -66,6 +70,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) pods: storage, mux: config.NewMux(storage), updates: updates, + sources: util.StringSet{}, } return podConfig } @@ -73,17 +78,20 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) // Channel creates or returns a config source channel. The channel // only accepts PodUpdates func (c *PodConfig) Channel(source string) chan<- interface{} { + c.sourcesLock.Lock() + defer c.sourcesLock.Unlock() + c.sources.Insert(source) return c.mux.Channel(source) } -// IsSourceSeen returns true if the specified source string has previously -// been marked as seen. -func (c *PodConfig) IsSourceSeen(source string) bool { +// SeenAllSources returns true if this config has received a SET +// message from all configured sources, false otherwise. +func (c *PodConfig) SeenAllSources() bool { if c.pods == nil { return false } - glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen) - return c.pods.seenSources(source) + glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen) + return c.pods.seenSources(c.sources.List()...) } // Updates returns a channel of updates to the configuration, properly denormalized. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e12b84a5f951e..c87606ddcdcd4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -67,7 +67,7 @@ type SyncHandler interface { SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error } -type SourceReadyFn func(source string) bool +type SourcesReadyFn func() bool type volumeMap map[string]volume.Interface @@ -84,7 +84,7 @@ func NewMainKubelet( pullBurst int, minimumGCAge time.Duration, maxContainerCount int, - sourceReady SourceReadyFn, + sourcesReady SourcesReadyFn, clusterDomain string, clusterDNS net.IP, masterServiceNamespace string, @@ -128,7 +128,7 @@ func NewMainKubelet( pullBurst: pullBurst, minimumGCAge: minimumGCAge, maxContainerCount: maxContainerCount, - sourceReady: sourceReady, + sourcesReady: sourcesReady, clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, @@ -178,7 +178,7 @@ type Kubelet struct { podInfraContainerImage string podWorkers *podWorkers resyncInterval time.Duration - sourceReady SourceReadyFn + sourcesReady SourcesReadyFn // Protects the pods array // We make complete array copies out of this while locked, which is OK because once added to this array, @@ -1395,10 +1395,15 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) } } - // Stop the workers for no-longer existing pods. kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) + if !kl.sourcesReady() { + // If the sources aren't ready, skip deletion, as we may accidentally delete pods + // for sources that haven't reported yet. + glog.V(4).Infof("Skipping deletes, sources aren't ready yet.") + return nil + } // Kill any containers we don't need. killed := []string{} for ix := range dockerContainers { @@ -1408,13 +1413,6 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m // syncPod() will handle this one. continue } - _, _, podAnnotations := ParsePodFullName(podFullName) - if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) { - // If the source for this container is not ready, skip deletion, so that we don't accidentally - // delete containers for sources that haven't reported yet. - glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source) - continue - } pc := podContainer{podFullName, uid, containerName} if _, ok := desiredContainers[pc]; !ok { glog.V(1).Infof("Killing unwanted container %+v", pc) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 1098b40569bd5..38ecc10ce0785 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -78,7 +78,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn return err }, recorder) - kubelet.sourceReady = func(source string) bool { return true } + kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.readiness = newReadinessStates() @@ -722,7 +722,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ready := false kubelet, fakeDocker, _ := newTestKubelet(t) - kubelet.sourceReady = func(source string) bool { return ready } + kubelet.sourcesReady = func() bool { return ready } fakeDocker.ContainerList = []docker.APIContainers{ { @@ -762,67 +762,6 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { } } -func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { - ready := false - kubelet, fakeDocker, _ := newTestKubelet(t) - kubelet.sourceReady = func(source string) bool { - if source == "testSource" { - return ready - } - return false - } - - fakeDocker.ContainerList = []docker.APIContainers{ - { - // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"}, - ID: "7492", - }, - { - // pod infra container - Names: []string{"/k8s_POD_boo.default.testSource_12345678_42"}, - ID: "3542", - }, - - { - // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"}, - ID: "1234", - }, - { - // pod infra container - Names: []string{"/k8s_POD_foo.new.otherSource_12345678_42"}, - ID: "9876", - }, - } - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } - // Validate nothing happened. - verifyCalls(t, fakeDocker, []string{"list"}) - fakeDocker.ClearCalls() - - ready = true - if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) - - // Validate container for testSource are killed because testSource is reported as seen, but - // containers for otherSource are not killed because otherSource has not. - expectedToStop := map[string]bool{ - "7492": true, - "3542": true, - "1234": false, - "9876": false, - } - if len(fakeDocker.Stopped) != 2 || - !expectedToStop[fakeDocker.Stopped[0]] || - !expectedToStop[fakeDocker.Stopped[1]] { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) - } -} - func TestSyncPodsDeletes(t *testing.T) { kubelet, fakeDocker, _ := newTestKubelet(t) fakeDocker.ContainerList = []docker.APIContainers{ diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index 5bf978d77d035..211e32a922745 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -18,10 +18,8 @@ package kubelet import ( "fmt" - "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/golang/glog" ) const ConfigSourceAnnotationKey = "kubernetes.io/config.source" @@ -73,20 +71,3 @@ type PodUpdate struct { func GetPodFullName(pod *api.BoundPod) string { return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey]) } - -// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations. -// If the pod full name is invalid, empty strings are returend. -func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) { - parts := strings.Split(podFullName, ".") - expectedNumFields := 3 - actualNumFields := len(parts) - if actualNumFields != expectedNumFields { - glog.Errorf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields) - return - } - podName = parts[0] - podNamespace = parts[1] - podAnnotations = make(map[string]string) - podAnnotations[ConfigSourceAnnotationKey] = parts[2] - return -} diff --git a/pkg/kubelet/types_test.go b/pkg/kubelet/types_test.go deleted file mode 100644 index b655aaf411e03..0000000000000 --- a/pkg/kubelet/types_test.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2015 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "testing" -) - -func TestParsePodFullName(t *testing.T) { - // Arrange - podFullName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1.default.etcd" - - // Act - podName, podNamespace, podAnnotations := ParsePodFullName(podFullName) - - // Assert - expectedPodName := "ca4e7148-9ab9-11e4-924c-f0921cde18c1" - expectedPodNamespace := "default" - expectedSource := "etcd" - if podName != expectedPodName { - t.Errorf("Unexpected PodName. Expected: %q Actual: %q", expectedPodName, podName) - } - if podNamespace != expectedPodNamespace { - t.Errorf("Unexpected PodNamespace. Expected: %q Actual: %q", expectedPodNamespace, podNamespace) - } - if podAnnotations[ConfigSourceAnnotationKey] != expectedSource { - t.Errorf("Unexpected PodSource. Expected: %q Actual: %q", expectedPodNamespace, podNamespace) - } - -}