Skip to content

Commit

Permalink
Merge pull request #123039 from tallclair/configutil
Browse files Browse the repository at this point in the history
Clean up single use pkg/util/config
  • Loading branch information
k8s-ci-robot authored Jan 31, 2024
2 parents 7080b51 + 77f03c1 commit 46c9bd1
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 143 deletions.
20 changes: 9 additions & 11 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/config"
)

// PodConfigNotificationMode describes how changes are sent to the update channel.
Expand Down Expand Up @@ -61,7 +60,7 @@ type podStartupSLIObserver interface {
// in order.
type PodConfig struct {
pods *podStorage
mux *config.Mux
mux *mux

// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
Expand All @@ -78,7 +77,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder,
storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
mux: newMux(storage),
updates: updates,
sources: sets.String{},
}
Expand Down Expand Up @@ -113,7 +112,7 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {

// Sync requests the full configuration be delivered to the update channel.
func (c *PodConfig) Sync() {
c.pods.Sync()
c.pods.sync()
}

// podStorage manages the current pod state at any point in time and ensures updates
Expand Down Expand Up @@ -194,7 +193,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {

case PodConfigNotificationSnapshotAndUpdates:
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
Expand All @@ -205,7 +204,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {

case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}

case PodConfigNotificationUnknown:
Expand Down Expand Up @@ -471,15 +470,14 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr
return
}

// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
// sync sends a copy of the current state through the update channel.
func (s *podStorage) sync() {
s.updateLock.Lock()
defer s.updateLock.Unlock()
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
}

// Object implements config.Accessor
func (s *podStorage) MergedState() interface{} {
func (s *podStorage) mergedState() interface{} {
s.podLock.RLock()
defer s.podLock.RUnlock()
pods := make([]*v1.Pod, 0)
Expand Down
80 changes: 9 additions & 71 deletions pkg/util/config/config.go → pkg/kubelet/config/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,28 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

type Merger interface {
type merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
}

// MergeFunc implements the Merger interface
type MergeFunc func(source string, update interface{}) error

func (f MergeFunc) Merge(source string, update interface{}) error {
return f(source, update)
}

// Mux is a class for merging configuration from multiple sources. Changes are
// mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
type mux struct {
// Invoked when an update is sent to a source.
merger Merger
merger merger

// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
}

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
mux := &Mux{
// newMux creates a new mux that can merge changes from multiple sources.
func newMux(merger merger) *mux {
mux := &mux{
sources: make(map[string]chan interface{}),
merger: merger,
}
Expand All @@ -63,7 +56,7 @@ func NewMux(merger Merger) *Mux {
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
Expand All @@ -80,63 +73,8 @@ func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interf
return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
func (m *mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}

// Accessor is an interface for retrieving the current merge state.
type Accessor interface {
// MergedState returns a representation of the current merge state.
// Must be reentrant when more than one source is defined.
MergedState() interface{}
}

// AccessorFunc implements the Accessor interface.
type AccessorFunc func() interface{}

func (f AccessorFunc) MergedState() interface{} {
return f()
}

type Listener interface {
// OnUpdate is invoked when a change is made to an object.
OnUpdate(instance interface{})
}

// ListenerFunc receives a representation of the change or object.
type ListenerFunc func(instance interface{})

func (f ListenerFunc) OnUpdate(instance interface{}) {
f(instance)
}

type Broadcaster struct {
// Listeners for changes and their lock.
listenerLock sync.RWMutex
listeners []Listener
}

// NewBroadcaster registers a set of listeners that support the Listener interface
// and notifies them all on changes.
func NewBroadcaster() *Broadcaster {
return &Broadcaster{}
}

// Add registers listener to receive updates of changes.
func (b *Broadcaster) Add(listener Listener) {
b.listenerLock.Lock()
defer b.listenerLock.Unlock()
b.listeners = append(b.listeners, listener)
}

// Notify notifies all listeners.
func (b *Broadcaster) Notify(instance interface{}) {
b.listenerLock.RLock()
listeners := b.listeners
b.listenerLock.RUnlock()
for _, listener := range listeners {
listener.OnUpdate(instance)
}
}
48 changes: 7 additions & 41 deletions pkg/util/config/config_test.go → pkg/kubelet/config/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestConfigurationChannels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mux := NewMux(nil)
mux := newMux(nil)
channelOne := mux.ChannelWithContext(ctx, "one")
if channelOne != mux.ChannelWithContext(ctx, "one") {
t.Error("Didn't get the same muxuration channel back with the same name")
Expand Down Expand Up @@ -58,35 +58,23 @@ func TestMergeInvoked(t *testing.T) {
defer cancel()

merger := MergeMock{"one", "test", t}
mux := NewMux(&merger)
mux := newMux(&merger)
mux.ChannelWithContext(ctx, "one") <- "test"
}

func TestMergeFuncInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// mergeFunc implements the Merger interface
type mergeFunc func(source string, update interface{}) error

ch := make(chan bool)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
if source != "one" {
t.Errorf("Expected %s, Got %s", "one", source)
}
if update.(string) != "test" {
t.Errorf("Expected %s, Got %s", "test", update)
}
ch <- true
return nil
}))
mux.ChannelWithContext(ctx, "one") <- "test"
<-ch
func (f mergeFunc) Merge(source string, update interface{}) error {
return f(source, update)
}

func TestSimultaneousMerge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan bool, 2)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
mux := newMux(mergeFunc(func(source string, update interface{}) error {
switch source {
case "one":
if update.(string) != "test" {
Expand All @@ -109,25 +97,3 @@ func TestSimultaneousMerge(t *testing.T) {
<-ch
<-ch
}

func TestBroadcaster(t *testing.T) {
b := NewBroadcaster()
b.Notify(struct{}{})

ch := make(chan bool, 2)
b.Add(ListenerFunc(func(object interface{}) {
if object != "test" {
t.Errorf("Expected %s, Got %s", "test", object)
}
ch <- true
}))
b.Add(ListenerFunc(func(object interface{}) {
if object != "test" {
t.Errorf("Expected %s, Got %s", "test", object)
}
ch <- true
}))
b.Notify("test")
<-ch
<-ch
}
20 changes: 0 additions & 20 deletions pkg/util/config/doc.go

This file was deleted.

0 comments on commit 46c9bd1

Please sign in to comment.