Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ut common framework #3343

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat: add ut common framework
fix: fake bind is async process and need to wait it fininsh; add binder chan
buffer in some UT
fix: add lock for fakeBinder

Signed-off-by: lowang-bh <lhui_wang@163.com>
  • Loading branch information
lowang-bh committed Mar 11, 2024
commit 28173006405767eb9efb801fa9126bb1c3d94d0c
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestAllocate(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient)
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down
35 changes: 20 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,22 @@ func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*sch
return podGroupInfo, nil
}

// UpdateQueueStatus will update the status of queue
func (su *defaultStatusUpdater) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error {
var newQueue = &vcv1beta1.Queue{}
if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil {
klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error())
return err
}

_, err := su.vcclient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err
}
return nil
}

type defaultVolumeBinder struct {
volumeBinder volumescheduling.SchedulerVolumeBinder
}
Expand Down Expand Up @@ -441,7 +457,7 @@ func (sc *SchedulerCache) setBatchBindParallel() {
func (sc *SchedulerCache) setDefaultVolumeBinder() {
logger := klog.FromContext(context.TODO())
var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
Expand Down Expand Up @@ -655,7 +671,7 @@ func (sc *SchedulerCache) addEventHandler() {
},
)

if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
}
Expand Down Expand Up @@ -693,7 +709,7 @@ func (sc *SchedulerCache) addEventHandler() {
},
})

if options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) {
if options.ServerOpts != nil && options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) {
sc.pcInformer = informerFactory.Scheduling().V1().PriorityClasses()
sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPriorityClass,
Expand Down Expand Up @@ -1378,18 +1394,7 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b

// UpdateQueueStatus update the status of queue.
func (sc *SchedulerCache) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error {
var newQueue = &vcv1beta1.Queue{}
if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil {
klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error())
return err
}

_, err := sc.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err
}
return nil
return sc.StatusUpdater.UpdateQueueStatus(queue)
}

func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/cache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func checkAndSetDefaultInterface(sc *SchedulerCache) {
}

