Skip to content

Commit

Permalink
fix: ensure namespace parallelism and parallelism work together. Fixes
Browse files Browse the repository at this point in the history
…#10985 (#14039)

Signed-off-by: isubasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe authored Jan 9, 2025
1 parent e088cfc commit 09d5ee7
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 434 deletions.
2 changes: 2 additions & 0 deletions docs/parallelism.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ data:
namespaceParallelism: "4"
```
When namespace parallelism is enabled, it is plausible for a workflow with a lower priority to be run first if a namespace is at its namespace parallelism limits.
!!! Note
Workflows that are executing but restricted from running more nodes due to other mechanisms will still count toward parallelism limits.
Expand Down
8 changes: 3 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,8 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
}

func (wfc *WorkflowController) newThrottler() sync.Throttler {
f := func(key string) { wfc.wfQueue.AddRateLimited(key) }
return sync.ChainThrottler{
sync.NewThrottler(wfc.Config.Parallelism, sync.SingleBucket, f),
sync.NewThrottler(wfc.Config.NamespaceParallelism, sync.NamespaceBucket, f),
}
f := func(key string) { wfc.wfQueue.Add(key) }
return sync.NewMultiThrottler(wfc.Config.Parallelism, wfc.Config.NamespaceParallelism, f)
}

// runGCcontroller runs the workflow garbage collector controller
Expand Down Expand Up @@ -482,6 +479,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
log.Infof("Adding workflow %s/%s", un.GetNamespace(), un.GetName())
wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName()))
}
}
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) {
for i := 0; i < 3; i++ {
key, _ := controller.wfQueue.Get()
controller.wfQueue.Done(key)
controller.wfQueue.Forget(key)
}
assert.Equal(0, controller.wfQueue.Len())

Expand Down
41 changes: 0 additions & 41 deletions workflow/sync/chain_throttler.go

This file was deleted.

24 changes: 0 additions & 24 deletions workflow/sync/chain_throttler_test.go

This file was deleted.

241 changes: 241 additions & 0 deletions workflow/sync/multi_throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package sync

import (
"container/heap"
"sync"
"time"

"k8s.io/client-go/tools/cache"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

//go:generate mockery --name=Throttler

// Throttler allows the controller to limit number of items it is processing in parallel.
// Items are processed in priority order, and one processing starts, other items (including higher-priority items)
// will be kept pending until the processing is complete.
// Implementations should be idempotent.
type Throttler interface {
Init(wfs []wfv1.Workflow) error
Add(key Key, priority int32, creationTime time.Time)
// Admit returns if the item should be processed.
Admit(key Key) bool
// Remove notifies throttler that item processing is no longer needed
Remove(key Key)
}

type Key = string
type QueueFunc func(Key)

// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism, a parallelism value of zero disables throttling
func NewMultiThrottler(parallelism int, namespaceParallelismLimit int, queue QueueFunc) Throttler {
namespaceParallelism := make(map[string]int)
return &multiThrottler{
queue: queue,
namespaceParallelism: namespaceParallelism,
namespaceParallelismDefault: namespaceParallelismLimit,
totalParallelism: parallelism,
running: make(map[Key]bool),
pending: make(map[string]*priorityQueue),
lock: &sync.Mutex{},
}
}

type multiThrottler struct {
queue QueueFunc
namespaceParallelism map[string]int
namespaceParallelismDefault int
totalParallelism int
running map[Key]bool
pending map[string]*priorityQueue
lock *sync.Mutex
}

func (m *multiThrottler) Init(wfs []wfv1.Workflow) error {
m.lock.Lock()
defer m.lock.Unlock()

keys := []Key{}
for _, wf := range wfs {
if wf.Status.Phase != wfv1.WorkflowRunning {
continue
}
key, err := cache.MetaNamespaceKeyFunc(&wf)
if err != nil {
return err
}
keys = append(keys, key)
}

for _, key := range keys {
m.running[key] = true
}
return nil
}

func (m *multiThrottler) namespaceCount(namespace string) (int, int) {
setLimit, has := m.namespaceParallelism[namespace]
if !has {
m.namespaceParallelism[namespace] = m.namespaceParallelismDefault
setLimit = m.namespaceParallelismDefault
}
if setLimit == 0 {
// return count is no longer accurate, but preserves behaviour
return 0, 0
}
count := 0
for key := range m.running {
ns, _, _ := cache.SplitMetaNamespaceKey(key)
if ns == namespace {
count++
}
}
return count, setLimit
}

func (m *multiThrottler) namespaceAllows(namespace string) bool {
count, limit := m.namespaceCount(namespace)
return count < limit || limit == 0
}

func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) {
m.lock.Lock()
defer m.lock.Unlock()
namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return
}
_, ok := m.pending[namespace]
if !ok {
m.pending[namespace] = &priorityQueue{itemByKey: make(map[string]*item)}
}

m.pending[namespace].add(key, priority, creationTime)
m.queueThrottled()
}

func (m *multiThrottler) Admit(key Key) bool {
m.lock.Lock()
defer m.lock.Unlock()

_, ok := m.running[key]
if ok {
return true
}
m.queueThrottled()
return false
}

func (m *multiThrottler) Remove(key Key) {
m.lock.Lock()
defer m.lock.Unlock()

namespace, _, _ := cache.SplitMetaNamespaceKey(key)
delete(m.running, key)
m.pending[namespace].remove(key)
m.queueThrottled()
}

func (m *multiThrottler) queueThrottled() {
if m.totalParallelism != 0 && len(m.running) >= m.totalParallelism {
return
}

minPq := &priorityQueue{itemByKey: make(map[string]*item)}

for _, pq := range m.pending {
if len(pq.items) == 0 {
continue
}
currItem := pq.peek()

namespace, _, err := cache.SplitMetaNamespaceKey(currItem.key)
if err != nil {
return
}
if !m.namespaceAllows(namespace) {
continue
}

minPq.add(currItem.key, currItem.priority, currItem.creationTime)
}
if len(minPq.items) > 0 {
bestItem := minPq.pop()
bestNamespace, _, _ := cache.SplitMetaNamespaceKey(bestItem.key)
m.pending[bestNamespace].pop()
m.running[bestItem.key] = true
m.queue(bestItem.key)
}
}

type item struct {
key string
creationTime time.Time
priority int32
index int
}

type priorityQueue struct {
items []*item
itemByKey map[string]*item
}

func (pq *priorityQueue) pop() *item {
return heap.Pop(pq).(*item)
}

func (pq *priorityQueue) peek() *item {
return pq.items[0]
}

func (pq *priorityQueue) add(key Key, priority int32, creationTime time.Time) {
if res, ok := pq.itemByKey[key]; ok {
if res.priority != priority {
res.priority = priority
heap.Fix(pq, res.index)
}
} else {
heap.Push(pq, &item{key: key, priority: priority, creationTime: creationTime})
}
}

func (pq *priorityQueue) remove(key Key) {
if item, ok := pq.itemByKey[key]; ok {
heap.Remove(pq, item.index)
delete(pq.itemByKey, key)
}
}

func (pq priorityQueue) Len() int { return len(pq.items) }

func (pq priorityQueue) Less(i, j int) bool {
if pq.items[i].priority == pq.items[j].priority {
return pq.items[i].creationTime.Before(pq.items[j].creationTime)
}
return pq.items[i].priority > pq.items[j].priority
}

func (pq priorityQueue) Swap(i, j int) {
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
pq.items[i].index = i
pq.items[j].index = j
}

func (pq *priorityQueue) Push(x interface{}) {
n := len(pq.items)
item := x.(*item)
item.index = n
pq.items = append(pq.items, item)
pq.itemByKey[item.key] = item
}

func (pq *priorityQueue) Pop() interface{} {
old := pq.items
n := len(old)
item := old[n-1]
item.index = -1
pq.items = old[0 : n-1]
delete(pq.itemByKey, item.key)
return item
}
Loading

0 comments on commit 09d5ee7

Please sign in to comment.