Skip to content

Commit

Permalink
Ensure Dialer.DialPartition honors configured timeout and deadline (s…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve van Loben Sels authored Apr 7, 2020
1 parent fae139e commit e0af1cf
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ func (d *Dialer) Dial(network string, address string) (*Conn, error) {
// 1 minute, the connect to each single address will be given 15 seconds to
// complete before trying the next one.
func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) {
if d.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.Timeout)
defer cancel()
}

if !d.Deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, d.Deadline)
defer cancel()
}

return d.connect(
ctx,
network,
Expand Down Expand Up @@ -258,6 +246,17 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf
// connect opens a socket connection to the broker, wraps it to create a
// kafka connection, and performs SASL authentication if configured to do so.
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
if d.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.Timeout)
defer cancel()
}

if !d.Deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, d.Deadline)
defer cancel()
}

c, err := d.dialContext(ctx, network, address)
if err != nil {
Expand Down

0 comments on commit e0af1cf

Please sign in to comment.