func getNodeWorkers() uint32 {
if options.ServerOpts.NodeWorkerThreads > 0 {
if options.ServerOpts != nil && options.ServerOpts.NodeWorkerThreads > 0 {
return options.ServerOpts.NodeWorkerThreads
}
threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS"))
Expand Down Expand Up @@ -116,7 +116,7 @@ func newMockSchedulerCache(schedulerName string) *SchedulerCache {

NodeList: []string{},
}
if len(options.ServerOpts.NodeSelector) > 0 {
if options.ServerOpts != nil && len(options.ServerOpts.NodeSelector) > 0 {
msc.updateNodeSelectors(options.ServerOpts.NodeSelector)
}
msc.setBatchBindParallel()
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Evictor interface {
type StatusUpdater interface {
UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error)
UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error)
UpdateQueueStatus(queue *api.QueueInfo) error
}

// BatchBinder updates podgroup or job information
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/drf/hdrf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestHDRF(t *testing.T) {

binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 300),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/predicates/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestEventHandler(t *testing.T) {
// initialize schedulerCache
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
recorder := record.NewFakeRecorder(100)
go func() {
Expand Down
234 changes: 234 additions & 0 deletions pkg/scheduler/uthelper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
Copyright 2024 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package uthelper

import (
"fmt"
"reflect"
"time"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"

"volcano.sh/apis/pkg/apis/scheduling"
vcapisv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

// RegistPlugins plugins
func RegistPlugins(plugins map[string]framework.PluginBuilder) {
for name, plugin := range plugins {
framework.RegisterPluginBuilder(name, plugin)
}
}

// TestCommonStruct is the most common used resource when do UT
// others can wrap it in a new struct
type TestCommonStruct struct {
Name string
Plugins map[string]framework.PluginBuilder // plugins for each case
Pods []*v1.Pod
Nodes []*v1.Node
PodGroups []*vcapisv1.PodGroup
Queues []*vcapisv1.Queue
PriClass []*schedulingv1.PriorityClass
Bind map[string]string // bind results: ns/podName -> nodeName
PipeLined map[string][]string // pipelined results: map[jobID][]{nodename}
Evicted []string // evicted pods list of ns/podName
Status map[api.JobID]scheduling.PodGroupPhase // final status
BindsNum int // binds events numbers
EvictNum int // evict events numbers, include preempted and reclaimed evict events

// fake interface instance when check results need
stop chan struct{}
binder cache.Binder
evictor cache.Evictor
stsUpdator cache.StatusUpdater
volBinder cache.VolumeBinder
ssn *framework.Session // store opened session
}

var _ Interface = &TestCommonStruct{}

// RegistSession open session with tiers and configuration, and mock schedulerCache with self-defined FakeBinder and FakeEvictor
func (test *TestCommonStruct) RegistSession(tiers []conf.Tier, config []conf.Configuration) *framework.Session {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
evictor := &util.FakeEvictor{
Channel: make(chan string),
}
stsUpdator := &util.FakeStatusUpdater{}
test.binder = binder
test.evictor = evictor
test.stop = make(chan struct{})
// Create scheduler cache with self-defined binder and evictor
schedulerCache := cache.NewCustomMockSchedulerCache("utmock-scheduler", binder, evictor, stsUpdator, nil, nil, nil)
test.stsUpdator = schedulerCache.StatusUpdater
test.volBinder = schedulerCache.VolumeBinder

for _, node := range test.Nodes {
schedulerCache.AddOrUpdateNode(node)
}
for _, pod := range test.Pods {
schedulerCache.AddPod(pod)
}
for _, pg := range test.PodGroups {
schedulerCache.AddPodGroupV1beta1(pg)
}
for _, queue := range test.Queues {
schedulerCache.AddQueueV1beta1(queue)
}
for _, pc := range test.PriClass {
schedulerCache.AddPriorityClass(pc)
}

RegistPlugins(test.Plugins)
ssn := framework.OpenSession(schedulerCache, tiers, config)
test.ssn = ssn
schedulerCache.Run(test.stop)
return ssn
}

// Run choose to run passed in actions; if no actions provided, will panic
func (test *TestCommonStruct) Run(actions []framework.Action) {
if len(actions) == 0 {
panic("no actions provided, please specify a list of actions to execute")
}
for _, action := range actions {
action.Initialize()
action.Execute(test.ssn)
action.UnInitialize()
}
}

// Close do release resource and clean up
func (test *TestCommonStruct) Close() {
framework.CloseSession(test.ssn)
framework.CleanupPluginBuilders()
close(test.stop)
}

// CheckAll checks all the need status
func (test *TestCommonStruct) CheckAll(caseIndex int) (err error) {
if err = test.CheckBind(caseIndex); err != nil {
return
}
if err = test.CheckEvict(caseIndex); err != nil {
return
}
if err = test.CheckPipelined(caseIndex); err != nil {
return
}
return test.CheckPGStatus(caseIndex)
}

// CheckBind check expected bind result
func (test *TestCommonStruct) CheckBind(caseIndex int) error {
binder := test.binder.(*util.FakeBinder)
for i := 0; i < test.BindsNum; i++ {
select {
case <-binder.Channel:
case <-time.After(300 * time.Millisecond):
return fmt.Errorf("Failed to get Bind request in case %d(%s).", caseIndex, test.Name)
}
}

if len(test.Bind) != len(binder.Binds) {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Bind, binder.Binds)
}
for key, value := range test.Bind {
got := binder.Binds[key]
if value != got {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v->%v\n got: %v->%v ", caseIndex, test.Name, key, value, key, got)
}
}
return nil
}

// CheckEvict check the evicted result
func (test *TestCommonStruct) CheckEvict(caseIndex int) error {
evictor := test.evictor.(*util.FakeEvictor)
for i := 0; i < test.EvictNum; i++ {
select {
case <-evictor.Channel:
case <-time.After(300 * time.Millisecond):
return fmt.Errorf("Failed to get Evict request in case %d(%s).", caseIndex, test.Name)
}
}

evicts := evictor.Evicts()
if len(test.Evicted) != len(evicts) {
return fmt.Errorf("case %d(%s) check evict: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Evicted, evicts)
}

expect := map[string]int{} // evicted number
got := map[string]int{}
for _, v := range test.Evicted {
expect[v]++
}
for _, v := range evicts {
got[v]++
}

if !reflect.DeepEqual(expect, got) {
return fmt.Errorf("case %d(%s) check evict: \nwant: %v\n got: %v ", caseIndex, test.Name, expect, got)
}
return nil
}

// CheckPGStatus check job's podgroups status
func (test *TestCommonStruct) CheckPGStatus(caseIndex int) error {
ssn := test.ssn
for jobID, phase := range test.Status {
job := ssn.Jobs[jobID]
if job == nil {
return fmt.Errorf("case %d(%s) check podgroup status, job <%v> doesn't exist in session", caseIndex, test.Name, jobID)
}
got := job.PodGroup.Status.Phase
if phase != got {
return fmt.Errorf("case %d(%s) check podgroup <%v> status:\n want: %v, got: %v", caseIndex, test.Name, jobID, phase, got)
}
}
return nil
}

// CheckPipelined checks pipeline results
func (test *TestCommonStruct) CheckPipelined(caseIndex int) error {
ssn := test.ssn
for jobID, nodes := range test.PipeLined {
job := ssn.Jobs[api.JobID(jobID)]
if job == nil {
return fmt.Errorf("case %d(%s) check pipeline, job <%v> doesn't exist in session", caseIndex, test.Name, jobID)
}
pipeLined := job.TaskStatusIndex[api.Pipelined]
if len(pipeLined) == 0 {
return fmt.Errorf("case %d(%s) check pipeline, want pipelined job: %v, actualy, no tasks pipelined to nodes %v", caseIndex, test.Name, jobID, nodes)
}
for _, task := range pipeLined {
if !Contains(nodes, task.NodeName) {
return fmt.Errorf("case %d(%s) check pipeline: actual: %v->%v, want: %v->%v", caseIndex, test.Name, task.Name, task.NodeName, task.Name, nodes)
}
}
}
return nil
}
Loading
Loading