Skip to content

Commit

Permalink
Merge pull request kubernetes#20269 from mqliang/sync-delta-fifo
Browse files Browse the repository at this point in the history
add a HasSynced() for DeltaFIFO and FIFO, method, which is very helpful for Informer
  • Loading branch information
lavalamp committed Feb 5, 2016
2 parents 5fe99f4 + 8e615df commit 9b68e8e
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 6 deletions.
27 changes: 27 additions & 0 deletions pkg/client/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ type DeltaFIFO struct {
items map[string]Deltas
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int

// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
Expand Down Expand Up @@ -141,18 +147,28 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
return f.keyFunc(obj)
}

// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}

Expand All @@ -166,6 +182,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
Expand Down Expand Up @@ -203,6 +220,7 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; exists {
return nil
}
Expand Down Expand Up @@ -354,6 +372,9 @@ func (f *DeltaFIFO) Pop() interface{} {
id := f.queue[0]
f.queue = f.queue[1:]
item, ok := f.items[id]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
if !ok {
// Item may have been deleted subsequently.
continue
Expand All @@ -373,6 +394,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))

if !f.populated {
f.populated = true
f.initialPopulationCount = len(list)
}

for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/client/cache/delta_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,59 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
}
}
}

func TestDeltaFIFO_HasSynced(t *testing.T) {
tests := []struct {
actions []func(f *DeltaFIFO)
expectedSynced bool
}{
{
actions: []func(f *DeltaFIFO){},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Add(mkFifoObj("a", 1)) },
},
expectedSynced: true,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{}, "0") },
},
expectedSynced: true,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() },
},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() },
func(f *DeltaFIFO) { f.Pop() },
},
expectedSynced: true,
},
}

for i, test := range tests {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)

for _, action := range test.actions {
action(f)
}
if e, a := test.expectedSynced, f.HasSynced(); a != e {
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
}
}
}
30 changes: 30 additions & 0 deletions pkg/client/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Queue interface {
// as nothing else (presumably more recent)
// has since been added.
AddIfNotPresent(interface{}) error

// Return true if the first batch of items has been popped
HasSynced() bool
}

// FIFO receives adds and updates from a Reflector, and puts them in a queue for
Expand All @@ -52,6 +55,13 @@ type FIFO struct {
// We depend on the property that items in the set are in the queue and vice versa.
items map[string]interface{}
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int

// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
Expand All @@ -61,6 +71,14 @@ var (
_ = Queue(&FIFO{}) // FIFO is a Queue
)

// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *FIFO) Add(obj interface{}) error {
Expand All @@ -70,6 +88,7 @@ func (f *FIFO) Add(obj interface{}) error {
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
Expand All @@ -91,6 +110,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error {
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; exists {
return nil
}
Expand All @@ -116,6 +136,7 @@ func (f *FIFO) Delete(obj interface{}) error {
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
delete(f.items, id)
return err
}
Expand Down Expand Up @@ -174,6 +195,9 @@ func (f *FIFO) Pop() interface{} {
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
Expand All @@ -200,6 +224,12 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {

f.lock.Lock()
defer f.lock.Unlock()

if !f.populated {
f.populated = true
f.initialPopulationCount = len(items)
}

f.items = items
f.queue = f.queue[:0]
for id := range items {
Expand Down
56 changes: 56 additions & 0 deletions pkg/client/cache/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,59 @@ func TestFIFO_addIfNotPresent(t *testing.T) {
}
}
}

func TestFIFO_HasSynced(t *testing.T) {
tests := []struct {
actions []func(f *FIFO)
expectedSynced bool
}{
{
actions: []func(f *FIFO){},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Add(mkFifoObj("a", 1)) },
},
expectedSynced: true,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{}, "0") },
},
expectedSynced: true,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() },
},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() },
func(f *FIFO) { f.Pop() },
},
expectedSynced: true,
},
}

for i, test := range tests {
f := NewFIFO(testFifoObjectKeyFunc)

for _, action := range test.actions {
action(f)
}
if e, a := test.expectedSynced, f.HasSynced(); a != e {
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
}
}
}
7 changes: 1 addition & 6 deletions pkg/controller/framework/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {

// Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()
if c.reflector == nil {
return false
}
return c.reflector.LastSyncResourceVersion() != ""
return c.config.Queue.HasSynced()
}

// Requeue adds the provided object back into the queue if it does not already exist.
Expand Down

0 comments on commit 9b68e8e

Please sign in to comment.