forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SASL Mechanism: AWS MSK IAM (making requested edits) (segmentio#798)
Co-authored-by: Achille <achille.roussel@gmail.com> Co-authored-by: Christian Maher <maher.cs@gmail.com>
- Loading branch information
1 parent
2e02f37
commit e88d48a
Showing
5 changed files
with
308 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
module github.com/segmentio/kafka-go/sasl/aws_msk_iam | ||
|
||
go 1.15 | ||
|
||
require ( | ||
github.com/aws/aws-sdk-go v1.41.3 | ||
github.com/segmentio/kafka-go v0.4.24 | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
github.com/aws/aws-sdk-go v1.41.3 h1:deglLZ1jjHdhkd6Rbad1MZM4gL+1pfnTfjuFk6CGJFM= | ||
github.com/aws/aws-sdk-go v1.41.3/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= | ||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= | ||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= | ||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= | ||
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= | ||
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= | ||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= | ||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= | ||
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= | ||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= | ||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= | ||
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= | ||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= | ||
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= | ||
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= | ||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= | ||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= | ||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= | ||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= | ||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= | ||
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= | ||
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= | ||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/segmentio/kafka-go v0.4.24 h1:R3tYSYxyLK3SknDIU15LtpDdq59gRg2/J0GKhDFXrBQ= | ||
github.com/segmentio/kafka-go v0.4.24/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= | ||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= | ||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= | ||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= | ||
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= | ||
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= | ||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= | ||
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo= | ||
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= | ||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= | ||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= | ||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | ||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | ||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= | ||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= | ||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= | ||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | ||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= | ||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= | ||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= | ||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= | ||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package aws_msk_iam | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"runtime" | ||
"strings" | ||
"time" | ||
|
||
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" | ||
"github.com/segmentio/kafka-go/sasl" | ||
) | ||
|
||
const ( | ||
// These constants come from https://github.com/aws/aws-msk-iam-auth#details and | ||
// https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java. | ||
signVersion = "2020_10_22" | ||
signService = "kafka-cluster" | ||
signAction = "kafka-cluster:Connect" | ||
signVersionKey = "version" | ||
signHostKey = "host" | ||
signUserAgentKey = "user-agent" | ||
signActionKey = "action" | ||
queryActionKey = "Action" | ||
) | ||
|
||
var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version()) | ||
|
||
// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation: | ||
// https://github.com/aws/aws-msk-iam-auth | ||
type Mechanism struct { | ||
// The sigv4.Signer to use when signing the request. Required. | ||
Signer *sigv4.Signer | ||
// The region where the msk cluster is hosted, e.g. "us-east-1". Required. | ||
Region string | ||
// The time the request is planned for. Optional, defaults to time.Now() at time of authentication. | ||
SignTime time.Time | ||
// The duration for which the presigned request is active. Optional, defaults to 5 minutes. | ||
Expiry time.Duration | ||
} | ||
|
||
func (m *Mechanism) Name() string { | ||
return "AWS_MSK_IAM" | ||
} | ||
|
||
// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array, | ||
// making use of the aws-sdk to produce the signed output. | ||
// { | ||
// "version" : "2020_10_22", | ||
// "host" : "<broker host>", | ||
// "user-agent": "<user agent string from the client>", | ||
// "action": "kafka-cluster:Connect", | ||
// "x-amz-algorithm" : "<algorithm>", | ||
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request", | ||
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>", | ||
// "x-amz-security-token" : "<clientAWSSessionToken if any>", | ||
// "x-amz-signedheaders" : "host", | ||
// "x-amz-expires" : "<expiration in seconds>", | ||
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>" | ||
// } | ||
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) { | ||
saslMeta := sasl.MetadataFromContext(ctx) | ||
if saslMeta == nil { | ||
return nil, nil, errors.New("missing sasl metadata") | ||
} | ||
|
||
query := url.Values{ | ||
queryActionKey: {signAction}, | ||
} | ||
|
||
signUrl := url.URL{ | ||
Scheme: "kafka", | ||
Host: saslMeta.Host, | ||
Path: "/", | ||
RawQuery: query.Encode(), | ||
} | ||
|
||
req, err := http.NewRequest("GET", signUrl.String(), nil) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
signTime := m.SignTime | ||
if signTime.IsZero() { | ||
signTime = time.Now() | ||
} | ||
|
||
expiry := m.Expiry | ||
if expiry == 0 { | ||
expiry = 5 * time.Minute | ||
} | ||
|
||
header, err := m.Signer.Presign(req, nil, signService, m.Region, expiry, signTime) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
signedMap := map[string]string{ | ||
signVersionKey: signVersion, | ||
signHostKey: signUrl.Host, | ||
signUserAgentKey: signUserAgent, | ||
signActionKey: signAction, | ||
} | ||
// The protocol requires lowercase keys. | ||
for key, vals := range header { | ||
signedMap[strings.ToLower(key)] = vals[0] | ||
} | ||
for key, vals := range req.URL.Query() { | ||
signedMap[strings.ToLower(key)] = vals[0] | ||
} | ||
|
||
signedJson, err := json.Marshal(signedMap) | ||
return m, signedJson, err | ||
} | ||
|
||
func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { | ||
// After the initial step, the authentication is complete | ||
// kafka will return error if it rejected the credentials, so we'll only | ||
// arrive here on success. | ||
return true, nil, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package aws_msk_iam | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/sasl" | ||
|
||
"github.com/aws/aws-sdk-go/aws/credentials" | ||
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" | ||
) | ||
|
||
const ( | ||
accessKeyId = "ACCESS_KEY" | ||
secretAccessKey = "SECRET_KEY" | ||
) | ||
|
||
// using a fixed time allows the signature to be verifiable in a test | ||
var signTime = time.Date(2021, 10, 14, 13, 5, 0, 0, time.UTC) | ||
|
||
func TestAwsMskIamMechanism(t *testing.T) { | ||
tests := []struct { | ||
description string | ||
ctx func() context.Context | ||
shouldFail bool | ||
}{ | ||
{ | ||
description: "with metadata", | ||
ctx: func() context.Context { | ||
return sasl.WithMetadata(context.Background(), &sasl.Metadata{ | ||
Host: "localhost", | ||
Port: 9092, | ||
}) | ||
}, | ||
}, | ||
{ | ||
description: "without metadata", | ||
ctx: func() context.Context { | ||
return context.Background() | ||
}, | ||
shouldFail: true, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.description, func(t *testing.T) { | ||
ctx := tt.ctx() | ||
|
||
creds := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "") | ||
mskMechanism := &Mechanism{ | ||
Signer: sigv4.NewSigner(creds), | ||
Region: "us-east-1", | ||
SignTime: signTime, | ||
} | ||
|
||
sess, auth, err := mskMechanism.Start(ctx) | ||
if tt.shouldFail { // if error is expected | ||
if err == nil { // but we don't find one | ||
t.Fatal("error expected") | ||
} else { // but we do find one | ||
return // return early since the remaining assertions are irrelevant | ||
} | ||
} else { // if error is not expected (typical) | ||
if err != nil { // but we do find one | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
if sess != mskMechanism { | ||
t.Error( | ||
"Unexpected session", | ||
"expected", mskMechanism, | ||
"got", sess, | ||
) | ||
} | ||
|
||
expectedMap := map[string]string{ | ||
"version": "2020_10_22", | ||
"action": "kafka-cluster:Connect", | ||
"host": "localhost", | ||
"user-agent": signUserAgent, | ||
"x-amz-algorithm": "AWS4-HMAC-SHA256", | ||
"x-amz-credential": "ACCESS_KEY/20211014/us-east-1/kafka-cluster/aws4_request", | ||
"x-amz-date": "20211014T130500Z", | ||
"x-amz-expires": "300", | ||
"x-amz-signedheaders": "host", | ||
"x-amz-signature": "6b8d25f9b45b9c7db9da855a49112d80379224153a27fd279c305a5b7940d1a7", | ||
} | ||
expectedAuth, err := json.Marshal(expectedMap) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if !bytes.Equal(expectedAuth, auth) { | ||
t.Error("Unexpected authentication", | ||
"expected", expectedAuth, | ||
"got", auth, | ||
) | ||
} | ||
}) | ||
} | ||
} |