Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Elasticsearch Scaler based on search template #2311

Merged
merged 25 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8a1b7b4
adds an elasticsearch scaler based on search template
orphaner Nov 18, 2021
44ba555
chore: proper indent
orphaner Nov 19, 2021
cc4c9f1
chore: fixes lints
orphaner Nov 19, 2021
e8c6415
chore: add changelog entry
orphaner Nov 19, 2021
9d703c2
chore: rename params to parameters
orphaner Nov 19, 2021
7b408fd
chore: fix comment
orphaner Nov 19, 2021
6afb6ca
:chore: trim
orphaner Nov 19, 2021
bef268d
chore: fix imports order
orphaner Nov 19, 2021
4664f21
chore: fix bad if / else / else if
orphaner Nov 19, 2021
0d5042f
chore: fix sort in scale_handler
orphaner Nov 19, 2021
6f35589
chore: fix name for sort-scalers hook
orphaner Nov 19, 2021
4d835d3
chore: trim
orphaner Nov 19, 2021
3108152
chore: fixes after code review
orphaner Nov 22, 2021
8e3b479
chore: fix PR number
orphaner Nov 22, 2021
ca423cb
proper handling of metricName + TU
orphaner Nov 22, 2021
9d5a921
chore: remove useless metadata
orphaner Nov 22, 2021
5dd7818
chore: increase timeout again
orphaner Nov 22, 2021
62512f2
chore: add debug output
orphaner Nov 22, 2021
18b093b
chore: add debug output
orphaner Nov 23, 2021
8c6f836
fix: disable mmap on elasticsearch statefulset
orphaner Nov 23, 2021
75cdc8f
chore: cleanup debug output
orphaner Nov 23, 2021
f9a45ef
Merge branch 'main' into elasticsearch-search-template2
orphaner Nov 23, 2021
c899398
chore: remove debug timeout
orphaner Nov 23, 2021
3dec64d
chore: fix typo in changelog and organize imports
orphaner Nov 23, 2021
718b68a
Merge branch 'kedacore:main' into elasticsearch-search-template2
orphaner Nov 23, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: "(?i)(black|white)[_-]?(list|List)"
pass_filenames: true
- id: sort-scalers
name: Check if scalers are sorted in scaler_handler.go
name: Check if scalers are sorted in scale_handler.go
language: system
entry: "bash tools/sort_scalers.sh"
files: .*scale_handler\.go$
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Add an elasticsearch scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311))
orphaner marked this conversation as resolved.
Show resolved Hide resolved
- Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279))

### Improvements
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.3
github.com/denisenkom/go-mssqldb v0.11.0
github.com/elastic/go-elasticsearch/v7 v7.15.1
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis/v8 v8.11.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ=
github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
276 changes: 276 additions & 0 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"

"github.com/elastic/go-elasticsearch/v7"
kedautil "github.com/kedacore/keda/v2/pkg/util"
orphaner marked this conversation as resolved.
Show resolved Hide resolved
"github.com/tidwall/gjson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type elasticsearchScaler struct {
metadata *elasticsearchMetadata
esClient *elasticsearch.Client
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue int
metricName string
}

var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler")

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseElasticsearchMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err)
}

esClient, err := newElasticsearchClient(meta)
if err != nil {
return nil, fmt.Errorf("error getting elasticsearch client: %s", err)
}
return &elasticsearchScaler{
metadata: meta,
esClient: esClient,
}, nil
}

const defaultUnsafeSsl = false

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return nil, err
}
meta.addresses = splitAndTrimBySep(addresses, ",")

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}

targetValue, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
return nil, err
}
meta.targetValue, err = strconv.Atoi(targetValue)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}

meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) {
config := elasticsearch.Config{Addresses: meta.addresses}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl}
config.Transport = transport

esClient, err := elasticsearch.NewClient(config)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
return nil, err
}

_, err = esClient.Info()
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err))
return nil, err
}
return esClient, nil
}

func (s *elasticsearchScaler) Close(ctx context.Context) error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult(ctx)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err))
return false, err
}
return messages > 0, nil
}

// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
elasticsearchLog.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
}

defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
if err != nil {
return 0, err
}
return v, nil
}

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
}
return query
}

func getValueFromSearch(body []byte, valueLocation string) (int, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number but got: '%s'"
if r.Type == gjson.String {
q, err := strconv.Atoi(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
}
return q, nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
}
return int(r.Num), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
}
Loading