Skip to content

Commit

Permalink
Merge pull request #2994 from brendandburns/exec
Browse files Browse the repository at this point in the history
Track the sources that the kubelet has seen
  • Loading branch information
dchen1107 committed Dec 18, 2014
2 parents da7d5d1 + 7da0378 commit edfae86
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 74 deletions.
48 changes: 42 additions & 6 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type PodConfig struct {

// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate

// contains the list of all configured sources
sourcesLock sync.Mutex
sources util.StringSet
}

// NewPodConfig creates an object that can merge many configuration sources into a stream
Expand All @@ -64,16 +68,30 @@ func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: util.StringSet{},
}
return podConfig
}

// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}

// SeenAllSources returns true if this config has received a SET
// message from all configured sources, false otherwise.
func (c *PodConfig) SeenAllSources() bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), c.pods.sourcesSeen)
return c.pods.seenSources(c.sources.List()...)
}

// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubelet.PodUpdate {
return c.updates
Expand All @@ -98,16 +116,21 @@ type podStorage struct {
// on the updates channel
updateLock sync.Mutex
updates chan<- kubelet.PodUpdate

// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.Mutex
sourcesSeen util.StringSet
}

// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
// in the future, especially with multiple listeners.
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
pods: make(map[string]map[string]*api.BoundPod),
mode: mode,
updates: updates,
sourcesSeen: util.StringSet{},
}
}

Expand Down Expand Up @@ -138,12 +161,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updates <- *updates
}
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
}

case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, source}
}

default:
Expand Down Expand Up @@ -212,6 +235,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

case kubelet.SET:
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*api.BoundPod)
Expand Down Expand Up @@ -254,6 +278,18 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
return adds, updates, deletes
}

func (s *podStorage) markSourceSet(source string) {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
s.sourcesSeen.Insert(source)
}

func (s *podStorage) seenSources(sources ...string) bool {
s.sourcesSeenLock.Lock()
defer s.sourcesSeenLock.Unlock()
return s.sourcesSeen.HasAll(sources...)
}

func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) {
names := util.StringSet{}
for i := range pods {
Expand All @@ -280,7 +316,7 @@ func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.Boun
func (s *podStorage) Sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET}
s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET, kubelet.AllSource}
}

// Object implements config.Accessor
Expand Down
89 changes: 47 additions & 42 deletions pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
)

const (
NoneSource = ""
TestSource = "test"
)

func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
select {
case update := <-ch:
Expand Down Expand Up @@ -58,23 +63,23 @@ func CreateValidPod(name, namespace, source string) api.BoundPod {
}
}

func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate {
func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.BoundPod) kubelet.PodUpdate {
// We deliberately return an empty slice instead of a nil pointer here
// because reflect.DeepEqual differentiates between the two and we need to
// pick one for consistency.
newPods := make([]api.BoundPod, len(pods))
if len(pods) == 0 {
return kubelet.PodUpdate{newPods, op}
return kubelet.PodUpdate{newPods, op, source}
}
for i := range pods {
newPods[i] = pods[i]
}
return kubelet.PodUpdate{newPods, op}
return kubelet.PodUpdate{newPods, op, source}
}

func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) {
config := NewPodConfig(mode)
channel := config.Channel("test")
channel := config.Channel(TestSource)
ch := config.Updates()
return channel, ch, config
}
Expand Down Expand Up @@ -102,63 +107,63 @@ func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))

config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))
}

func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "", ""))
channel <- podUpdate
config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource))
}

func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))

config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test")))
}

func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test")))

// see an update in another namespace
podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))

config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test")))
}

func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))

// add an invalid update
podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
channel <- podUpdate
expectNoPodUpdate(t, ch)
}
Expand All @@ -167,79 +172,79 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)

// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))

config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))

// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}

func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)

// see an set
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test")))

config.Sync()
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test")))

// container updates are separated as UPDATE
pod := podUpdate.Pods[0]
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
channel <- CreatePodUpdate(kubelet.ADD, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod))
channel <- CreatePodUpdate(kubelet.ADD, NoneSource, pod)
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, pod))
}

func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test")))

// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)

// an kubelet.ADD should be converted to kubelet.UPDATE
pod := CreateValidPod("foo", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.ADD, pod)
podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, pod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))

podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, api.BoundPod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}})
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod))
}

func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

// should register an add
podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", ""))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))
expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test")))

// should ignore ADDs that are identical
expectNoPodUpdate(t, ch)

// should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE
pod := CreateValidPod("foo2", "new", "test")
pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}}
podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test"))
channel <- podUpdate
expectPodUpdate(t, ch,
CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, pod))
CreatePodUpdate(kubelet.REMOVE, NoneSource, CreateValidPod("foo", "new", "test")),
CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")),
CreatePodUpdate(kubelet.UPDATE, NoneSource, pod))
}
2 changes: 1 addition & 1 deletion pkg/kubelet/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *sourceEtcd) run() {
}

glog.V(4).Infof("Received state from etcd watch: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/config/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ func (s *sourceFile) extractFromPath() error {
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.FileSource}

case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path)
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET}
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.FileSource}

default:
return fmt.Errorf("path is not a directory or file")
Expand Down
Loading

0 comments on commit edfae86

Please sign in to comment.