Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend RabbitMQ scaler to support count unacked messages #700

Merged
merged 2 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fix code review feedback
Signed-off-by: Alex Emelyanov <holyketzer@gmail.com>
  • Loading branch information
holyketzer committed Mar 28, 2020
commit ce4c38578b651ac6dbcdba885a8b9727d40325c9
32 changes: 16 additions & 16 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
const (
rabbitQueueLengthMetricName = "queueLength"
rabbitMetricType = "External"
rabbitCountUnacked = "countUnacked"
defaultCountUnacked = false
rabbitIncludeUnacked = "includeUnacked"
defaultIncludeUnacked = false
)

type rabbitMQScaler struct {
Expand All @@ -33,11 +33,11 @@ type rabbitMQScaler struct {
}

type rabbitMQMetadata struct {
queueName string
host string // connection string for AMQP protocol
apiHost string // connection string for management API requests
queueLength int
countUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
queueName string
host string // connection string for AMQP protocol
apiHost string // connection string for management API requests
queueLength int
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
}

type queueInfo struct {
Expand All @@ -55,7 +55,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err)
}

if meta.countUnacked {
if meta.includeUnacked {
return &rabbitMQScaler{metadata: meta}, nil
} else {
conn, ch, err := getConnectionAndChannel(meta.host)
Expand All @@ -74,16 +74,16 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) (*rabbitMQMetadata, error) {
meta := rabbitMQMetadata{}

meta.countUnacked = defaultCountUnacked
if val, ok := metadata[rabbitCountUnacked]; ok {
countUnacked, err := strconv.ParseBool(val)
meta.includeUnacked = defaultIncludeUnacked
if val, ok := metadata[rabbitIncludeUnacked]; ok {
includeUnacked, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("countUnacked parsing error %s", err.Error())
return nil, fmt.Errorf("includeUnacked parsing error %s", err.Error())
}
meta.countUnacked = countUnacked
meta.includeUnacked = includeUnacked
}

if meta.countUnacked {
if meta.includeUnacked {
if val, ok := authParams["apiHost"]; ok {
meta.apiHost = val
} else if val, ok := metadata["apiHost"]; ok {
Expand Down Expand Up @@ -149,7 +149,7 @@ func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, erro

// Close disposes of RabbitMQ connections
func (s *rabbitMQScaler) Close() error {
if s.metadata.countUnacked == false {
if s.connection != nil {
err := s.connection.Close()
if err != nil {
rabbitmqLog.Error(err, "Error closing rabbitmq connection")
Expand All @@ -170,7 +170,7 @@ func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
}

func (s *rabbitMQScaler) getQueueMessages() (int, error) {
if s.metadata.countUnacked {
if s.metadata.includeUnacked {
info, err := s.getQueueInfoViaHttp()
if err != nil {
return -1, err
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"queueLength": "10", "host": host}, true, map[string]string{}},
// host defined in authParams
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}},
// properly formed metadata with countUnacked
{map[string]string{"queueLength": "10", "queueName": "sample", "apiHost": apiHost, "countUnacked": "true"}, false, map[string]string{}},
// properly formed metadata with includeUnacked
{map[string]string{"queueLength": "10", "queueName": "sample", "apiHost": apiHost, "includeUnacked": "true"}, false, map[string]string{}},
}

func TestRabbitMQParseMetadata(t *testing.T) {
Expand Down Expand Up @@ -83,10 +83,10 @@ func TestGetQueueInfo(t *testing.T) {
resolvedEnv := map[string]string{apiHost: fmt.Sprintf("%s/%s", apiStub.URL, "myhost")}

metadata := map[string]string{
"queueLength": "10",
"queueName": "evaluate_trials",
"apiHost": apiHost,
"countUnacked": "true",
"queueLength": "10",
"queueName": "evaluate_trials",
"apiHost": apiHost,
"includeUnacked": "true",
}

s, err := NewRabbitMQScaler(resolvedEnv, metadata, map[string]string{})
Expand Down