Skip to content

Commit

Permalink
added OptFns to sns/sqs and extra logs
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Oct 15, 2024
1 parent bcd0160 commit 8cf4b95
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 37 deletions.
20 changes: 0 additions & 20 deletions internal/endpoint.go

This file was deleted.

6 changes: 6 additions & 0 deletions sns/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
type PublisherConfig struct {
AWSConfig aws.Config

// OptFns are options for the SNS client.
OptFns []func(*sns.Options)

CreateTopicConfig ConfigAttributes
DoNotCreateTopicIfNotExists bool

Expand Down Expand Up @@ -64,6 +67,9 @@ func GenerateCreateTopicInputDefault(ctx context.Context, topic TopicName, attrs
type SubscriberConfig struct {
AWSConfig aws.Config

// OptFns are options for the SNS client.
OptFns []func(*sns.Options)

TopicResolver TopicResolver

GenerateSqsQueueName GenerateSqsQueueNameFn
Expand Down
4 changes: 2 additions & 2 deletions sns/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package sns

import (
"context"
"errors"
"fmt"

"github.com/ThreeDotsLabs/watermill"

Check failure on line 8 in sns/publisher.go

View workflow job for this annotation

GitHub Actions / ci / build

missing go.sum entry for module providing package github.com/ThreeDotsLabs/watermill (imported by github.com/ThreeDotsLabs/watermill-amazonsqs/sns); to add:
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"github.com/pkg/errors"
)

type Publisher struct {
Expand All @@ -25,7 +25,7 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
}

return &Publisher{
sns: sns.NewFromConfig(config.AWSConfig),
sns: sns.NewFromConfig(config.AWSConfig, config.OptFns...),
config: config,
logger: logger,
}, nil
Expand Down
60 changes: 53 additions & 7 deletions sns/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package sns_test
import (
"context"
"fmt"
"net/url"
"testing"

"github.com/ThreeDotsLabs/watermill-amazonsqs/internal"
amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"

"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
transport "github.com/aws/smithy-go/endpoints"
"github.com/stretchr/testify/assert"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -55,18 +59,27 @@ func TestPubSub_arn_topic_resolver(t *testing.T) {
return createPubSubWithConfig(
t,
sns.PublisherConfig{
AWSConfig: cfg,
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
CreateTopicConfig: sns.ConfigAttributes{},
Marshaler: sns.DefaultMarshalerUnmarshaler{},
TopicResolver: sns.TransparentTopicResolver{},
},
sns.SubscriberConfig{
AWSConfig: cfg,
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
GenerateSqsQueueName: sns.GenerateSqsQueueNameEqualToTopicName,
TopicResolver: sns.TransparentTopicResolver{},
},
sqs.SubscriberConfig{
AWSConfig: cfg,
OptFns: []func(*amazonsqs.Options){
GetEndpointResolverSqs(),
},
QueueConfigAttributes: sqs.QueueConfigAttributes{
// Default value is 30 seconds - need to be lower for tests
VisibilityTimeout: "1",
Expand Down Expand Up @@ -112,18 +125,27 @@ func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
return createPubSubWithConfig(
t,
sns.PublisherConfig{
AWSConfig: cfg,
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
CreateTopicConfig: sns.ConfigAttributes{},
TopicResolver: topicResolver,
Marshaler: sns.DefaultMarshalerUnmarshaler{},
},
sns.SubscriberConfig{
AWSConfig: cfg,
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
TopicResolver: topicResolver,
GenerateSqsQueueName: sns.GenerateSqsQueueNameEqualToTopicName,
},
sqs.SubscriberConfig{
AWSConfig: cfg,
OptFns: []func(*amazonsqs.Options){
GetEndpointResolverSqs(),
},
QueueConfigAttributes: sqs.QueueConfigAttributes{
// Default value is 30 seconds - need to be lower for tests
VisibilityTimeout: "1",
Expand All @@ -141,20 +163,29 @@ func createPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.
return createPubSubWithConfig(
t,
sns.PublisherConfig{
AWSConfig: cfg,
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
CreateTopicConfig: sns.ConfigAttributes{},
Marshaler: sns.DefaultMarshalerUnmarshaler{},
TopicResolver: topicResolver,
},
sns.SubscriberConfig{
AWSConfig: cfg,
OptFns: []func(*amazonsns.Options){
GetEndpointResolverSns(),
},
GenerateSqsQueueName: func(ctx context.Context, sqsTopic sns.TopicArn) (string, error) {
return consumerGroup, nil
},
TopicResolver: topicResolver,
},
sqs.SubscriberConfig{
AWSConfig: cfg,
OptFns: []func(*amazonsqs.Options){
GetEndpointResolverSqs(),
},
QueueConfigAttributes: sqs.QueueConfigAttributes{
// Default value is 30 seconds - need to be lower for tests
VisibilityTimeout: "1",
Expand Down Expand Up @@ -185,10 +216,25 @@ func GetAWSConfig(t *testing.T) aws.Config {

cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
internal.SetEndPoint("http://localhost:4566"),
awsconfig.WithRegion("us-west-2"),
)
require.NoError(t, err)

return cfg
}

func GetEndpointResolverSns() func(*amazonsns.Options) {
return amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
Endpoint: transport.Endpoint{
URI: url.URL{Scheme: "http", Host: "localhost:4566"},
},
})
}

func GetEndpointResolverSqs() func(*amazonsqs.Options) {
return amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{
Endpoint: transport.Endpoint{
URI: url.URL{Scheme: "http", Host: "localhost:4566"},
},
})
}
39 changes: 39 additions & 0 deletions sns/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sns

import (
"context"

"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/smithy-go/endpoints"
)

// OverrideEndpointResolver is a custom endpoint resolver that always returns the same endpoint.
// It can be used to use AWS emulators like Localstack.
//
// For example:
// import (
//
// "github.com/ThreeDotsLabs/watermill-amazonsns/sns"
// amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
// "github.com/aws/smithy-go/transport"
//
// )
//
// pub, err := sns.NewPublisher(sns.PublisherConfig{
// AWSConfig: cfg,
// Marshaler: sns.DefaultMarshalerUnmarshaler{},
// OptFns: []func(*amazonsns.Options){
// amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
// Endpoint: transport.Endpoint{
// URI: url.URL{Scheme: "http", Host: "localstack:4566"},
// },
// }),
// },
// }, logger)
type OverrideEndpointResolver struct {
Endpoint transport.Endpoint
}

func (o OverrideEndpointResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error) {
return o.Endpoint, nil
}
19 changes: 17 additions & 2 deletions sns/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func NewSubscriber(
return &Subscriber{
config: config,
logger: logger,
sns: sns.NewFromConfig(config.AWSConfig),
sqsClient: awsSqs.NewFromConfig(config.AWSConfig),
sns: sns.NewFromConfig(config.AWSConfig, config.OptFns...),
sqsClient: awsSqs.NewFromConfig(sqsConfig.AWSConfig, sqsConfig.OptFns...),
sqs: sqs,
}, nil
}
Expand All @@ -60,6 +60,12 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
return nil, err
}

logger := s.logger.With(watermill.LogFields{
"topic": topic,
"topic_arn": snsTopicArn,
})
logger.Debug("Subscribing to topic", nil)

if !s.config.DoNotCreateSqsSubscription {
if err := s.SubscribeInitializeWithContext(ctx, topic); err != nil {
return nil, err
Expand All @@ -71,6 +77,8 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
return nil, fmt.Errorf("failed to generate SQS queue name: %w", err)
}

logger.Debug("Subscribing to SQS", watermill.LogFields{"sqs_topic": sqsTopic})

return s.sqs.Subscribe(ctx, sqsTopic)
}

Expand All @@ -79,11 +87,18 @@ func (s *Subscriber) SubscribeInitialize(topic string) error {
}

func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error {
logger := s.logger.With(watermill.LogFields{
"topic": topic,
})
logger.Debug("Initializing SNS subscription", nil)

snsTopicArn, err := s.config.TopicResolver.ResolveTopic(ctx, topic)
if err != nil {
return err
}

logger.Debug("Resolved topic", watermill.LogFields{"topic_arn": snsTopicArn})

sqsTopic, err := s.config.GenerateSqsQueueName(ctx, snsTopicArn)
if err != nil {
return fmt.Errorf("failed to generate SQS queue name: %w", err)
Expand Down
6 changes: 6 additions & 0 deletions sqs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
type SubscriberConfig struct {
AWSConfig aws.Config

// OptFns are options for the SQS client.
OptFns []func(*sqs.Options)

DoNotCreateQueueIfNotExists bool

QueueUrlResolver QueueUrlResolver
Expand Down Expand Up @@ -83,6 +86,9 @@ func (c SubscriberConfig) Validate() error {
type PublisherConfig struct {
AWSConfig aws.Config

// OptFns are options for the SQS client.
OptFns []func(*sqs.Options)

CreateQueueConfig QueueConfigAttributes
DoNotCacheQueues bool

Expand Down
4 changes: 2 additions & 2 deletions sqs/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package sqs

import (
"context"
"errors"
"fmt"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/pkg/errors"
)

type Publisher struct {
Expand All @@ -25,7 +25,7 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
}

return &Publisher{
sqs: sqs.NewFromConfig(config.AWSConfig),
sqs: sqs.NewFromConfig(config.AWSConfig, config.OptFns...),
config: config,
logger: logger,
}, nil
Expand Down
Loading

0 comments on commit 8cf4b95

Please sign in to comment.