Skip to content

Commit

Permalink
Added support for SASL PLAIN and SCRAM mechanisms (segmentio#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevevls authored Mar 18, 2019
1 parent 1c2c0d0 commit 49088e0
Show file tree
Hide file tree
Showing 15 changed files with 684 additions and 20 deletions.
37 changes: 32 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
- image: circleci/golang
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:0.10.1.0
- image: wurstmeister/kafka:0.10.1.1
ports: ['9092:9092']
environment:
KAFKA_BROKER_ID: '1'
Expand All @@ -18,6 +18,12 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -32,8 +38,8 @@ jobs:
- image: circleci/golang
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:0.11.0.1
ports: ['9092:9092']
- image: wurstmeister/kafka:2.11-0.11.0.3
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -42,6 +48,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -57,7 +70,7 @@ jobs:
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:2.11-1.1.1
ports: ['9092:9092']
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -66,6 +79,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -81,7 +101,7 @@ jobs:
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:2.12-2.1.0
ports: ['9092:9092']
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -90,6 +110,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand Down
85 changes: 85 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,3 +1333,88 @@ func (d *connDeadline) unsetConnWriteDeadline() {
d.wconn = nil
d.mutex.Unlock()
}

// saslHandshake sends the SASL handshake message. This will determine whether
// the Mechanism is supported by the cluster. If it's not, this function will
// error out with UnsupportedSASLMechanism.
//
// If the mechanism is unsupported, the handshake request will reply with the
// list of the cluster's configured mechanisms, which could potentially be used
// to facilitate negotiation. At the moment, we are not negotiating the
// mechanism as we believe that brokers are usually known to the client, and
// therefore the client should already know which mechanisms are supported.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
func (c *Conn) saslHandshake(mechanism string) error {
// The wire format for V0 and V1 is identical, but the version
// number will affect how the SASL authentication
// challenge/responses are sent
var resp saslHandshakeResponseV0
version := v0
if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
version = v1
}

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (int, error) {
return (&resp).readFrom(&c.rbuf, size)
}())
},
)
if err == nil && resp.ErrorCode != 0 {
err = Error(resp.ErrorCode)
}
return err
}

// saslAuthenticate sends the SASL authenticate message. This function must
// be immediately preceded by a successful saslHandshake.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
// if we sent a v1 handshake, then we must encapsulate the authentication
// request in a saslAuthenticateRequest. otherwise, we read and write raw
// bytes.
if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
var request = saslAuthenticateRequestV0{Data: data}
var response saslAuthenticateResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslAuthenticateRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
return response.Data, err
}

// fall back to opaque bytes on the wire. the broker is expecting these if
// it just processed a v0 sasl handshake.
writeInt32(&c.wbuf, int32(len(data)))
if _, err := c.wbuf.Write(data); err != nil {
return nil, err
}
if err := c.wbuf.Flush(); err != nil {
return nil, err
}

var respLen int32
_, err := readInt32(&c.rbuf, 4, &respLen)
if err != nil {
return nil, err
}

resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
return resp, err
}
20 changes: 19 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
"golang.org/x/net/nettest"
)

Expand Down Expand Up @@ -257,7 +258,7 @@ func TestConn(t *testing.T) {
)

