Skip to content

Commit

Permalink
0.4 broker resolver (segmentio#526)
Browse files Browse the repository at this point in the history
* 0.4: kafka.BrokerResolver

* add kafka.Transport.Context

* inline network and address fields in conn type
  • Loading branch information
Achille authored Sep 29, 2020
1 parent fd5a288 commit cb3dd79
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 28 deletions.
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func newClient(addr net.Addr) (*Client, func()) {
}

transport := &Transport{
Dial: conns.Dial,
Dial: conns.Dial,
Resolver: NewBrokerResolver(nil),
}

client := &Client{
Expand Down
8 changes: 0 additions & 8 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,6 @@ func LookupPartitions(ctx context.Context, network string, address string, topic
return DefaultDialer.LookupPartitions(ctx, network, address, topic)
}

// Resolver is an abstraction used to provide service discovery of the hosts
// of a kafka cluster.
type Resolver interface {
// LookupHost looks up the given host using the local resolver.
// It returns a slice of that host's addresses, or an error if the lookup
// could not be completed.
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

func sleep(ctx context.Context, duration time.Duration) bool {
if duration == 0 {
select {
Expand Down
62 changes: 62 additions & 0 deletions resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka

import (
"context"
"net"
)

// Resolver is an abstraction used to provide service discovery of the hosts
// of a kafka cluster.
type Resolver interface {
// LookupHost looks up the given host using the local resolver.
// It returns a slice of that host's addresses, or an error if the lookup
// could not be completed.
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

// BrokerResolver is an interface implemented by types that translate broker
// host names into network addresses.
//
// This resolver is not intended to be a general purpose interface. Instead,
// it is tailored to the particular needs of the kafka protocol, with the goal
// being to provide a flexible mechanism for extending broker name resolution
// while retaining context that is specific to interacting with a kafka cluster.
//
// Resolvers must be safe to use from multiple goroutines.
type BrokerResolver interface {
// LookupBrokerIPAddr returns the IP addresses of the broker passed as
// argument, or an error if they could not be determined.
LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error)
}

// NewBrokerResolver wraps r in a BrokerResolver.
//
// r may be nil, in which case net.DefaultResolver is used instead.
func NewBrokerResolver(r *net.Resolver) BrokerResolver {
	return brokerResolver{Resolver: r}
}

// brokerResolver adapts a *net.Resolver to the BrokerResolver interface; it
// is the concrete type returned by NewBrokerResolver.
//
// The embedded resolver may be nil (see NewBrokerResolver).
type brokerResolver struct {
*net.Resolver
}

// LookupBrokerIPAddr resolves broker.Host to its IP addresses, satisfying the
// BrokerResolver interface.
//
// The lookup is performed by the embedded *net.Resolver, or by
// net.DefaultResolver when the embedded resolver is nil. Only the Host field
// of broker is consulted.
//
// Errors from the underlying resolver are returned unchanged. A lookup that
// succeeds but yields no addresses is reported as a *net.DNSError, so callers
// never receive an empty slice together with a nil error.
func (r brokerResolver) LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error) {
	rslv := r.Resolver
	if rslv == nil {
		rslv = net.DefaultResolver
	}

	// Call through rslv rather than the embedded field so that the nil
	// fallback selected above is the resolver that is actually used.
	ipAddrs, err := rslv.LookupIPAddr(ctx, broker.Host)
	if err != nil {
		return nil, err
	}

	if len(ipAddrs) == 0 {
		return nil, &net.DNSError{
			Err:         "no addresses were returned by the resolver",
			Name:        broker.Host,
			IsTemporary: true,
			IsNotFound:  true,
		}
	}

	return ipAddrs, nil
}
149 changes: 131 additions & 18 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"runtime/pprof"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -93,6 +94,25 @@ type Transport struct {
// SASL configures the Transport to use SASL authentication.
SASL sasl.Mechanism

// An optional resolver used to translate broker host names into network
// addresses.
//
// The resolver will be called for every request (not every connection),
// making it possible to implement ACL policies by validating that the
// program is allowed to connect to the kafka broker. This also means that
// the resolver should probably provide a caching layer to avoid storming
// the service discovery backend with requests.
//
// When set, the Dial function is not responsible for performing name
// resolution, and is always called with a pre-resolved address.
Resolver BrokerResolver

// The background context used to control goroutines started internally by
// the transport.
//
// If nil, context.Background() is used instead.
Context context.Context

mutex sync.RWMutex
pools map[networkAddress]*connPool
}
Expand Down Expand Up @@ -210,7 +230,7 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
return p
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.context())

p = &connPool{
refc: 2,
Expand All @@ -222,6 +242,7 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
clientID: t.ClientID,
tls: t.TLS,
sasl: t.SASL,
resolver: t.Resolver,

ready: make(event),
wake: make(chan event),
Expand All @@ -239,6 +260,13 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
return p
}

// context returns the background context configured on the transport through
// its Context field, or context.Background() when that field is nil.
func (t *Transport) context() context.Context {
	if t.Context == nil {
		return context.Background()
	}
	return t.Context
}

type event chan struct{}

func (e event) trigger() { close(e) }
Expand All @@ -255,6 +283,7 @@ type connPool struct {
clientID string
tls *tls.Config
sasl sasl.Mechanism
resolver BrokerResolver
// Signaling mechanisms to orchestrate communications between the pool and
// the rest of the program.
once sync.Once // ensure that `ready` is triggered only once
Expand Down Expand Up @@ -491,9 +520,11 @@ func (p *connPool) update(ctx context.Context, metadata *meta.Response, err erro

for id := range addBrokers {
broker := layout.Brokers[id]
p.conns[id] = p.newConnGroup(&networkAddress{
network: "tcp",
address: broker.String(),
p.conns[id] = p.newBrokerConnGroup(Broker{
Rack: broker.Rack,
Host: broker.Host,
Port: broker.Port,
ID: broker.ID,
})
}
}
Expand Down Expand Up @@ -559,12 +590,12 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) {
// returned.
func (p *connPool) grabBrokerConn(ctx context.Context, brokerID int) (*conn, error) {
p.mutex.RLock()
c := p.conns[brokerID]
g := p.conns[brokerID]
p.mutex.RUnlock()
if c == nil {
if g == nil {
return nil, BrokerNotAvailable
}
return c.grabConnOrConnect(ctx)
return g.grabConnOrConnect(ctx)
}

// grabClusterConn returns the connection to the kafka cluster that the pool is
Expand Down Expand Up @@ -754,6 +785,20 @@ func (p *connPool) newConnGroup(a net.Addr) *connGroup {
return &connGroup{
addr: a,
pool: p,
broker: Broker{
ID: -1,
},
}
}

// newBrokerConnGroup builds the connection group of the pool for the given
// broker.
//
// The group's address is the "tcp" network address formed by joining the
// broker's host and port, and the broker itself is recorded on the group.
func (p *connPool) newBrokerConnGroup(broker Broker) *connGroup {
	hostPort := net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port))

	group := &connGroup{
		pool:   p,
		broker: broker,
	}
	group.addr = &networkAddress{network: "tcp", address: hostPort}
	return group
}

Expand Down Expand Up @@ -849,7 +894,8 @@ var defaultDialer = net.Dialer{
// actual network connections are lazily opened before sending requests, and
// closed if they are unused for longer than the idle timeout.
type connGroup struct {
addr net.Addr
addr net.Addr
broker Broker
// Immutable state of the connection.
pool *connPool
// Shared state of the connection, this is synchronized on the mutex through
Expand All @@ -873,14 +919,50 @@ func (g *connGroup) closeIdleConns() {
}

func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
c := g.grabConn()
var rslv = g.pool.resolver
var addr = g.addr
var c *conn

if rslv == nil {
c = g.grabConn()
} else {
var err error
var broker = g.broker

if broker.ID < 0 {
host, port, err := net.SplitHostPort(addr.String())
if err != nil {
return nil, fmt.Errorf("%s: %w", addr, err)
}
portNumber, err := strconv.Atoi(port)
if err != nil {
return nil, fmt.Errorf("%s: %w", addr, err)
}
broker.Host = host
broker.Port = portNumber
}

ipAddrs, err := rslv.LookupBrokerIPAddr(ctx, broker)
if err != nil {
return nil, err
}

for _, ipAddr := range ipAddrs {
network := addr.Network()
address := net.JoinHostPort(ipAddr.String(), strconv.Itoa(broker.Port))

if c = g.grabConnTo(network, address); c != nil {
break
}
}
}

if c == nil {
connChan := make(chan *conn)
errChan := make(chan error)

go func() {
c, err := g.connect(ctx)
c, err := g.connect(ctx, addr)
if err != nil {
select {
case errChan <- err:
Expand Down Expand Up @@ -909,6 +991,30 @@ func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
return c, nil
}

// grabConnTo removes from the group's idle list, and returns, a connection
// whose network and address both match the arguments. It returns nil when no
// idle connection matches.
//
// The idle list is scanned from its end toward its start while holding the
// group's mutex. If the selected connection has a timer (presumably its
// idle-timeout timer), the timer is stopped before the connection is
// returned.
func (g *connGroup) grabConnTo(network, address string) *conn {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	idle := g.idleConns

	for i := len(idle) - 1; i >= 0; i-- {
		found := idle[i]

		if found.network != network || found.address != address {
			continue
		}

		// Shift the tail down over the removed entry and clear the vacated
		// last slot so the backing array does not keep a stale pointer.
		last := len(idle) - 1
		copy(idle[i:], idle[i+1:])
		idle[last] = nil
		g.idleConns = idle[:last]

		if found.timer != nil {
			found.timer.Stop()
		}

		return found
	}

	return nil
}

func (g *connGroup) grabConn() *conn {
g.mutex.Lock()
defer g.mutex.Unlock()
Expand Down Expand Up @@ -974,14 +1080,14 @@ func (g *connGroup) releaseConn(c *conn) bool {
return true
}

func (g *connGroup) connect(ctx context.Context) (*conn, error) {
func (g *connGroup) connect(ctx context.Context, addr net.Addr) (*conn, error) {
deadline := time.Now().Add(g.pool.dialTimeout)

ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

var network = strings.Split(g.addr.Network(), ",")
var address = strings.Split(g.addr.String(), ",")
var network = strings.Split(addr.Network(), ",")
var address = strings.Split(addr.String(), ",")
var netConn net.Conn
var netAddr net.Addr
var err error
Expand Down Expand Up @@ -1055,18 +1161,25 @@ func (g *connGroup) connect(ctx context.Context) (*conn, error) {
pc.SetDeadline(time.Time{})

reqs := make(chan connRequest)
c := &conn{reqs: reqs, group: g}
c := &conn{
network: netAddr.Network(),
address: netAddr.String(),
reqs: reqs,
group: g,
}
go c.run(pc, reqs)

netConn = nil
return c, nil
}

type conn struct {
reqs chan<- connRequest
once sync.Once
group *connGroup
timer *time.Timer
reqs chan<- connRequest
network string
address string
once sync.Once
group *connGroup
timer *time.Timer
}

func (c *conn) close() {
Expand Down
2 changes: 1 addition & 1 deletion transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestIssue477(t *testing.T) {
},
}

if _, err := cg.connect(context.Background()); err != nil {
if _, err := cg.connect(context.Background(), cg.addr); err != nil {
// An error is expected here because we are not actually establishing
// a TLS connection to a kafka broker.
t.Log(err)
Expand Down

0 comments on commit cb3dd79

Please sign in to comment.