From 1e5878b91099dbc06b86aeaeefa301cb59ca61d1 Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Mon, 8 Feb 2021 16:53:13 -0800
Subject: [PATCH] Introduce a churnOp to scheduler perf testing framework

- support two modes: recreate and create
- use DynamicClient to create API objects
---
 .../config/churn/node-default.yaml            |  11 +
 .../config/churn/pod-default.yaml             |   8 +
 .../config/churn/service-default.yaml         |  11 +
 .../config/performance-config.yaml            |  26 +++
 .../config/pod-high-priority-large-cpu.yaml   |  15 ++
 .../scheduler_perf_legacy_test.go             |   2 +-
 .../scheduler_perf/scheduler_perf_test.go     | 194 +++++++++++++++++-
 .../scheduler_perf/scheduler_test.go          |   2 +-
 test/integration/scheduler_perf/util.go       |  20 +-
 9 files changed, 276 insertions(+), 13 deletions(-)
 create mode 100644 test/integration/scheduler_perf/config/churn/node-default.yaml
 create mode 100644 test/integration/scheduler_perf/config/churn/pod-default.yaml
 create mode 100644 test/integration/scheduler_perf/config/churn/service-default.yaml
 create mode 100644 test/integration/scheduler_perf/config/pod-high-priority-large-cpu.yaml

diff --git a/test/integration/scheduler_perf/config/churn/node-default.yaml b/test/integration/scheduler_perf/config/churn/node-default.yaml
new file mode 100644
index 0000000000000..90d919a3200d8
--- /dev/null
+++ b/test/integration/scheduler_perf/config/churn/node-default.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Node
+metadata:
+  generateName: node-churn-
+status:
+  capacity:
+    pods: "0"
+  conditions:
+  - status: "True"
+    type: Ready
+  phase: Running
diff --git a/test/integration/scheduler_perf/config/churn/pod-default.yaml b/test/integration/scheduler_perf/config/churn/pod-default.yaml
new file mode 100644
index 0000000000000..a8599cdecfe1e
--- /dev/null
+++ b/test/integration/scheduler_perf/config/churn/pod-default.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  generateName: pod-churn-
+spec:
+  containers:
+  - image: k8s.gcr.io/pause:3.4.1
+    name: pause
diff --git a/test/integration/scheduler_perf/config/churn/service-default.yaml b/test/integration/scheduler_perf/config/churn/service-default.yaml
new file mode 100644
index 0000000000000..b8ad80b837840
--- /dev/null
+++ b/test/integration/scheduler_perf/config/churn/service-default.yaml
@@ -0,0 +1,11 @@
+apiVersion: v1
+kind: Service
+metadata:
+  generateName: service-churn-
+spec:
+  selector:
+    app: foo
+  ports:
+  - protocol: TCP
+    port: 8080
+    targetPort: 8080
diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml
index 63c00de28c769..7578e40d84849 100644
--- a/test/integration/scheduler_perf/config/performance-config.yaml
+++ b/test/integration/scheduler_perf/config/performance-config.yaml
@@ -449,3 +449,29 @@
       initNodes: 5000
       initPods: 2000
       measurePods: 5000
+
+- name: SchedulingWithMixedChurn
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $initNodes
+  - opcode: churn
+    mode: recreate
+    number: 1
+    templatePaths:
+    - config/churn/node-default.yaml
+    - config/pod-high-priority-large-cpu.yaml
+    - config/churn/service-default.yaml
+    intervalMilliseconds: 1000
+  - opcode: createPods
+    countParam: $measurePods
+    podTemplatePath: config/pod-default.yaml
+    collectMetrics: true
+  workloads:
+  - name: 1000Nodes
+    params:
+      initNodes: 1000
+      measurePods: 1000
+  - name: 5000Nodes
+    params:
+      initNodes: 5000
+      measurePods: 2000
diff --git a/test/integration/scheduler_perf/config/pod-high-priority-large-cpu.yaml b/test/integration/scheduler_perf/config/pod-high-priority-large-cpu.yaml
new file mode 100644
index 0000000000000..06a6e6c2d8bb6
--- /dev/null
+++ b/test/integration/scheduler_perf/config/pod-high-priority-large-cpu.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  generateName: pod-h-
+spec:
+  priority: 10
+  containers:
+  - image: k8s.gcr.io/pause:3.2
+    name: pause
+    ports:
+    - containerPort: 80
+    resources:
+      requests:
+        cpu: 9
+        memory: 500Mi
diff --git a/test/integration/scheduler_perf/scheduler_perf_legacy_test.go b/test/integration/scheduler_perf/scheduler_perf_legacy_test.go
index f482034ab503f..e47b737d6998d 100644
--- a/test/integration/scheduler_perf/scheduler_perf_legacy_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_legacy_test.go
@@ -441,7 +441,7 @@ func benchmarkScheduling(numExistingPods, minPods int,
     if b.N < minPods {
         b.N = minPods
     }
-    finalFunc, podInformer, clientset := mustSetupScheduler()
+    finalFunc, podInformer, clientset, _ := mustSetupScheduler()
     defer finalFunc()

     nodePreparer := framework.NewIntegrationTestNodePreparer(
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 3ed259e8d013f..8bd4b41d71736 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -21,16 +21,25 @@ import (
     "encoding/json"
     "fmt"
     "io/ioutil"
+    "math"
     "strings"
     "sync"
     "testing"
     "time"

     v1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
+    cacheddiscovery "k8s.io/client-go/discovery/cached"
+    "k8s.io/client-go/dynamic"
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/restmapper"
     "k8s.io/component-base/featuregate"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     "k8s.io/klog/v2"
@@ -43,7 +52,15 @@ const (
     configFile        = "config/performance-config.yaml"
     createNodesOpcode = "createNodes"
     createPodsOpcode  = "createPods"
+    churnOpcode       = "churn"
     barrierOpcode     = "barrier"
+
+    // Two modes supported in the "churn" operator.
+
+    // Recreate creates a number of API objects and then deletes them, and repeats the cycle.
+    Recreate = "recreate"
+    // Create continuously creates API objects without deleting them.
+    Create = "create"
 )

 var (
@@ -90,7 +107,7 @@ func (tc *testCase) collectsMetrics() bool {

 // workload is a subtest under a testCase that tests the scheduler performance
 // for a certain ordering of ops. The set of nodes created and pods scheduled
-// in a workload may be heterogenous.
+// in a workload may be heterogeneous.
 type workload struct {
     // Name of the workload.
     Name string
@@ -109,6 +126,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
     possibleOps := []realOp{
         &createNodesOp{},
         &createPodsOp{},
+        &churnOp{},
         &barrierOp{},
         // TODO(#93793): add a sleep timer op to simulate waiting?
         // TODO(#94601): add a delete nodes op to simulate scaling behaviour?
@@ -252,6 +270,55 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
     return &cpo, (&cpo).isValid(false)
 }

+// churnOp defines an op where API objects are created and/or deleted as part of a workload.
+type churnOp struct {
+    // Must be "churn".
+    Opcode string
+    // Value must be one of the following:
+    // - recreate. In this mode, API objects will be created for N cycles, and then
+    //   deleted in the next N cycles. N is specified by the "Number" field.
+    // - create. In this mode, API objects will be created (without deletion) until
+    //   reaching a threshold, which is specified by the "Number" field.
+    Mode string
+    // Maximum number of API objects to be created.
+    // Defaults to 0, which means unlimited.
+    Number int
+    // Interval of churning. Defaults to 500 milliseconds.
+    IntervalMilliseconds int64
+    // Namespace the churning objects should be created in. Optional, defaults to a unique
+    // namespace of the format "namespace-<number>".
+    Namespace *string
+    // Paths of API spec files.
+    TemplatePaths []string
+}
+
+func (co *churnOp) isValid(_ bool) error {
+    if co.Opcode != churnOpcode {
+        return fmt.Errorf("invalid opcode")
+    }
+    if co.Mode != Recreate && co.Mode != Create {
+        return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
+    }
+    if co.Number < 0 {
+        return fmt.Errorf("number (%v) cannot be negative", co.Number)
+    }
+    if co.Mode == Recreate && co.Number == 0 {
+        return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
+    }
+    if len(co.TemplatePaths) == 0 {
+        return fmt.Errorf("at least one template spec file needs to be specified")
+    }
+    return nil
+}
+
+func (*churnOp) collectsMetrics() bool {
+    return false
+}
+
+func (co churnOp) patchParams(w *workload) (realOp, error) {
+    return &co, nil
+}
+
 // barrierOp defines an op that can be used to wait until all scheduled pods of
 // one or many namespaces have been bound to nodes. This is useful when pods
 // were scheduled with SkipWaitToCompletion set to true.
@@ -309,7 +376,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
     // 30 minutes should be plenty enough even for the 5000-node tests.
     ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
     defer cancel()
-    finalFunc, podInformer, clientset := mustSetupScheduler()
+    finalFunc, podInformer, client, dynClient := mustSetupScheduler()
     b.Cleanup(finalFunc)

     var mu sync.Mutex
@@ -329,7 +396,7 @@
         }
         switch concreteOp := realOp.(type) {
         case *createNodesOp:
-            nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, clientset)
+            nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
             if err != nil {
                 b.Fatalf("op %d: %v", opIndex, err)
             }
@@ -359,7 +426,7 @@
                     go collector.run(collectorCtx)
                 }
             }
-            if err := createPods(namespace, concreteOp, clientset); err != nil {
+            if err := createPods(namespace, concreteOp, client); err != nil {
                 b.Fatalf("op %d: %v", opIndex, err)
             }
             if concreteOp.SkipWaitToCompletion {
@@ -387,6 +454,103 @@
                 mu.Unlock()
             }

+        case *churnOp:
+            var namespace string
+            if concreteOp.Namespace != nil {
+                namespace = *concreteOp.Namespace
+            } else {
+                namespace = fmt.Sprintf("namespace-%d", opIndex)
+            }
+            restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
+            // Ensure the namespace exists.
+            nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
+            if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
+                b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+            }
+
+            var churnFns []func(name string) string
+
+            for i, path := range concreteOp.TemplatePaths {
+                unstructuredObj, gvk, err := getUnstructuredFromFile(path)
+                if err != nil {
+                    b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+                }
+                // Obtain GVR.
+                mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+                if err != nil {
+                    b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+                }
+                gvr := mapping.Resource
+                // Distinguish cluster-scoped from namespaced API objects.
+                var dynRes dynamic.ResourceInterface
+                if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+                    dynRes = dynClient.Resource(gvr).Namespace(namespace)
+                } else {
+                    dynRes = dynClient.Resource(gvr)
+                }
+
+                churnFns = append(churnFns, func(name string) string {
+                    if name != "" {
+                        dynRes.Delete(ctx, name, metav1.DeleteOptions{})
+                        return ""
+                    }
+
+                    live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
+                    if err != nil {
+                        return ""
+                    }
+                    return live.GetName()
+                })
+            }
+
+            var interval int64 = 500
+            if concreteOp.IntervalMilliseconds != 0 {
+                interval = concreteOp.IntervalMilliseconds
+            }
+            ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
+            defer ticker.Stop()
+
+            if concreteOp.Mode == Recreate {
+                go func() {
+                    retVals := make([][]string, len(churnFns))
+                    // For each churn function, instantiate a slice of strings with length "concreteOp.Number".
+                    for i := range retVals {
+                        retVals[i] = make([]string, concreteOp.Number)
+                    }
+
+                    count := 0
+                    for {
+                        select {
+                        case <-ticker.C:
+                            for i := range churnFns {
+                                retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
+                            }
+                            count++
+                        case <-ctx.Done():
+                            return
+                        }
+                    }
+                }()
+            } else if concreteOp.Mode == Create {
+                go func() {
+                    count, threshold := 0, concreteOp.Number
+                    if threshold == 0 {
+                        threshold = math.MaxInt32
+                    }
+                    for count < threshold {
+                        select {
+                        case <-ticker.C:
+                            for i := range churnFns {
+                                churnFns[i]("")
+                            }
+                            count++
+                        case <-ctx.Done():
+                            return
+                        }
+                    }
+                }()
+            }
+
         case *barrierOp:
             for _, namespace := range concreteOp.Namespaces {
                 if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
@@ -525,6 +689,28 @@ func getSpecFromFile(path *string, spec interface{}) error {
     return yaml.UnmarshalStrict(bytes, spec)
 }

+func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
+    bytes, err := ioutil.ReadFile(path)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    bytes, err = yaml.YAMLToJSONStrict(bytes)
+    if err != nil {
+        return nil, nil, fmt.Errorf("cannot convert YAML to JSON: %v", err)
+    }
+
+    obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
+    if err != nil {
+        return nil, nil, err
+    }
+    unstructuredObj, ok := obj.(*unstructured.Unstructured)
+    if !ok {
+        return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
+    }
+    return unstructuredObj, gvk, nil
+}
+
 func getTestCases(path string) ([]*testCase, error) {
     testCases := make([]*testCase, 0)
     if err := getSpecFromFile(&path, &testCases); err != nil {
diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go
index 5744eb113242d..f75c2ce81cec4 100644
--- a/test/integration/scheduler_perf/scheduler_test.go
+++ b/test/integration/scheduler_perf/scheduler_test.go
@@ -116,7 +116,7 @@ type testConfig struct {

 // getBaseConfig returns baseConfig after initializing number of nodes and pods.
 func getBaseConfig(nodes int, pods int) *testConfig {
-    destroyFunc, podInformer, clientset := mustSetupScheduler()
+    destroyFunc, podInformer, clientset, _ := mustSetupScheduler()
     return &testConfig{
         clientset:   clientset,
         destroyFunc: destroyFunc,
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index 7b846c9ed8fcb..98b935666539d 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -33,6 +33,7 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/client-go/dynamic"
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
@@ -55,20 +56,25 @@ var dataItemsDir = flag.String("data-items-dir", "", "destination directory for
 // mustSetupScheduler starts the following components:
 // - k8s api server (a.k.a. master)
 // - scheduler
-// It returns clientset and destroyFunc which should be used to
+// It returns regular and dynamic clients, and destroyFunc which should be used to
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface) {
+func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
     apiURL, apiShutdown := util.StartApiserver()
-    clientSet := clientset.NewForConfigOrDie(&restclient.Config{
+
+    cfg := &restclient.Config{
         Host:          apiURL,
         ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
         QPS:           5000.0,
         Burst:         5000,
-    })
-    _, podInformer, schedulerShutdown := util.StartScheduler(clientSet)
-    fakePVControllerShutdown := util.StartFakePVController(clientSet)
+    }
+
+    client := clientset.NewForConfigOrDie(cfg)
+    dynClient := dynamic.NewForConfigOrDie(cfg)
+
+    _, podInformer, schedulerShutdown := util.StartScheduler(client)
+    fakePVControllerShutdown := util.StartFakePVController(client)

     shutdownFunc := func() {
         fakePVControllerShutdown()
@@ -76,7 +82,7 @@ func mustSetupScheduler() (util.ShutdownFunc, coreinformers.PodInformer, clients
         apiShutdown()
     }

-    return shutdownFunc, podInformer, clientSet
+    return shutdownFunc, podInformer, client, dynClient
 }

 // Returns the list of scheduled pods in the specified namespaces.
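
The SchedulingWithMixedChurn workload in performance-config.yaml above only exercises the recreate mode. For reference, a create-mode churn op uses the same fields; the snippet below is an illustrative sketch only, not part of this patch, and the template choice and numbers are arbitrary. It would create one object per listed template every 2 seconds without ever deleting them, stopping after 1000 cycles; with number: 0 it would keep creating until the workload's context is cancelled.

  - opcode: churn
    mode: create
    # stop after 1000 creation cycles (one object per template per cycle); 0 means unlimited
    number: 1000
    templatePaths:
    - config/churn/pod-default.yaml
    - config/churn/service-default.yaml
    intervalMilliseconds: 2000

In recreate mode, by contrast, the op keeps a ring of "number" object names per template and, on each tick, either creates an object into the current slot or deletes the one already stored there, so at most "number" churn objects per template are alive at any time.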