Skip to content

Commit

Permalink
Merge pull request #28179 from deads2k/dedup-workqueue-requeue
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

dedup workqueue requeuing

Updates `workqueue.AddAfter` to only perform the add for the earliest requested requeue operation.  An earlier time inserts in the earlier slot and removes the old one.  A later time is ignored.

When using this conjunction with an `AddRateLimited` method, you get charged for the additional retry even though you're only queue once.  

This keeps requeues from multiplying for every add.

@liggitt
  • Loading branch information
k8s-merge-robot authored Jul 5, 2016
2 parents 61a92ef + 5659889 commit 65c29da
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 20 deletions.
53 changes: 45 additions & 8 deletions pkg/util/workqueue/delaying_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func NewDelayingQueue() DelayingInterface {

func newDelayingQueue(clock util.Clock) DelayingInterface {
ret := &delayingType{
Interface: New(),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
Interface: New(),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
}

go ret.waitingLoop()
Expand All @@ -66,6 +67,8 @@ type delayingType struct {

// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
}
Expand Down Expand Up @@ -118,6 +121,7 @@ func (q *delayingType) waitingLoop() {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}

Expand All @@ -130,6 +134,7 @@ func (q *delayingType) waitingLoop() {
break
}
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
Expand All @@ -152,7 +157,7 @@ func (q *delayingType) waitingLoop() {

case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
Expand All @@ -162,7 +167,7 @@ func (q *delayingType) waitingLoop() {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
Expand All @@ -177,7 +182,20 @@ func (q *delayingType) waitingLoop() {
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
func insert(entries []waitFor, entry waitFor) []waitFor {
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}

// if the entry exists and is scheduled for later, go ahead and remove the entry
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
}
}

insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
Expand All @@ -189,5 +207,24 @@ func insert(entries []waitFor, entry waitFor) []waitFor {
// insert the record
entries[insertionIndex] = entry

knownEntries[entry.data] = entry.readyAt

return entries
}

// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})

// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}

return index
}
78 changes: 66 additions & 12 deletions pkg/util/workqueue/delaying_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSimpleQueue(t *testing.T) {

fakeClock.Step(60 * time.Millisecond)

if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
Expand All @@ -68,6 +68,65 @@ func TestSimpleQueue(t *testing.T) {
}
}

func TestDeduping(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)

first := "foo"

q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
q.AddAfter(first, 70*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}

// step past the first block, we should receive now
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)

// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}

// test again, but this time the earlier should override
q.AddAfter(first, 50*time.Millisecond)
q.AddAfter(first, 30*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}

fakeClock.Step(40 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ = q.Get()
q.Done(item)

// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
}

func TestAddTwoFireEarly(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
Expand All @@ -88,7 +147,7 @@ func TestAddTwoFireEarly(t *testing.T) {

fakeClock.Step(60 * time.Millisecond)

if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ := q.Get()
Expand All @@ -99,7 +158,7 @@ func TestAddTwoFireEarly(t *testing.T) {
q.AddAfter(third, 2*time.Second)

fakeClock.Step(1 * time.Second)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
Expand All @@ -108,7 +167,7 @@ func TestAddTwoFireEarly(t *testing.T) {
}

fakeClock.Step(2 * time.Second)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
Expand Down Expand Up @@ -139,7 +198,7 @@ func TestCopyShifting(t *testing.T) {

fakeClock.Step(2 * time.Second)

if err := waitForAdded(t, q, 3); err != nil {
if err := waitForAdded(q, 3); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actualFirst, _ := q.Get()
Expand All @@ -156,19 +215,14 @@ func TestCopyShifting(t *testing.T) {
}
}

func waitForAdded(t *testing.T, q DelayingInterface, depth int) error {
err := wait.Poll(1*time.Millisecond, 20*time.Second, func() (done bool, err error) {
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}

return false, nil
})

if err != nil {
t.Logf("failed: len=%v, everything=%#v", q.Len(), q)
}
return err
}

func waitForWaitingQueueToFill(q DelayingInterface) error {
Expand Down

0 comments on commit 65c29da

Please sign in to comment.