Skip to content

Commit

Permalink
Revert "Split up kubelet "source seen" logic"
Browse files Browse the repository at this point in the history
We want to sync pods from file/http/etcd sources to the apiserver, hence
differentiating sources is no longer desired.

This reverts commit 110ab6f.
  • Loading branch information
yujuhong committed Mar 6, 2015
1 parent 2d0743b commit 32fd331
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 144 deletions.
2 changes: 1 addition & 1 deletion cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount,
pc.IsSourceSeen,
pc.SeenAllSources,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
Expand Down
18 changes: 13 additions & 5 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type PodConfig struct {

// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate

// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
}

// NewPodConfig creates an object that can merge many configuration sources into a stream
Expand All @@ -66,24 +70,28 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: util.StringSet{},
}
return podConfig
}

// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}

// IsSourceSeen returns true if the specified source string has previously
// been marked as seen.
func (c *PodConfig) IsSourceSeen(source string) bool {
// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", source, c.pods.sourcesSeen)
return c.pods.seenSources(source)
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
}

// Updates returns a channel of updates to the configuration, properly denormalized.
Expand Down
22 changes: 10 additions & 12 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type SyncHandler interface {
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}

type SourceReadyFn func(source string) bool
type SourcesReadyFn func() bool

type volumeMap map[string]volume.Interface

Expand All @@ -84,7 +84,7 @@ func NewMainKubelet(
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int,
sourceReady SourceReadyFn,
sourcesReady SourcesReadyFn,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
Expand Down Expand Up @@ -128,7 +128,7 @@ func NewMainKubelet(
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourceReady: sourceReady,
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
Expand Down Expand Up @@ -178,7 +178,7 @@ type Kubelet struct {
podInfraContainerImage string
podWorkers *podWorkers
resyncInterval time.Duration
sourceReady SourceReadyFn
sourcesReady SourcesReadyFn

// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
Expand Down Expand Up @@ -1395,10 +1395,15 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}

// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)

if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill any containers we don't need.
killed := []string{}
for ix := range dockerContainers {
Expand All @@ -1408,13 +1413,6 @@ func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]m
// syncPod() will handle this one.
continue
}
_, _, podAnnotations := ParsePodFullName(podFullName)
if source := podAnnotations[ConfigSourceAnnotationKey]; !kl.sourceReady(source) {
// If the source for this container is not ready, skip deletion, so that we don't accidentally
// delete containers for sources that haven't reported yet.
glog.V(4).Infof("Skipping delete of container (%q), source (%s) aren't ready yet.", podFullName, source)
continue
}
pc := podContainer{podFullName, uid, containerName}
if _, ok := desiredContainers[pc]; !ok {
glog.V(1).Infof("Killing unwanted container %+v", pc)
Expand Down
65 changes: 2 additions & 63 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn
return err
},
recorder)
kubelet.sourceReady = func(source string) bool { return true }
kubelet.sourcesReady = func() bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.readiness = newReadinessStates()
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool { return ready }
kubelet.sourcesReady = func() bool { return ready }

fakeDocker.ContainerList = []docker.APIContainers{
{
Expand Down Expand Up @@ -762,67 +762,6 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
}
}

func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ready := false
kubelet, fakeDocker, _ := newTestKubelet(t)
kubelet.sourceReady = func(source string) bool {
if source == "testSource" {
return ready
}
return false
}

fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_boo_bar.default.testSource_12345678_42"},
ID: "7492",
},
{
// pod infra container
Names: []string{"/k8s_POD_boo.default.testSource_12345678_42"},
ID: "3542",
},

{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_foo_bar.new.otherSource_12345678_42"},
ID: "1234",
},
{
// pod infra container
Names: []string{"/k8s_POD_foo.new.otherSource_12345678_42"},
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})

// Validate container for testSource are killed because testSource is reported as seen, but
// containers for otherSource are not killed because otherSource has not.
expectedToStop := map[string]bool{
"7492": true,
"3542": true,
"1234": false,
"9876": false,
}
if len(fakeDocker.Stopped) != 2 ||
!expectedToStop[fakeDocker.Stopped[0]] ||
!expectedToStop[fakeDocker.Stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
}
}

func TestSyncPodsDeletes(t *testing.T) {
kubelet, fakeDocker, _ := newTestKubelet(t)
fakeDocker.ContainerList = []docker.APIContainers{
Expand Down
19 changes: 0 additions & 19 deletions pkg/kubelet/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package kubelet

import (
"fmt"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)

const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
Expand Down Expand Up @@ -73,20 +71,3 @@ type PodUpdate struct {
func GetPodFullName(pod *api.BoundPod) string {
return fmt.Sprintf("%s.%s.%s", pod.Name, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey])
}

// ParsePodFullName unpacks a pod full name and returns the pod name, namespace, and annotations.
// If the pod full name is invalid, empty strings are returend.
func ParsePodFullName(podFullName string) (podName, podNamespace string, podAnnotations map[string]string) {
parts := strings.Split(podFullName, ".")
expectedNumFields := 3
actualNumFields := len(parts)
if actualNumFields != expectedNumFields {
glog.Errorf("found a podFullName (%q) with too few fields: expected %d, actual %d.", podFullName, expectedNumFields, actualNumFields)
return
}
podName = parts[0]
podNamespace = parts[1]
podAnnotations = make(map[string]string)
podAnnotations[ConfigSourceAnnotationKey] = parts[2]
return
}
44 changes: 0 additions & 44 deletions pkg/kubelet/types_test.go

This file was deleted.

0 comments on commit 32fd331

Please sign in to comment.