Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Escape queueName and vhostName in RabbitMQ Scaler before use them in query string (bug fix) #2055

Merged
merged 10 commits into from
Aug 26, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve validation in Cron scaler in case start & end input is same.([#2032](https://github.com/kedacore/keda/pull/2032))
- Improve the cron validation in Cron Scaler ([#2038](https://github.com/kedacore/keda/pull/2038))
- Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028))
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))

### Breaking Changes

Expand Down
24 changes: 14 additions & 10 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type queueInfo struct {
Name string `json:"name"`
}

type regexQueueInfo struct {
Queues []queueInfo `json:"items"`
}

type messageStat struct {
PublishDetail publishDetail `json:"publish_details"`
}
Expand Down Expand Up @@ -200,12 +204,12 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {

// Resolve metricName
if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", val))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", url.QueryEscape(val)))
} else {
if meta.mode == rabbitModeQueueLength {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", meta.queueName))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", url.QueryEscape(meta.queueName)))
} else {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", meta.queueName))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", url.QueryEscape(meta.queueName)))
}
}

Expand Down Expand Up @@ -340,12 +344,12 @@ func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) {

if r.StatusCode == 200 {
if s.metadata.useRegex {
var results []queueInfo
err = json.NewDecoder(r.Body).Decode(&results)
var queues regexQueueInfo
err = json.NewDecoder(r.Body).Decode(&queues)
if err != nil {
return result, err
return queueInfo{}, err
}
result, err := getComposedQueue(s, results)
result, err := getComposedQueue(s, queues.Queues)
return result, err
}

Expand All @@ -368,7 +372,7 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// Override vhost if requested.
if s.metadata.vhostName != nil {
vhost = "/" + *s.metadata.vhostName
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
}

if vhost == "" || vhost == "/" || vhost == "//" {
Expand All @@ -378,9 +382,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
78 changes: 39 additions & 39 deletions pkg/scalers/rabbitmq_scaler_test.go

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions tests/scalers/rabbitmq-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ export class RabbitMQHelper {
)
}

static publishMessages(t, namespace: string, connectionString: string, messageCount: number) {
static publishMessages(t, namespace: string, connectionString: string, messageCount: number, queueName: string) {
// publish messages
const tmpFile = tmp.fileSync()
fs.writeFileSync(tmpFile.name, publishYaml.replace('{{CONNECTION_STRING}}', connectionString)
.replace('{{MESSAGE_COUNT}}', messageCount.toString()))
.replace('{{MESSAGE_COUNT}}', messageCount.toString())
.replace('{{QUEUE_NAME}}', queueName)
.replace('{{QUEUE_NAME}}', queueName))

t.is(
0,
sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${namespace}`).code,
Expand All @@ -52,15 +55,15 @@ export class RabbitMQHelper {
const publishYaml = `apiVersion: batch/v1
kind: Job
metadata:
name: rabbitmq-publish
name: rabbitmq-publish-{{QUEUE_NAME}}
spec:
template:
spec:
containers:
- name: rabbitmq-client
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}"]
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}", "{{QUEUE_NAME}}"]
restartPolicy: Never`

const rabbitmqDeployYaml = `apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-amqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -79,7 +79,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
12 changes: 8 additions & 4 deletions tests/scalers/rabbitmq-queue-http-regex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {waitForDeploymentReplicaCount} from "./helpers";
const testNamespace = 'rabbitmq-queue-http-regex-test'
const rabbitmqNamespace = 'rabbitmq-http-regex-test'
const queueName = 'hello'
const dummyQueueName1 = 'hello-1'
const dummyQueueName2 = 'hellohellohello'
const username = "test-user"
const password = "test-password"
const vhost = "test-vh-regex"
Expand All @@ -20,7 +22,7 @@ test.before(t => {

sh.config.silent = true
// create deployment
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local/${vhost}`
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local`

RabbitMQHelper.createDeployment(t, testNamespace, deployYaml, connectionString, httpConnectionString, queueName)
})
Expand All @@ -33,7 +35,9 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName1)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName2)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -81,7 +85,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand All @@ -105,7 +109,7 @@ spec:
triggers:
- type: rabbitmq
metadata:
queueName: {{QUEUE_NAME}}
queueName: "^hell.{1}$"
hostFromEnv: RabbitApiHost
protocol: http
useRegex: 'true'
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ test.serial('Deployment should have 0 replicas on start', async t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 30, 5000))
Expand Down Expand Up @@ -78,7 +78,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-trigger-auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
let replicaCount = '0'
Expand Down Expand Up @@ -102,7 +102,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down