Skip to content

Commit

Permalink
support custom resolvers
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent 73de687 commit c6b27e0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
5 changes: 4 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"fmt"
"net"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -36,7 +37,9 @@ func TestConn(t *testing.T) {

topic := fmt.Sprintf("kafka-go-%02d", atomic.AddInt32(&id, 1))

conn, err := (&Dialer{}).DialLeader(ctx, "tcp", "localhost:9092", topic, 0)
conn, err := (&Dialer{
Resolver: &net.Resolver{PreferGo: true},
}).DialLeader(ctx, "tcp", "localhost:9092", topic, 0)
if err != nil {
t.Fatal("failed to open a new kafka connection:", err)
}
Expand Down
33 changes: 31 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Dialer struct {
KeepAlive time.Duration

// Resolver optionally specifies an alternate resolver to use.
Resolver *net.Resolver
Resolver Resolver
}

// Dial connects to the address on the named network.
Expand Down Expand Up @@ -170,15 +170,36 @@ func (d *Dialer) LookupLeader(ctx context.Context, network string, address strin
}

func (d *Dialer) dialContext(ctx context.Context, network string, address string) (net.Conn, error) {
if r := d.Resolver; r != nil {
host, port := splitHostPort(address)
addrs, err := r.LookupHost(ctx, host)
if err != nil {
return nil, err
}
if len(addrs) != 0 {
address = addrs[0]
}
if len(port) != 0 {
address, _ = splitHostPort(address)
address = net.JoinHostPort(address, port)
}
}
return (&net.Dialer{
LocalAddr: d.LocalAddr,
DualStack: d.DualStack,
FallbackDelay: d.FallbackDelay,
KeepAlive: d.KeepAlive,
Resolver: d.Resolver,
}).DialContext(ctx, network, address)
}

// The Resolver interface is used as an abstraction to provide service discovery
// of the hosts of a kafka cluster.
type Resolver interface {
// LookupHost looks up the given host using the local resolver.
// It returns a slice of that host's addresses.
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

func sleep(ctx context.Context, duration time.Duration) {
timer := time.NewTimer(duration)
select {
Expand All @@ -195,3 +216,11 @@ func backoff(attempt int, min time.Duration, max time.Duration) time.Duration {
}
return d
}

func splitHostPort(s string) (host string, port string) {
host, port, _ = net.SplitHostPort(s)
if len(host) == 0 && len(port) == 0 {
host = s
}
return
}

0 comments on commit c6b27e0

Please sign in to comment.