Skip to content

Commit

Permalink
Merge pull request #98900 from Huang-Wei/churn-cluster-op
Browse files Browse the repository at this point in the history
Introduce a churnOp to scheduler perf testing framework
  • Loading branch information
k8s-ci-robot authored Mar 11, 2021
2 parents 784df7a + 1e5878b commit 823fa75
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 13 deletions.
11 changes: 11 additions & 0 deletions test/integration/scheduler_perf/config/churn/node-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Template for Node objects created/deleted by the "churn" op of the
# scheduler_perf test framework. generateName gives each churned node a
# unique name with the "node-churn-" prefix.
apiVersion: v1
kind: Node
metadata:
  generateName: node-churn-
status:
  # Advertise capacity for 0 pods so no test pods get scheduled onto
  # churned nodes.
  capacity:
    pods: "0"
  conditions:
  - status: "True"
    type: Ready
  phase: Running
8 changes: 8 additions & 0 deletions test/integration/scheduler_perf/config/churn/pod-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Template for Pod objects created/deleted by the "churn" op of the
# scheduler_perf test framework. Uses the pause image as a minimal,
# no-op container.
apiVersion: v1
kind: Pod
metadata:
  generateName: pod-churn-
spec:
  containers:
  - image: k8s.gcr.io/pause:3.4.1
    name: pause
11 changes: 11 additions & 0 deletions test/integration/scheduler_perf/config/churn/service-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Template for Service objects created/deleted by the "churn" op of the
# scheduler_perf test framework.
apiVersion: v1
kind: Service
metadata:
  generateName: service-churn-
spec:
  selector:
    app: foo
  ports:
  - protocol: TCP
    port: 8080
    targetPort: 8080
26 changes: 26 additions & 0 deletions test/integration/scheduler_perf/config/performance-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,29 @@
initNodes: 5000
initPods: 2000
measurePods: 5000

# Measures pod scheduling throughput while a background "churn" op keeps
# creating and deleting a mixed set of API objects (node, pod, service)
# once per second.
- name: SchedulingWithMixedChurn
  workloadTemplate:
  - opcode: createNodes
    countParam: $initNodes
  - opcode: churn
    # recreate: create the objects, then delete them, repeatedly.
    mode: recreate
    number: 1
    templatePaths:
    - config/churn/node-default.yaml
    - config/pod-high-priority-large-cpu.yaml
    - config/churn/service-default.yaml
    intervalMilliseconds: 1000
  - opcode: createPods
    countParam: $measurePods
    podTemplatePath: config/pod-default.yaml
    collectMetrics: true
  workloads:
  - name: 1000Nodes
    params:
      initNodes: 1000
      measurePods: 1000
  - name: 5000Nodes
    params:
      initNodes: 5000
      measurePods: 2000
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Template for a high-priority Pod with a large CPU request (9 cores),
# used by the churn op to perturb scheduling decisions of the pods under
# measurement.
apiVersion: v1
kind: Pod
metadata:
  generateName: pod-h-
spec:
  priority: 10
  containers:
  - image: k8s.gcr.io/pause:3.2
    name: pause
    ports:
    - containerPort: 80
    resources:
      requests:
        cpu: 9
        memory: 500Mi
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func benchmarkScheduling(numExistingPods, minPods int,
//lint:ignore SA3001 Set a minimum for b.N to get more meaningful results
b.N = minPods
}
finalFunc, podInformer, clientset := mustSetupScheduler()
finalFunc, podInformer, clientset, _ := mustSetupScheduler()
defer finalFunc()

