Skip to content

Commit

Permalink
Emit kubernetes events from KEDA (#1523)
Browse files Browse the repository at this point in the history
* Emit kubernetes events from KEDA

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>

* CR comments

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>

* Fix CI errors

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>

* goimports

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>

* Code review comments

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>

* Fix CHANGELOG.md

Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>
ahmelsayed authored Feb 6, 2021
1 parent df9ee68 commit aac70e6
Showing 15 changed files with 218 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- Emit Kubernetes Events on KEDA events ([#1523](https://github.com/kedacore/keda/pull/1523))

### Improvements

7 changes: 6 additions & 1 deletion adapter/main.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,9 @@ import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
@@ -70,7 +73,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)

namespace, err := getWatchNamespace()
if err != nil {
8 changes: 8 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
@@ -115,6 +115,14 @@ func (c *Conditions) GetActiveCondition() Condition {
return c.getCondition(ConditionActive)
}

// GetReadyCondition returns Condition of type Ready
func (c *Conditions) GetReadyCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionReady)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
13 changes: 12 additions & 1 deletion controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,11 @@ import (
"fmt"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -29,12 +34,13 @@ type ScaledJobReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
@@ -84,7 +90,12 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}
4 changes: 4 additions & 0 deletions controllers/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
@@ -33,6 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
return nil
}

14 changes: 13 additions & 1 deletion controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,10 @@ import (
"sync"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
@@ -46,6 +50,7 @@ type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder

scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
@@ -91,7 +96,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
@@ -159,13 +164,20 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, err
}

4 changes: 4 additions & 0 deletions controllers/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -54,6 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
return nil
}

58 changes: 58 additions & 0 deletions controllers/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package controllers

import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*"

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
type TriggerAuthenticationReconciler struct {
Client client.Client
Log logr.Logger
Recorder record.EventRecorder
}

// Reconcile performs reconciliation on the identified TriggerAuthentication resource based on the request information passed, returns the result and an error (if any).
func (r *TriggerAuthenticationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("TriggerAuthentication.Namespace", req.Namespace, "TriggerAuthentication.Name", req.Name)

triggerAuthentication := &kedav1alpha1.TriggerAuthentication{}
err := r.Client.Get(context.TODO(), req.NamespacedName, triggerAuthentication)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get TriggerAuthentication")
return ctrl.Result{}, err
}

if triggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted")
return ctrl.Result{}, nil
}

if triggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured")
}

return ctrl.Result{}, nil
}

// SetupWithManager initializes the TriggerAuthenticationReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -958,6 +958,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -123,12 +123,14 @@ func main() {
}

globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

if err = (&controllers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
@@ -138,10 +140,19 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
if err = (&controllers.TriggerAuthenticationReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("TriggerAuthentication"),
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("Starting manager")
67 changes: 67 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2020 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventreason

const (
// ScaledObjectReady is for event when a new ScaledObject is ready
ScaledObjectReady = "ScaledObjectReady"

// ScaledJobReady is for event when a new ScaledJob is ready
ScaledJobReady = "ScaledJobReady"

// ScaledObjectCheckFailed is for event when ScaledObject validation check fails
ScaledObjectCheckFailed = "ScaledObjectCheckFailed"

// ScaledJobCheckFailed is for event when ScaledJob validation check fails
ScaledJobCheckFailed = "ScaledJobCheckFailed"

// ScaledObjectDeleted is for event when ScaledObject is deleted
ScaledObjectDeleted = "ScaledObjectDeleted"

// ScaledJobDeleted is for event when ScaledJob is deleted
ScaledJobDeleted = "ScaledJobDeleted"

// KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
KEDAScalersStarted = "KEDAScalersStarted"

// KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
KEDAScalersStopped = "KEDAScalersStopped"

// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

// KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated
KEDAScaleTargetActivated = "KEDAScaleTargetActivated"

// KEDAScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated
KEDAScaleTargetDeactivated = "KEDAScaleTargetDeactivated"

// KEDAScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails
KEDAScaleTargetActivationFailed = "KEDAScaleTargetActivationFailed"

// KEDAScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails
KEDAScaleTargetDeactivationFailed = "KEDAScaleTargetDeactivationFailed"

// KEDAJobsCreated is for event when jobs for ScaledJob are created
KEDAJobsCreated = "KEDAJobsCreated"

// TriggerAuthenticationDeleted is for event when a TriggerAuthentication is deleted
TriggerAuthenticationDeleted = "TriggerAuthenticationDeleted"

// TriggerAuthenticationAdded is for event when a TriggerAuthentication is added
TriggerAuthenticationAdded = "TriggerAuthenticationAdded"
)
6 changes: 5 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -30,15 +32,17 @@ type scaleExecutor struct {
scaleClient *scale.ScalesGetter
reconcilerScheme *runtime.Scheme
logger logr.Logger
recorder record.EventRecorder
}

// NewScaleExecutor creates a ScaleExecutor object
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor {
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor {
return &scaleExecutor{
client: client,
scaleClient: scaleClient,
reconcilerScheme: reconcilerScheme,
logger: logf.Log.WithName("scaleexecutor"),
recorder: recorder,
}
}

3 changes: 3 additions & 0 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@ import (
"sort"
"strconv"

"github.com/kedacore/keda/v2/pkg/eventreason"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -108,6 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo)
}

func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
Loading
Oops, something went wrong.

0 comments on commit aac70e6

Please sign in to comment.