Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a heap to client-go. Heap orders items with heap invariant ordering. #49752

Merged
merged 2 commits into from
Aug 11, 2017

Conversation

bsalamat
Copy link
Member

What this PR does / why we need it:
Heap is useful in implementing priority queues. Some components may need such ordering to process their highest priority objects first. Scheduler is going to be the first user of the heap. It will store pending pods ordered by their priority, so that the highest priority pods are popped first to be scheduled.

Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #

Special notes for your reviewer:

Release note:

NONE

ref/ #47604
ref/ #48646

@kubernetes/api-reviewers @kubernetes/sig-scheduling-pr-reviews @davidopp
/assign @caesarxuchao

@k8s-ci-robot k8s-ci-robot added sig/scheduling Categorizes an issue or PR as relevant to SIG Scheduling. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. labels Jul 27, 2017
@k8s-github-robot k8s-github-robot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. release-note-none Denotes a PR that doesn't merit a release note. labels Jul 27, 2017
_ = heap.Interface(&HeapData{}) // HeapData is a standard heap
)

// LessFunc compares two objects and returns true if they first one should go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/LessFunc/Less

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return obj
}

// --------------------------------------------------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add this line ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// deleteItem removes the item from Heap.items and its key from Heap.queue.
func (h *Heap) deleteItem(key string) {
item := h.data.items[key]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to change to

item, ok := h.data.items[key]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// Pop waits until an item is ready. If multiple items are
// ready, they are returned in the order given by Heap.lessFunc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Heap.lessFunc/Heap.data.lessFunc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bsalamat
Copy link
Member Author

Thanks, @dixudx for your review.

Copy link
Member

@caesarxuchao caesarxuchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't finished the review yet, so just some preliminary comments.


// addIfNotPresent assumes the lock is already held and adds the the provided
// item to the queue if it does not already exist.
func (h *Heap) addIfNotPresent(key string, obj interface{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addIfNotPresentLocked

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CRUD?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess! Was copied from fifo.go and was not carrying much info anyways. I deleted the line.

h.lock.Lock()
defer h.lock.Unlock()
if _, exists := h.data.items[key]; exists {
h.deleteItem(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not change the item and call Fix()? It's less expensive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Done.

@bsalamat bsalamat force-pushed the priority_scheduler branch 3 times, most recently from f3f9d2d to 5042dfe Compare July 28, 2017 23:43
@bsalamat
Copy link
Member Author

bsalamat commented Aug 1, 2017

@caesarxuchao Any more comments?

index int // The index of the object's key in the Heap.queue.
}

// HeapData is an internal struct that implements the standard heap interface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why make internal struct public? heapData?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Done.

@bsalamat bsalamat force-pushed the priority_scheduler branch from 5042dfe to b9051aa Compare August 1, 2017 20:11
}

// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(key interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation here confuses me at the first look. it does not take care of the item map. not sure why leave that out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heap is the type that manipulates heapData and is in charge of adding/removing elements to all the relevant arrays and maps. heapData implements the standard golang heap interface and the queue is the array that stores elements with the heap ordering. As a result, the heapData.Push and heapData.Pop only add and remove items from the queue. Removal of items from the items map is done by Heap.
In theory, Heap could implement standard golang heap, but it would require Heap to implement Push and Pop of the standard heap interface. In that case, those functions would be public. In order to hide those functions, I have added heapData.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and is in charge of adding/removing elements to all the relevant arrays and maps.

isnt the backing array is managed inside heapdata, and manipulated by the go's heap pkg? it is confusing to manage the "pointer" in heapdata, and store the actual item in heapdata, but manage the actual item outside heapdata.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your point is valid. I made changes to Push and Pop of heapData to perform heapData.items manipulations.

closedLock sync.Mutex
}

// Close the Heap.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does close a heap mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pop blocks until there is an element in the queue or the heap is closed. Closing the heap signals the condition variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably add that to the comment. it was not clear why a heap needs to be closed.

// When Close() is called, the h.closed is set and the condition is broadcast.
// Which causes this loop to continue and return from the Pop().
if h.IsClosed() {
return nil, fmt.Errorf("Heap is closed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heap is closed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -0,0 +1,325 @@
/*
Copyright 2014 The Kubernetes Authors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2017

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bsalamat bsalamat force-pushed the priority_scheduler branch 4 times, most recently from e75545b to 2d22d91 Compare August 2, 2017 19:56
@xiang90
Copy link
Contributor

xiang90 commented Aug 2, 2017

@bsalamat

LGTM. Thanks! You need to ping the owners to review it again and get approved though :P

@bsalamat
Copy link
Member Author

bsalamat commented Aug 2, 2017

Thanks, @xiang90! I have already spoken with Chao. He is very busy, but he will take a look soon.

Copy link
Member

@caesarxuchao caesarxuchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few comments on the implementation. I'll review the tests later.

// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
// the key is present in the map, no changes is made to the item.
//
// This is useful in a single producer/consumer scenario so that the consumer can
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the consumer's requeue function should call AddIfNotPresent? Why it only useful in a single producer/consumer scenario? Isn't it useful in general?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a comment copied from fifo.go. The comment doesn't say it is only useful in a single producer/consumer scenario, but if there are multiple consumers, the main benefit of it, which is not contending with the producer, goes away. Consider this scenario:

  1. Producer adds item: key: test, value: 1
  2. Consumer 1 takes the item with value 1 and processes it.
  3. Producer adds item: key: test, value: 2
  4. Consumer 2 takes the item with value 2 and processes it.
  5. Consumer 1 faces an error, requeues the stale item with value 1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

func (h *Heap) deleteItem(key string) {
item, ok := h.data.items[key]
if ok {
heap.Remove(h.data, item.index)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not inline deleteItem in Delete and save one existence check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

if h.IsClosed() {
return nil, fmt.Errorf("heap is closed")
}
h.cond.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close() needs to be protected by h.lock. Otherwise if Close() is called between line 248 and 249, Wait() will sleep forever.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if Close() has to be proteced by h.lock anyway, then we can remove the h.closeLock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very good point. Looks like fifo.go has the same problem. I used fifo.go as a template to implement heap.go.
I think Close() should hold both closedLock and lock. IsClosed() holds closedLock() only. It cannot hold the same lock as it is called inside Pop and after locking lock.

)

type LessFunc func(interface{}, interface{}) bool
type HeapItem struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

old := h.queue
n := len(old)
key := old[n-1]
h.queue = old[0 : n-1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not

key := h.queue(len(h.queue)-1)
h.queue = h.queue[:len(h.queue)-1]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// This is an error
return nil
}
obj := item.obj
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this extra copy needed? Can we just return item.obj?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

func (h *Heap) Pop() (interface{}, error) {
h.lock.Lock()
defer h.lock.Unlock()
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
var obj interface{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not

if !exists {
  return nil, false, nil
}
return item.obj, true, nil

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@caesarxuchao caesarxuchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few more comments. Thanks @bsalamat.

}

// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is
// preserved after adding items.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:s/adding/deleting

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
closed bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we prevent adding more elements to the heap after it's closed? If you agree that's the expected behavior, then consider adding some tests.

Copy link
Member Author

@bsalamat bsalamat Aug 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. This is another example of following whatever fifo.go did. FIFO does not prevent addition and I didn't either, but I just added the checks and a test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only downside of this check is that IsClosed holds a lock to check whether the heap is closed. Doing so before every addition to the heap hurts performance. That may be the reason fifo.go doesn't check it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code to directly check closed instead of calling IsClosed in order to avoid the extra lock/unlock.

t.Errorf("unexpected item in the list: %v", key)
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test Pop() of an empty heap properly returns after Close() is called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bsalamat bsalamat force-pushed the priority_scheduler branch from 2d22d91 to bed1e5f Compare August 5, 2017 01:05
Copy link
Member Author

@bsalamat bsalamat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @caesarxuchao! PTAL.

)

type LessFunc func(interface{}, interface{}) bool
type HeapItem struct {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

old := h.queue
n := len(old)
key := old[n-1]
h.queue = old[0 : n-1]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// This is an error
return nil
}
obj := item.obj
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
closed bool
Copy link
Member Author

@bsalamat bsalamat Aug 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. This is another example of following whatever fifo.go did. FIFO does not prevent addition and I didn't either, but I just added the checks and a test.

// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
// the key is present in the map, no changes is made to the item.
//
// This is useful in a single producer/consumer scenario so that the consumer can
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a comment copied from fifo.go. The comment doesn't say it is only useful in a single producer/consumer scenario, but if there are multiple consumers, the main benefit of it, which is not contending with the producer, goes away. Consider this scenario:

  1. Producer adds item: key: test, value: 1
  2. Consumer 1 takes the item with value 1 and processes it.
  3. Producer adds item: key: test, value: 2
  4. Consumer 2 takes the item with value 2 and processes it.
  5. Consumer 1 faces an error, requeues the stale item with value 1.

func (h *Heap) deleteItem(key string) {
item, ok := h.data.items[key]
if ok {
heap.Remove(h.data, item.index)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

func (h *Heap) Pop() (interface{}, error) {
h.lock.Lock()
defer h.lock.Unlock()
for {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
var obj interface{}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is
// preserved after adding items.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

t.Errorf("unexpected item in the list: %v", key)
}
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bsalamat bsalamat force-pushed the priority_scheduler branch 2 times, most recently from a21b514 to 2c3b39e Compare August 9, 2017 06:52
@bsalamat
Copy link
Member Author

bsalamat commented Aug 9, 2017

ping @caesarxuchao

// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
// the key is present in the map, no changes is made to the item.
//
// This is useful in a single producer/consumer scenario so that the consumer can
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the h.closed is set and the condition is broadcast.
// Which causes this loop to continue and return from the Pop().
if h.IsClosed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you directly check h.closed here? Then IsClosed() can hold the lock, and closedLock can be removed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise lgtm. Thanks @bsalamat.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks! PTAL.

@bsalamat bsalamat force-pushed the priority_scheduler branch from 2c3b39e to 897f5b4 Compare August 10, 2017 17:40
@caesarxuchao
Copy link
Member

/lgtm

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 10, 2017
@k8s-github-robot
Copy link

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: bsalamat, caesarxuchao

Associated issue: 47604

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these OWNERS Files:

You can indicate your approval by writing /approve in a comment
You can cancel your approval by writing /approve cancel in a comment

@k8s-github-robot k8s-github-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Aug 10, 2017
@caesarxuchao
Copy link
Member

@bsalamat please squash as you see proper.

@bsalamat bsalamat force-pushed the priority_scheduler branch from 897f5b4 to 68926a2 Compare August 10, 2017 18:15
@k8s-github-robot k8s-github-robot removed the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 10, 2017
@bsalamat
Copy link
Member Author

Thanks, @caesarxuchao! Done.

@caesarxuchao caesarxuchao added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Aug 10, 2017
@bsalamat
Copy link
Member Author

/retest

@k8s-github-robot
Copy link

/test all [submit-queue is verifying that this PR is safe to merge]

@k8s-github-robot
Copy link

Automatic merge from submit-queue

@k8s-github-robot k8s-github-robot merged commit 868fef1 into kubernetes:master Aug 11, 2017
@bsalamat bsalamat deleted the priority_scheduler branch August 11, 2017 17:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. lgtm "Looks good to me", indicates that a PR is ready to be merged. release-note-none Denotes a PR that doesn't merit a release note. sig/scheduling Categorizes an issue or PR as relevant to SIG Scheduling. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants