Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a concurrency policy option for scheduledScan CRD #1749

Merged
merged 5 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions operator/apis/execution/v1/scheduledscan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,20 @@ type ScheduledScanSpec struct {
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Minimum=0
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`

// FailedJobsHistoryLimit determines how many failed past Scans will be kept until the oldest one will be deleted, defaults to 3. When set to 0, Scans will be deleted directly after failure
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Minimum=0
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`

// Specifies how to treat concurrent executions of a Job.
// Valid values are:
// - "Allow" (default): allows CronJobs to run concurrently;
// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
// - "Replace": cancels currently running job and replaces it with a new one
// +optional
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

// ScanSpec describes the scan which should be started regularly
ScanSpec *ScanSpec `json:"scanSpec"`

Expand All @@ -43,6 +52,25 @@ type ScheduledScanSpec struct {
RetriggerOnScanTypeChange bool `json:"retriggerOnScanTypeChange,omitempty"`
}

// ConcurrencyPolicy describes how the job will be handled.
// Only one of the following concurrent policies may be specified.
// If none of the following policies is specified, the default one
// is AllowConcurrent.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string

const (
// AllowConcurrent allows CronJobs to run concurrently.
AllowConcurrent ConcurrencyPolicy = "Allow"

// ForbidConcurrent forbids concurrent runs, skipping next run if previous
// hasn't finished yet.
ForbidConcurrent ConcurrencyPolicy = "Forbid"

// ReplaceConcurrent cancels currently running job and replaces it with a new one.
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// ScheduledScanStatus defines the observed state of ScheduledScan
type ScheduledScanStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ spec:
spec:
description: ScheduledScanSpec defines the desired state of ScheduledScan
properties:
concurrencyPolicy:
description: 'Specifies how to treat concurrent executions of a Job.
Valid values are: - "Allow" (default): allows CronJobs to run concurrently;
- "Forbid": forbids concurrent runs, skipping next run if previous
run hasn''t finished yet; - "Replace": cancels currently running
job and replaces it with a new one'
enum:
- Allow
- Forbid
- Replace
type: string
failedJobsHistoryLimit:
description: FailedJobsHistoryLimit determines how many failed past
Scans will be kept until the oldest one will be deleted, defaults
Expand Down
6 changes: 3 additions & 3 deletions operator/controllers/execution/scantype_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var _ = Describe("ScanType controller", func() {

createNamespace(ctx, namespace)
createScanType(ctx, namespace)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent)

// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
Expand Down Expand Up @@ -77,7 +77,7 @@ var _ = Describe("ScanType controller", func() {

createNamespace(ctx, namespace)
createScanType(ctx, namespace)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent)

// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
Expand Down Expand Up @@ -107,7 +107,7 @@ var _ = Describe("ScanType controller", func() {

createNamespace(ctx, namespace)
createScanType(ctx, namespace)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, false)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, false, 42*time.Hour, executionv1.ForbidConcurrent)

// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
Expand Down
38 changes: 38 additions & 0 deletions operator/controllers/execution/scheduledscan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,30 @@ func (r *ScheduledScanReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

InProgressScans := getScansInProgress(childScans.Items)

// check if it is time to start the next Scan
if !time.Now().Before(nextSchedule) {
// check concurrency policy
if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ForbidConcurrent && len(InProgressScans) > 0 {
log.V(8).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(InProgressScans))
r.Recorder.Event(&scheduledScan, "Normal", "ConcurrencyPolicyBlocks", "Concurrency policy blocks concurrent runs, skipping")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

// ...or instruct us to replace existing ones...
if scheduledScan.Spec.ConcurrencyPolicy == executionv1.ReplaceConcurrent {
for _, scan := range InProgressScans {
// we don't care if the job was already deleted
if err := r.Delete(context.Background(), &scan, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
log.Error(err, "unable to delete active job", "job", scan)
r.Recorder.Event(&scheduledScan, "Warning", "JobDeletionFailed", fmt.Sprintf("Unable to delete active job: %s, error: %v", scan.Name, err))
return ctrl.Result{}, err
}
r.Recorder.Event(&scheduledScan, "Normal", "JobReplaced", fmt.Sprintf("Active job %s replaced", scan.Name))
}
}

if scheduledScan.Spec.RetriggerOnScanTypeChange == true {
// generate hash for current state of the configured ScanType
var scanType executionv1.ScanType
Expand Down Expand Up @@ -237,6 +259,22 @@ func getScansWithState(scans []executionv1.Scan, state string) []executionv1.Sca
return newScans
}

// Returns a sorted list of scans in progress
func getScansInProgress(scans []executionv1.Scan) []executionv1.Scan {
// Get a sorted list of scans.
var newScans []executionv1.Scan
for _, scan := range scans {
if scan.Status.State != "Done" && scan.Status.State != "Errored" {
newScans = append(newScans, scan)
}
}
sort.Slice(newScans, func(i, j int) bool {
return newScans[i].ObjectMeta.CreationTimestamp.Before(&newScans[j].ObjectMeta.CreationTimestamp)
})

return newScans
}

// DeleteOldScans when exceeding the history limit
func (r *ScheduledScanReconciler) deleteOldScans(scans []executionv1.Scan, maxCount int32) error {
for i, scan := range scans {
Expand Down
71 changes: 70 additions & 1 deletion operator/controllers/execution/scheduledscan_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package controllers

import (
"context"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -75,7 +76,7 @@ var _ = Describe("ScheduledScan controller", func() {

createNamespace(ctx, namespace)
createScanType(ctx, namespace)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true)
scheduledScan := createScheduledScanWithInterval(ctx, namespace, true, 42*time.Hour, executionv1.ForbidConcurrent)

var scanlist executionv1.ScanList
// ensure that the ScheduledScan has been triggered
Expand Down Expand Up @@ -108,4 +109,72 @@ var _ = Describe("ScheduledScan controller", func() {
Expect(scheduledScan.Status.Findings.FindingCategories).Should(Equal(map[string]uint64{"Open Port": 42}))
})
})

Context("A Scan is triggred due to a Scheduled Scan with a ConcurrencyPolicy", func() {
It("A second scheduled scan should not start before the first one is finished if the concurency policy is set to ForbidConcurrent", func() {

ctx := context.Background()
namespace := "scheduled-scan-triggerd-concurrency-forbid-test"
createNamespace(ctx, namespace)
createScanType(ctx, namespace)
createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.ForbidConcurrent)

var scanlist executionv1.ScanList
// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))

Expect(scanlist.Items).Should(HaveLen(1))
time.Sleep(2 * time.Second)
// make sure that no second scan has been triggered
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))
Expect(scanlist.Items).Should(HaveLen(1))

})

It("A second scheduled scan should start before the first one is finished if the concurency policy is set to AllowConcurrent", func() {

ctx := context.Background()
namespace := "scheduled-scan-triggerd-concurrency-allow-test"
createNamespace(ctx, namespace)
createScanType(ctx, namespace)
createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.AllowConcurrent)

var scanlist executionv1.ScanList
// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))
Expect(scanlist.Items).ShouldNot(BeEmpty())

time.Sleep(2 * time.Second)

// make sure more than one scan has been triggered
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))
Expect(scanlist.Items).ShouldNot(HaveLen(1))
})

It("A second scheduled scan should replace the first one, before the first one is finished if the concurency policy is set to ReplaceConcurrent", func() {

ctx := context.Background()
namespace := "scheduled-scan-triggerd-concurrency-replace-test"
createNamespace(ctx, namespace)
createScanType(ctx, namespace)
createScheduledScanWithInterval(ctx, namespace, true, 1*time.Second, executionv1.ReplaceConcurrent)

var scanlist executionv1.ScanList
// ensure that the ScheduledScan has been triggered
waitForScheduledScanToBeTriggered(ctx, namespace, timeout)
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))
Expect(scanlist.Items).Should(HaveLen(1))
firstScanName := scanlist.Items[0].Name

time.Sleep(2 * time.Second)

// make sure the first scan has been replaced
k8sClient.List(ctx, &scanlist, client.InNamespace(namespace))
secondScanName := scanlist.Items[0].Name
Expect(scanlist.Items).Should(HaveLen(1))
Expect(firstScanName).ShouldNot(Equal(secondScanName))
})
})
})
5 changes: 3 additions & 2 deletions operator/controllers/execution/test_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func createScanType(ctx context.Context, namespace string) {
Expect(k8sClient.Create(ctx, scanType)).Should(Succeed())
}

func createScheduledScanWithInterval(ctx context.Context, namespace string, retriggerOnScanTypeChange bool) executionv1.ScheduledScan {
func createScheduledScanWithInterval(ctx context.Context, namespace string, retriggerOnScanTypeChange bool, interval time.Duration, concurrencyPolicy executionv1.ConcurrencyPolicy) executionv1.ScheduledScan {
namespaceLocalResourceMode := executionv1.NamespaceLocal

scheduledScan := executionv1.ScheduledScan{
Expand All @@ -75,8 +75,9 @@ func createScheduledScanWithInterval(ctx context.Context, namespace string, retr
Namespace: namespace,
},
Spec: executionv1.ScheduledScanSpec{
Interval: metav1.Duration{Duration: 42 * time.Hour},
Interval: metav1.Duration{Duration: interval},
RetriggerOnScanTypeChange: retriggerOnScanTypeChange,
ConcurrencyPolicy: concurrencyPolicy,
ScanSpec: &executionv1.ScanSpec{
ScanType: "nmap",
ResourceMode: &namespaceLocalResourceMode,
Expand Down