for _, test := range tests {
if !KafkaIsAtLeast(test.minVersion) {
if !ktesting.KafkaIsAtLeast(test.minVersion) {
t.Log("skipping " + test.scenario + " because broker is not at least version " + test.minVersion)
continue
}
Expand Down Expand Up @@ -980,6 +981,23 @@ func testBrokers(t *testing.T, conn *Conn) {
}
}

func TestUnsupportedSASLMechanism(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conn, err := (&Dialer{
Resolver: &net.Resolver{},
}).DialContext(ctx, "tcp", "127.0.0.1:9093")
if err != nil {
t.Fatal("failed to open a new kafka connection:", err)
}
defer conn.Close()

if err := conn.saslHandshake("FOO"); err != UnsupportedSASLMechanism {
t.Errorf("Expected UnsupportedSASLMechanism but got %v", err)
}
}

const benchmarkMessageCount = 100

func BenchmarkConn(b *testing.B) {
Expand Down
82 changes: 70 additions & 12 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package kafka
import (
"context"
"crypto/tls"
"io"
"net"
"strconv"
"strings"
"time"

"github.com/segmentio/kafka-go/sasl"
)

// The Dialer type mirrors the net.Dialer API but is designed to open kafka
Expand Down Expand Up @@ -61,6 +64,10 @@ type Dialer struct {
// TLS enables Dialer to open secure connections. If nil, standard net.Conn
// will be used.
TLS *tls.Config

// SASLMechanism configures the Dialer to use SASL authentication. If nil,
// no authentication will be performed.
SASLMechanism sasl.Mechanism
}

// Dial connects to the address on the named network.
Expand Down Expand Up @@ -94,11 +101,7 @@ func (d *Dialer) DialContext(ctx context.Context, network string, address string
defer cancel()
}

c, err := d.dialContext(ctx, network, address)
if err != nil {
return nil, err
}
return NewConnWith(c, ConnConfig{ClientID: d.ClientID}), nil
return d.connect(ctx, network, address, ConnConfig{ClientID: d.ClientID})
}

// DialLeader opens a connection to the leader of the partition for a given
Expand All @@ -121,16 +124,11 @@ func (d *Dialer) DialLeader(ctx context.Context, network string, address string,
// descriptor. It's strongly advised to use descriptor of the partition that comes out of
// functions LookupPartition or LookupPartitions.
func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) {
c, err := d.dialContext(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)))
if err != nil {
return nil, err
}

return NewConnWith(c, ConnConfig{
return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{
ClientID: d.ClientID,
Topic: partition.Topic,
Partition: partition.ID,
}), nil
})
}

// LookupLeader searches for the kafka broker that is the leader of the
Expand Down Expand Up @@ -242,6 +240,66 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf
return
}

// connect opens a socket connection to the broker, wraps it to create a
// kafka connection, and performs SASL authentication if configured to do so.
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {

c, err := d.dialContext(ctx, network, address)
if err != nil {
return nil, err
}

conn := NewConnWith(c, connCfg)

if d.SASLMechanism != nil {
if err := d.authenticateSASL(ctx, conn); err != nil {
_ = conn.Close()
return nil, err
}
}

return conn, nil
}

// authenticateSASL performs all of the required requests to authenticate this
// connection. If any step fails, this function returns with an error. A nil
// error indicates successful authentication.
//
// In case of error, this function *does not* close the connection. That is the
// responsibility of the caller.
func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error {
mech, state, err := d.SASLMechanism.Start(ctx)
if err != nil {
return err
}
err = conn.saslHandshake(mech)
if err != nil {
return err
}

var completed bool
for !completed {
challenge, err := conn.saslAuthenticate(state)
switch err {
case nil:
case io.EOF:
// the broker may communicate a failed exchange by closing the
// connection (esp. in the case where we're passing opaque sasl
// data over the wire since there's no protocol info).
return SASLAuthenticationFailed
default:
return err
}

completed, state, err = d.SASLMechanism.Next(ctx, challenge)
if err != nil {
return err
}
}

return nil
}

func (d *Dialer) dialContext(ctx context.Context, network string, address string) (net.Conn, error) {
if r := d.Resolver; r != nil {
host, port := splitHostPort(address)
Expand Down
11 changes: 9 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
version: "3"
services:
kafka:
image: wurstmeister/kafka:0.11.0.1
image: wurstmeister/kafka:2.11-0.11.0.3
restart: on-failure:3
links:
- zookeeper
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_VERSION: '0.11.0.1'
KAFKA_BROKER_ID: 1
Expand All @@ -17,7 +18,13 @@ services:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: 200000000

KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
zookeeper:
image: wurstmeister/zookeeper
ports:
Expand Down
2 changes: 2 additions & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const (
syncGroupRequest apiKey = 14
describeGroupsRequest apiKey = 15
listGroupsRequest apiKey = 16
saslHandshakeRequest apiKey = 17
apiVersionsRequest apiKey = 18
createTopicsRequest apiKey = 19
deleteTopicsRequest apiKey = 20
saslAuthenticateRequest apiKey = 36
)

type apiVersion int16
Expand Down
Loading

0 comments on commit 49088e0

Please sign in to comment.