Skip to content

Commit

Permalink
add mock cache for ut and performance test
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <lhui_wang@163.com>
  • Loading branch information
lowang-bh committed Feb 2, 2024
1 parent a0742c8 commit cf51c87
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 117 deletions.
8 changes: 8 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,11 @@ func (s *ServerOption) ParseCAFiles(decryptFunc DecryptFunc) error {

return nil
}

// Default new and registry a default one
func Default() *ServerOption {
s := NewServerOption()
s.AddFlags(pflag.CommandLine)
s.RegisterOptions()
return s
}
146 changes: 85 additions & 61 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,22 +407,64 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc
return job, nil
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
logger := klog.FromContext(context.TODO())
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
// updateNodeSelectors parse and update node selector key value pairs to schedule cache
func (sc *SchedulerCache) updateNodeSelectors(nodeSelectors []string) {
for _, nodeSelectorLabel := range nodeSelectors {
nodeSelectorLabelLen := len(nodeSelectorLabel)
if nodeSelectorLabelLen <= 0 {
continue
}
// check input
index := strings.Index(nodeSelectorLabel, ":")
if index < 0 || index >= (nodeSelectorLabelLen-1) {
continue
}
nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
sc.nodeSelectorLabels[key] = ""
}
vcClient, err := vcclient.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
}

// setBatchBindParallel configure the parallel when binding tasks to apiserver
func (sc *SchedulerCache) setBatchBindParallel() {
sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
var batchNum int
batchNum, err := strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
if err == nil && batchNum > 0 {
sc.batchNum = batchNum
} else {
sc.batchNum = 1
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

func (sc *SchedulerCache) setDefaultVolumeBinder() {
logger := klog.FromContext(context.TODO())
var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
}
}
sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
logger,
sc.kubeClient,
sc.podInformer,
sc.nodeInformer,
sc.csiNodeInformer,
sc.pvcInformer,
sc.pvInformer,
sc.scInformer,
capacityCheck,
30*time.Second,
),
}
}

// create default queue
// newDefaultQueue init default queue
func newDefaultQueue(vcClient vcclient.Interface, defaultQueue string) {
reclaimable := true
defaultQue := vcv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -434,7 +476,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
},
}

err = retry.OnError(wait.Backoff{
err := retry.OnError(wait.Backoff{
Steps: 60,
Duration: time.Second,
Factor: 1,
Expand All @@ -448,6 +490,24 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
if err != nil && !apierrors.IsAlreadyExists(err) {
panic(fmt.Errorf("failed init default queue, with err: %v", err))
}
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
}
vcClient, err := vcclient.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

// create default queue
newDefaultQueue(vcClient, defaultQueue)
klog.Infof("Create init queue named default")

sc := &SchedulerCache{
Expand Down Expand Up @@ -479,42 +539,21 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
sc.IgnoredCSIProvisioners = ignoredProvisionersSet

if len(nodeSelectors) > 0 {
for _, nodeSelectorLabel := range nodeSelectors {
nodeSelectorLabelLen := len(nodeSelectorLabel)
if nodeSelectorLabelLen <= 0 {
continue
}
// check input
index := strings.Index(nodeSelectorLabel, ":")
if index < 0 || index >= (nodeSelectorLabelLen-1) {
continue
}
nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
sc.nodeSelectorLabels[key] = ""
}
sc.updateNodeSelectors(nodeSelectors)
}
// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(sc.schedulerNames)})

sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
// set concurrency configuration when binding
sc.setBatchBindParallel()
if bindMethodMap == nil {
klog.V(3).Info("no registered bind method, new a default one")
bindMethodMap = NewDefaultBinder(sc.kubeClient, sc.Recorder)
}
sc.Binder = GetBindMethod()

var batchNum int
batchNum, err = strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
if err == nil && batchNum > 0 {
sc.batchNum = batchNum
} else {
sc.batchNum = 1
}

sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
Expand All @@ -530,6 +569,14 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
vcclient: sc.vcClient,
}

// add all events handlers
sc.addEventHandler()
// finally, init default volume binder which has dependencies on other informers
sc.setDefaultVolumeBinder()
return sc
}

func (sc *SchedulerCache) addEventHandler() {
informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0)
sc.informerFactory = informerFactory
mySchedulerPodName, c := getMultiSchedulerInfo()
Expand Down Expand Up @@ -604,31 +651,9 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
},
)

