-
Notifications
You must be signed in to change notification settings - Fork 40.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a heap to client-go. Heap orders items with heap invariant ordering. #49752
Add a heap to client-go. Heap orders items with heap invariant ordering. #49752
Conversation
_ = heap.Interface(&HeapData{}) // HeapData is a standard heap | ||
) | ||
|
||
// LessFunc compares two objects and returns true if they first one should go |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/LessFunc/Less
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return obj | ||
} | ||
|
||
// -------------------------------------------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add this line ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// deleteItem removes the item from Heap.items and its key from Heap.queue. | ||
func (h *Heap) deleteItem(key string) { | ||
item := h.data.items[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to change to
item, ok := h.data.items[key]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
// Pop waits until an item is ready. If multiple items are | ||
// ready, they are returned in the order given by Heap.lessFunc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Heap.lessFunc
/Heap.data.lessFunc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Thanks, @dixudx for your review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't finished the review yet, so just some preliminary comments.
|
||
// addIfNotPresent assumes the lock is already held and adds the the provided | ||
// item to the queue if it does not already exist. | ||
func (h *Heap) addIfNotPresent(key string, obj interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addIfNotPresentLocked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
// Indication the queue is closed. | ||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. | ||
// Currently, not used to gate any of CRED operations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CRUD?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess! Was copied from fifo.go and was not carrying much info anyways. I deleted the line.
h.lock.Lock() | ||
defer h.lock.Unlock() | ||
if _, exists := h.data.items[key]; exists { | ||
h.deleteItem(key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not change the item and call Fix()? It's less expensive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Done.
f3f9d2d
to
5042dfe
Compare
@caesarxuchao Any more comments? |
index int // The index of the object's key in the Heap.queue. | ||
} | ||
|
||
// HeapData is an internal struct that implements the standard heap interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why make internal struct public? heapData?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Done.
5042dfe
to
b9051aa
Compare
} | ||
|
||
// Push is supposed to be called by heap.Push only. | ||
func (h *heapData) Push(key interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the implementation here confuses me at the first look. it does not take care of the item map. not sure why leave that out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heap
is the type that manipulates heapData
and is in charge of adding/removing elements to all the relevant arrays and maps. heapData
implements the standard golang heap interface and the queue
is the array that stores elements with the heap ordering. As a result, the heapData.Push and heapData.Pop only add and remove items from the queue
. Removal of items from the items
map is done by Heap
.
In theory, Heap
could implement standard golang heap, but it would require Heap
to implement Push
and Pop
of the standard heap interface. In that case, those functions would be public. In order to hide those functions, I have added heapData
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and is in charge of adding/removing elements to all the relevant arrays and maps.
isnt the backing array is managed inside heapdata, and manipulated by the go's heap pkg? it is confusing to manage the "pointer" in heapdata, and store the actual item in heapdata, but manage the actual item outside heapdata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your point is valid. I made changes to Push and Pop of heapData to perform heapData.items manipulations.
closedLock sync.Mutex | ||
} | ||
|
||
// Close the Heap. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does close a heap mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pop blocks until there is an element in the queue or the heap is closed. Closing the heap signals the condition variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably add that to the comment. it was not clear why a heap needs to be closed.
// When Close() is called, the h.closed is set and the condition is broadcast. | ||
// Which causes this loop to continue and return from the Pop(). | ||
if h.IsClosed() { | ||
return nil, fmt.Errorf("Heap is closed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
heap is closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -0,0 +1,325 @@ | |||
/* | |||
Copyright 2014 The Kubernetes Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2017
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
e75545b
to
2d22d91
Compare
LGTM. Thanks! You need to ping the owners to review it again and get approved though :P |
Thanks, @xiang90! I have already spoken with Chao. He is very busy, but he will take a look soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few comments on the implementation. I'll review the tests later.
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with | ||
// the key is present in the map, no changes is made to the item. | ||
// | ||
// This is useful in a single producer/consumer scenario so that the consumer can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the consumer's requeue function should call AddIfNotPresent? Why it only useful in a single producer/consumer scenario? Isn't it useful in general?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a comment copied from fifo.go. The comment doesn't say it is only useful in a single producer/consumer scenario, but if there are multiple consumers, the main benefit of it, which is not contending with the producer, goes away. Consider this scenario:
- Producer adds item: key: test, value: 1
- Consumer 1 takes the item with value 1 and processes it.
- Producer adds item: key: test, value: 2
- Consumer 2 takes the item with value 2 and processes it.
- Consumer 1 faces an error, requeues the stale item with value 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation.
func (h *Heap) deleteItem(key string) { | ||
item, ok := h.data.items[key] | ||
if ok { | ||
heap.Remove(h.data, item.index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not inline deleteItem
in Delete
and save one existence check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done
if h.IsClosed() { | ||
return nil, fmt.Errorf("heap is closed") | ||
} | ||
h.cond.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close() needs to be protected by h.lock. Otherwise if Close()
is called between line 248 and 249, Wait()
will sleep forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if Close() has to be proteced by h.lock anyway, then we can remove the h.closeLock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a very good point. Looks like fifo.go has the same problem. I used fifo.go as a template to implement heap.go.
I think Close() should hold both closedLock
and lock
. IsClosed() holds closedLock()
only. It cannot hold the same lock
as it is called inside Pop and after locking lock
.
) | ||
|
||
type LessFunc func(interface{}, interface{}) bool | ||
type HeapItem struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
old := h.queue | ||
n := len(old) | ||
key := old[n-1] | ||
h.queue = old[0 : n-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not
key := h.queue(len(h.queue)-1)
h.queue = h.queue[:len(h.queue)-1]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// This is an error | ||
return nil | ||
} | ||
obj := item.obj |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this extra copy needed? Can we just return item.obj?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
func (h *Heap) Pop() (interface{}, error) { | ||
h.lock.Lock() | ||
defer h.lock.Unlock() | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
h.lock.RLock() | ||
defer h.lock.RUnlock() | ||
item, exists := h.data.items[key] | ||
var obj interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not
if !exists {
return nil, false, nil
}
return item.obj, true, nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few more comments. Thanks @bsalamat.
} | ||
|
||
// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is | ||
// preserved after adding items. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:s/adding/deleting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// Indication the queue is closed. | ||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. | ||
closed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we prevent adding more elements to the heap after it's closed? If you agree that's the expected behavior, then consider adding some tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. This is another example of following whatever fifo.go did. FIFO does not prevent addition and I didn't either, but I just added the checks and a test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only downside of this check is that IsClosed
holds a lock to check whether the heap is closed. Doing so before every addition to the heap hurts performance. That may be the reason fifo.go doesn't check it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the code to directly check closed
instead of calling IsClosed
in order to avoid the extra lock/unlock.
t.Errorf("unexpected item in the list: %v", key) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test Pop() of an empty heap properly returns after Close() is called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
2d22d91
to
bed1e5f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @caesarxuchao! PTAL.
) | ||
|
||
type LessFunc func(interface{}, interface{}) bool | ||
type HeapItem struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
old := h.queue | ||
n := len(old) | ||
key := old[n-1] | ||
h.queue = old[0 : n-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// This is an error | ||
return nil | ||
} | ||
obj := item.obj |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// Indication the queue is closed. | ||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. | ||
closed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. This is another example of following whatever fifo.go did. FIFO does not prevent addition and I didn't either, but I just added the checks and a test.
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with | ||
// the key is present in the map, no changes is made to the item. | ||
// | ||
// This is useful in a single producer/consumer scenario so that the consumer can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a comment copied from fifo.go. The comment doesn't say it is only useful in a single producer/consumer scenario, but if there are multiple consumers, the main benefit of it, which is not contending with the producer, goes away. Consider this scenario:
- Producer adds item: key: test, value: 1
- Consumer 1 takes the item with value 1 and processes it.
- Producer adds item: key: test, value: 2
- Consumer 2 takes the item with value 2 and processes it.
- Consumer 1 faces an error, requeues the stale item with value 1.
func (h *Heap) deleteItem(key string) { | ||
item, ok := h.data.items[key] | ||
if ok { | ||
heap.Remove(h.data, item.index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done
func (h *Heap) Pop() (interface{}, error) { | ||
h.lock.Lock() | ||
defer h.lock.Unlock() | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
h.lock.RLock() | ||
defer h.lock.RUnlock() | ||
item, exists := h.data.items[key] | ||
var obj interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is | ||
// preserved after adding items. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
t.Errorf("unexpected item in the list: %v", key) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
a21b514
to
2c3b39e
Compare
ping @caesarxuchao |
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with | ||
// the key is present in the map, no changes is made to the item. | ||
// | ||
// This is useful in a single producer/consumer scenario so that the consumer can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation.
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued. | ||
// When Close() is called, the h.closed is set and the condition is broadcast. | ||
// Which causes this loop to continue and return from the Pop(). | ||
if h.IsClosed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you directly check h.closed
here? Then IsClosed()
can hold the lock
, and closedLock
can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise lgtm. Thanks @bsalamat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks! PTAL.
2c3b39e
to
897f5b4
Compare
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: bsalamat, caesarxuchao Associated issue: 47604 The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these OWNERS Files:
You can indicate your approval by writing |
@bsalamat please squash as you see proper. |
897f5b4
to
68926a2
Compare
Thanks, @caesarxuchao! Done. |
/retest |
/test all [submit-queue is verifying that this PR is safe to merge] |
Automatic merge from submit-queue |
What this PR does / why we need it:
Heap is useful in implementing priority queues. Some components may need such ordering to process their highest priority objects first. Scheduler is going to be the first user of the heap. It will store pending pods ordered by their priority, so that the highest priority pods are popped first to be scheduled.
Which issue this PR fixes (optional, in
fixes #<issue number>(, fixes #<issue_number>, ...)
format, will close that issue when PR gets merged): fixes #Special notes for your reviewer:
Release note:
ref/ #47604
ref/ #48646
@kubernetes/api-reviewers @kubernetes/sig-scheduling-pr-reviews @davidopp
/assign @caesarxuchao