nodePreparer := framework.NewIntegrationTestNodePreparer(
Expand Down
194 changes: 190 additions & 4 deletions test/integration/scheduler_perf/scheduler_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"strings"
"sync"
"testing"
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
Expand All @@ -43,7 +52,15 @@ const (
configFile = "config/performance-config.yaml"
createNodesOpcode = "createNodes"
createPodsOpcode = "createPods"
churnOpcode = "churn"
barrierOpcode = "barrier"

// Two modes supported in "churn" operator.

// Recreate creates a number of API objects and then deletes them, and repeats the cycle.
Recreate = "recreate"
// Create continuously creates API objects without deleting them.
Create = "create"
)

var (
Expand Down Expand Up @@ -90,7 +107,7 @@ func (tc *testCase) collectsMetrics() bool {

// workload is a subtest under a testCase that tests the scheduler performance
// for a certain ordering of ops. The set of nodes created and pods scheduled
// in a workload may be heterogenous.
// in a workload may be heterogeneous.
type workload struct {
// Name of the workload.
Name string
Expand All @@ -109,6 +126,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
possibleOps := []realOp{
&createNodesOp{},
&createPodsOp{},
&churnOp{},
&barrierOp{},
// TODO(#93793): add a sleep timer op to simulate waiting?
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
Expand Down Expand Up @@ -252,6 +270,55 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
return &cpo, (&cpo).isValid(false)
}

// churnOp defines an op that continuously creates (and optionally deletes)
// arbitrary API objects, specified by template spec files, in the background
// of a workload.
type churnOp struct {
	// Must be "churn".
	Opcode string
	// Value must be one of the following:
	// - recreate. In this mode, API objects will be created for N cycles, and then
	//   deleted in the next N cycles. N is specified by the "Number" field.
	// - create. In this mode, API objects will be created (without deletion) until
	//   reaching a threshold - which is specified by the "Number" field.
	Mode string
	// Maximum number of API objects to be created.
	// Defaults to 0, which means unlimited.
	Number int
	// Interval of churning, in milliseconds. Defaults to 500 milliseconds.
	IntervalMilliseconds int64
	// Namespace the churning objects should be created in. Optional, defaults to a unique
	// namespace of the format "namespace-<number>".
	Namespace *string
	// Paths of the API spec files, one object is churned per file per cycle.
	TemplatePaths []string
}

// isValid verifies that the churnOp's fields describe a usable operation:
// correct opcode, a supported mode, a non-negative (and, for recreate,
// non-zero) object count, and at least one template spec file.
// The allowParameterization argument is unused for this op.
func (co *churnOp) isValid(_ bool) error {
	if co.Opcode != churnOpcode {
		// Include the offending value so a config typo is easy to spot.
		return fmt.Errorf("invalid opcode %q; expected %q", co.Opcode, churnOpcode)
	}
	if co.Mode != Recreate && co.Mode != Create {
		return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
	}
	if co.Number < 0 {
		return fmt.Errorf("number (%v) cannot be negative", co.Number)
	}
	// Recreate cycles N creates followed by N deletes, so N must be > 0.
	if co.Mode == Recreate && co.Number == 0 {
		return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
	}
	if len(co.TemplatePaths) == 0 {
		return fmt.Errorf("at least one template spec file needs to be specified")
	}
	return nil
}

// collectsMetrics returns false: the churn op only perturbs cluster state in
// the background and never gathers scheduling metrics itself.
func (*churnOp) collectsMetrics() bool {
	return false
}

// patchParams returns a copy of the op unchanged: churnOp has no
// workload-parameterized fields to substitute. The value receiver makes the
// copy, mirroring the sibling patchParams implementations.
func (co churnOp) patchParams(w *workload) (realOp, error) {
	return &co, nil
}

// barrierOp defines an op that can be used to wait until all scheduled pods of
// one or many namespaces have been bound to nodes. This is useful when pods
// were scheduled with SkipWaitToCompletion set to true.
Expand Down Expand Up @@ -309,7 +376,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
// 30 minutes should be plenty enough even for the 5000-node tests.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
finalFunc, podInformer, clientset := mustSetupScheduler()
finalFunc, podInformer, client, dynClient := mustSetupScheduler()
b.Cleanup(finalFunc)

var mu sync.Mutex
Expand All @@ -329,7 +396,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, clientset)
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
Expand Down Expand Up @@ -359,7 +426,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
go collector.run(collectorCtx)
}
}
if err := createPods(namespace, concreteOp, clientset); err != nil {
if err := createPods(namespace, concreteOp, client); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
Expand Down Expand Up @@ -387,6 +454,103 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
mu.Unlock()
}

case *churnOp:
var namespace string
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}

var churnFns []func(name string) string

for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = dynClient.Resource(gvr).Namespace(namespace)
} else {
dynRes = dynClient.Resource(gvr)
}

churnFns = append(churnFns, func(name string) string {
if name != "" {
dynRes.Delete(ctx, name, metav1.DeleteOptions{})
return ""
}

live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}
return live.GetName()
})
}

var interval int64 = 500
if concreteOp.IntervalMilliseconds != 0 {
interval = concreteOp.IntervalMilliseconds
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()

if concreteOp.Mode == Recreate {
go func() {
retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
for i := range retVals {
retVals[i] = make([]string, concreteOp.Number)
}

count := 0
for {
select {
case <-ticker.C:
for i := range churnFns {
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-ctx.Done():
return
}
}
}()
} else if concreteOp.Mode == Create {
go func() {
count, threshold := 0, concreteOp.Number
if threshold == 0 {
threshold = math.MaxInt32
}
for count < threshold {
select {
case <-ticker.C:
for i := range churnFns {
churnFns[i]("")
}
count++
case <-ctx.Done():
return
}
}
}()
}

case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
Expand Down Expand Up @@ -525,6 +689,28 @@ func getSpecFromFile(path *string, spec interface{}) error {
return yaml.UnmarshalStrict(bytes, spec)
}

// getUnstructuredFromFile reads a YAML spec file at the given path, decodes
// it into an Unstructured object, and returns the object together with its
// GroupVersionKind.
func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
	bytes, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, nil, err
	}

	// The unstructured JSON scheme decodes JSON, so convert the YAML first.
	bytes, err = yaml.YAMLToJSONStrict(bytes)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot convert YAML to JSON: %w", err)
	}

	obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
	if err != nil {
		return nil, nil, err
	}
	unstructuredObj, ok := obj.(*unstructured.Unstructured)
	if !ok {
		return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
	}
	return unstructuredObj, gvk, nil
}

func getTestCases(path string) ([]*testCase, error) {
testCases := make([]*testCase, 0)
if err := getSpecFromFile(&path, &testCases); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/scheduler_perf/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type testConfig struct {

// getBaseConfig returns baseConfig after initializing number of nodes and pods.
func getBaseConfig(nodes int, pods int) *testConfig {
destroyFunc, podInformer, clientset := mustSetupScheduler()
destroyFunc, podInformer, clientset, _ := mustSetupScheduler()
return &testConfig{
clientset: clientset,
destroyFunc: destroyFunc,
Expand Down
Loading

0 comments on commit 823fa75

Please sign in to comment.