var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
}
} else {
capacityCheck = nil
}

sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
logger,
sc.kubeClient,
sc.podInformer,
sc.nodeInformer,
sc.csiNodeInformer,
sc.pvcInformer,
sc.pvInformer,
sc.scInformer,
capacityCheck,
30*time.Second,
),
}

// create informer for pod information
Expand All @@ -637,7 +662,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *v1.Pod:
if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
if !responsibleForPod(v, sc.schedulerNames, mySchedulerPodName, c) {
if len(v.Spec.NodeName) == 0 {
return false
}
Expand Down Expand Up @@ -728,7 +753,6 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
DeleteFunc: sc.DeleteNumaInfoV1alpha1,
})
}
return sc
}

// Run starts the schedulerCache
Expand Down
126 changes: 126 additions & 0 deletions pkg/scheduler/cache/cache_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package cache

import (
"math"
"os"
"strconv"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

fakevcClient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
"volcano.sh/volcano/cmd/scheduler/app/options"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
)

// NewCustomMockSchedulerCache returns a mock scheduler cache with custom interface
func NewCustomMockSchedulerCache(schedulerName string,
binder Binder,
evictor Evictor,
statusUpdater StatusUpdater,
PodGroupBinder BatchBinder,
volumeBinder VolumeBinder,
recorder record.EventRecorder,
) *SchedulerCache {
msc := newMockSchedulerCache(schedulerName)
// add all events handlers
msc.addEventHandler()
msc.Recorder = recorder
msc.Binder = binder
msc.Evictor = evictor
msc.StatusUpdater = statusUpdater
msc.PodGroupBinder = PodGroupBinder
// use custom volume binder
msc.VolumeBinder = volumeBinder
checkAndSetDefaultInterface(msc)
return msc
}

// NewDefaultMockSchedulerCache returns a mock scheduler cache with interface mocked with default fake clients
// Notes that default events recorder's buffer only has a length 100;
// when use it do performance test, should use a &FakeRecorder{} without length limit to avoid block
func NewDefaultMockSchedulerCache(schedulerName string) *SchedulerCache {
msc := newMockSchedulerCache(schedulerName)
// add all events handlers
msc.addEventHandler()
checkAndSetDefaultInterface(msc)
return msc
}

func checkAndSetDefaultInterface(sc *SchedulerCache) {
if sc.Recorder == nil {
sc.Recorder = record.NewFakeRecorder(100) // to avoid blocking, we can pass in &FakeRecorder{} to NewCustomMockSchedulerCache
}
if sc.Binder == nil {
sc.Binder = &DefaultBinder{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}
}
if sc.Evictor == nil {
sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}
}
if sc.StatusUpdater == nil {
sc.StatusUpdater = &defaultStatusUpdater{
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}
}
if sc.PodGroupBinder == nil {
sc.PodGroupBinder = &podgroupBinder{
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}
}
// finally, init default fake volume binder which has dependencies on other informers
if sc.VolumeBinder == nil {
sc.setDefaultVolumeBinder()
}
}

func getNodeWorkers() uint32 {
if options.ServerOpts.NodeWorkerThreads > 0 {
return options.ServerOpts.NodeWorkerThreads
}
threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS"))
if err == nil && threads > 0 && threads <= math.MaxUint32 {
return uint32(threads)
}
return 2 //default 2
}

// newMockSchedulerCache init the mock scheduler cache structure
func newMockSchedulerCache(schedulerName string) *SchedulerCache {
msc := &SchedulerCache{
Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo),
Nodes: make(map[string]*schedulingapi.NodeInfo),
Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo),
PriorityClasses: make(map[string]*schedulingv1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeClient: fake.NewSimpleClientset(),
vcClient: fakevcClient.NewSimpleClientset(),
restConfig: nil,
defaultQueue: "default",
schedulerNames: []string{schedulerName},
nodeSelectorLabels: make(map[string]string),
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
imageStates: make(map[string]*imageState),

NodeList: []string{},
}
if len(options.ServerOpts.NodeSelector) > 0 {
msc.updateNodeSelectors(options.ServerOpts.NodeSelector)
}
msc.setBatchBindParallel()
msc.nodeWorkers = getNodeWorkers()

return msc
}
Loading

0 comments on commit cf51c87

Please sign in to comment.