Skip to content

Commit

Permalink
Add new Datadog scaler (#2354)
Browse files Browse the repository at this point in the history
Signed-off-by: Ara Pulido <ara.pulido@datadoghq.com>
  • Loading branch information
arapulido authored Jan 12, 2022
1 parent 92c75bc commit aa20943
Show file tree
Hide file tree
Showing 7 changed files with 694 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

- Add New Relic Scaler ([#2387](https://github.com/kedacore/keda/pull/2387))
- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305))
- Add New Datadog Scaler ([#2354](https://github.com/kedacore/keda/pull/2354))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-autorest/autorest v0.11.22
github.com/Azure/go-autorest/autorest/azure/auth v0.5.9
github.com/DataDog/datadog-api-client-go v1.6.0 // indirect
github.com/Huawei/gophercloud v1.0.21
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.16
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-api-client-go v1.6.0 h1:ccMzM4vw37/8ww9VKKydWMrI+xEs0uE13O5mkG9Ny/8=
github.com/DataDog/datadog-api-client-go v1.6.0/go.mod h1:QzaQF1cDO1/BIQG1fz14VrY+6RECUGkiwzDCtVbfP5c=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/Huawei/gophercloud v1.0.21 h1:HhtzZzRGZiVmLypqHlXrGAcdC1TJW99FLewfPSVktpY=
github.com/Huawei/gophercloud v1.0.21/go.mod h1:TUtAO2PE+Nj7/QdfUXbhi5Xu0uFKVccyukPA7UCxD9w=
Expand Down
306 changes: 306 additions & 0 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

datadog "github.com/DataDog/datadog-api-client-go/api/v1/datadog"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type datadogScaler struct {
metadata *datadogMetadata
apiClient *datadog.APIClient
}

type valueType int

const (
average = iota
global
)

type datadogMetadata struct {
apiKey string
appKey string
datadogSite string
query string
queryValue int
vType valueType
metricName string
age int
}

var datadogLog = logf.Log.WithName("datadog_scaler")

// NewDatadogScaler creates a new Datadog scaler
func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseDatadogMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Datadog metadata: %s", err)
}

apiClient, err := newDatadogConnection(ctx, meta, config)
if err != nil {
return nil, fmt.Errorf("error establishing Datadog connection: %s", err)
}
return &datadogScaler{
metadata: meta,
apiClient: apiClient,
}, nil
}

func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
meta := datadogMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("queryValue parsing error %s", err.Error())
}
meta.queryValue = queryValue
} else {
return nil, fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
}
meta.age = age
} else {
meta.age = 90 // Default window 90 seconds
}

// For all the points in a given window, we take the rollup to the window size
rollup := fmt.Sprintf(".rollup(avg, %d)", meta.age)
meta.query += rollup

if val, ok := config.TriggerMetadata["type"]; ok {
val = strings.ToLower(val)
switch val {
case "average":
meta.vType = average
case "global":
meta.vType = global
default:
return nil, fmt.Errorf("type has to be global or average")
}
} else {
meta.vType = average // Default to average between pods
}

if val, ok := config.AuthParams["apiKey"]; ok {
meta.apiKey = val
} else {
return nil, fmt.Errorf("no api key given")
}

if val, ok := config.AuthParams["appKey"]; ok {
meta.appKey = val
} else {
return nil, fmt.Errorf("no app key given")
}

siteVal := "datadoghq.com"

if val, ok := config.AuthParams["datadogSite"]; ok && val != "" {
siteVal = val
}

meta.datadogSite = siteVal

metricName := meta.query[0:strings.Index(meta.query, "{")]
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("datadog-%s", metricName)))

return &meta, nil
}

// newDatddogConnection tests a connection to the Datadog API
func newDatadogConnection(ctx context.Context, meta *datadogMetadata, config *ScalerConfig) (*datadog.APIClient, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: meta.apiKey,
},
"appKeyAuth": {
Key: meta.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": meta.datadogSite,
})

configuration := datadog.NewConfiguration()
configuration.HTTPClient = kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)
apiClient := datadog.NewAPIClient(configuration)

_, _, err := apiClient.AuthenticationApi.Validate(ctx)
if err != nil {
return nil, fmt.Errorf("error connecting to Datadog API endpoint: %v", err)
}

return apiClient, nil
}

// No need to close connections
func (s *datadogScaler) Close(context.Context) error {
return nil
}

// IsActive returns true if we are able to get metrics from Datadog
func (s *datadogScaler) IsActive(ctx context.Context) (bool, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)

if err != nil {
return false, err
}

series := resp.GetSeries()

if len(series) == 0 {
return false, nil
}

points := series[0].GetPointlist()

if len(points) == 0 {
return false, nil
}

return true, nil
}

// getQueryResult returns result of the scaler query
func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

series := resp.GetSeries()

if len(series) == 0 {
return 0, fmt.Errorf("no Datadog metrics returned")
}

points := series[0].GetPointlist()

if len(points) == 0 || len(points[0]) < 2 {
return 0, fmt.Errorf("no Datadog metrics returned")
}

return int(*points[0][1]), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *datadogScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := new(v2beta2.ExternalMetricSource)

targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)

switch s.metadata.vType {
case average:
externalMetric = &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
case global:
externalMetric = &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.ValueMetricType,
Value: targetQueryValue,
},
}
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
datadogLog.Error(err, "error getting metrics from Datadog")
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error getting metrics from Datadog: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: s.metadata.metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Loading

0 comments on commit aa20943

Please sign in to comment.