Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix-retries-2
Browse files Browse the repository at this point in the history
  • Loading branch information
jnjackins committed Jun 15, 2020
2 parents 2695e01 + f490bbc commit 027dc5f
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 94 deletions.
15 changes: 9 additions & 6 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
defaultJoinGroupBackoff = 5 * time.Second

// defaultRetentionTime holds the length of time a the consumer group will be
// saved by kafka
defaultRetentionTime = time.Hour * 24
// saved by kafka. This value tells the broker to use its configured value.
defaultRetentionTime = -1 * time.Millisecond

// defaultPartitionWatchTime contains the amount of time the kafka-go will wait to
// query the brokers looking for partition changes.
Expand Down Expand Up @@ -119,10 +119,13 @@ type ConsumerGroupConfig struct {
// Default: 5s
JoinGroupBackoff time.Duration

// RetentionTime optionally sets the length of time the consumer group will be saved
// by the broker
// RetentionTime optionally sets the length of time the consumer group will
// be saved by the broker. -1 will disable the setting and leave the
// retention up to the broker's offsets.retention.minutes property. By
// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
// 2.0.
//
// Default: 24h
// Default: -1
RetentionTime time.Duration

// StartOffset determines from whence the consumer group should begin
Expand Down Expand Up @@ -212,7 +215,7 @@ func (config *ConsumerGroupConfig) Validate() error {
return errors.New(fmt.Sprintf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff))
}

if config.RetentionTime < 0 {
if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
return errors.New(fmt.Sprintf("RetentionTime out of bounds: %d", config.RetentionTime))
}

Expand Down
10 changes: 5 additions & 5 deletions consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func TestValidateConsumerGroupConfig(t *testing.T) {
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", SessionTimeout: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: -2}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, StartOffset: 123}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: 1, JoinGroupBackoff: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: 1, JoinGroupBackoff: 1}, errorOccured: false},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: -2}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, StartOffset: 123}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: 1, JoinGroupBackoff: -1}, errorOccured: true},
{config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: 1, JoinGroupBackoff: 1}, errorOccured: false},
}
for _, test := range tests {
err := test.config.Validate()
Expand Down
23 changes: 11 additions & 12 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ func (d *Dialer) Dial(network string, address string) (*Conn, error) {
// 1 minute, the connect to each single address will be given 15 seconds to
// complete before trying the next one.
func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) {
if d.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.Timeout)
defer cancel()
}

if !d.Deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, d.Deadline)
defer cancel()
}

return d.connect(
ctx,
network,
Expand Down Expand Up @@ -258,6 +246,17 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf
// connect opens a socket connection to the broker, wraps it to create a
// kafka connection, and performs SASL authentication if configured to do so.
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
if d.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.Timeout)
defer cancel()
}

if !d.Deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, d.Deadline)
defer cancel()
}

c, err := d.dialContext(ctx, network, address)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
// block relies on the batch repackaging real io.EOF errors as
// io.UnexpectedEOF. otherwise, we would end up swallowing real
// errors here.
break readLoop
errcount = 0
case UnknownTopicOrPartition:
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
Expand Down
5 changes: 1 addition & 4 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ func (c *counter) observe(v int64) {
}

func (c *counter) snapshot() int64 {
p := c.ptr()
v := atomic.LoadInt64(p)
atomic.AddInt64(p, -v)
return v
return atomic.SwapInt64(c.ptr(), 0)
}

// gauge is an atomic integer that may be set to any arbitrary value, the value
Expand Down
111 changes: 48 additions & 63 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,83 +308,50 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
res = make(chan error, len(msgs))
}
t0 := time.Now()
defer w.stats.writeTime.observeDuration(time.Since(t0))

for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
w.mutex.RLock()
w.mutex.RLock()
closed := w.closed
w.mutex.RUnlock()

if w.closed {
w.mutex.RUnlock()
return io.ErrClosedPipe
}
if closed {
return io.ErrClosedPipe
}

for i, msg := range msgs {
if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
w.mutex.RUnlock()
return err
}
select {
case w.msgs <- writerMessage{
msg: msg,
res: res,
}:
case <-ctx.Done():
w.mutex.RUnlock()
return ctx.Err()
for i, msg := range msgs {

if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
return err
}

w.mutex.RUnlock()

if w.config.Async {
break
}
wm := writerMessage{msg: msg, res: res}

var retry []Message

for i := 0; i != len(msgs); i++ {
select {
case e := <-res:
if e != nil {
if we, ok := e.(*writerError); ok {
w.stats.retries.observe(1)
retry, err = append(retry, we.msg), we.err
} else {
err = e
}
}
case <-ctx.Done():
return ctx.Err()
}
select {
case w.msgs <- wm:
case <-ctx.Done():
return ctx.Err()
}
}

if msgs = retry; len(msgs) == 0 {
break
}
if w.config.Async {
return nil
}

timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
for i := 0; i != len(msgs); i++ {
select {
case <-timer.C:
// Only clear the error (so we retry the loop) if we have more retries, otherwise
// we risk silencing the error.
if attempt < w.config.MaxAttempts-1 {
err = nil
case e := <-res:
if e != nil {
err = e
}
case <-ctx.Done():
err = ctx.Err()
case <-w.done:
err = io.ErrClosedPipe
}
timer.Stop()

if err != nil {
break
return ctx.Err()
}
}
w.stats.writeTime.observeDuration(time.Since(t0))

return err
}

Expand Down Expand Up @@ -571,6 +538,7 @@ type writer struct {
codec CompressionCodec
logger Logger
errorLogger Logger
maxAttempts int
}

func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
Expand All @@ -590,6 +558,7 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
codec: config.CompressionCodec,
logger: config.Logger,
errorLogger: config.ErrorLogger,
maxAttempts: config.MaxAttempts,
}
w.join.Add(1)
go w.run()
Expand Down Expand Up @@ -701,13 +670,15 @@ func (w *writer) run() {
if len(batch) == 0 {
continue
}

var err error
if conn, err = w.write(conn, batch, resch); err != nil {
if conn, err = w.writeWithRetries(conn, batch, resch); err != nil {
if conn != nil {
conn.Close()
conn = nil
}
}

idleConnDeadline = time.Now().Add(w.idleConnTimeout)
for i := range batch {
batch[i] = Message{}
Expand Down Expand Up @@ -737,6 +708,20 @@ func (w *writer) dial() (conn *Conn, err error) {
return
}

func (w *writer) writeWithRetries(conn *Conn, batch []Message, resch [](chan<- error)) (*Conn, error) {
var err error

for attempt := 0; attempt < w.maxAttempts; attempt++ {
conn, err = w.write(conn, batch, resch)
if err == nil {
break
}
w.stats.retries.observe(1)
time.Sleep(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
}
return conn, err
}

func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
w.stats.writes.observe(1)
if conn == nil {
Expand Down
14 changes: 11 additions & 3 deletions zstd/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zstd

import (
"io"
"runtime"
"sync"

zstdlib "github.com/klauspost/compress/zstd"
Expand Down Expand Up @@ -38,8 +39,9 @@ func (c *CompressionCodec) NewReader(r io.Reader) io.ReadCloser {
p := new(reader)
if cached := decPool.Get(); cached == nil {
p.dec, p.err = zstdlib.NewReader(r)
runtime.SetFinalizer(p, finalizeReader)
} else {
p.dec = cached.(*zstdlib.Decoder)
p = cached.(*reader)
p.err = p.dec.Reset(r)
}
return p
Expand All @@ -55,9 +57,8 @@ type reader struct {
// Close implements the io.Closer interface.
func (r *reader) Close() error {
if r.dec != nil {
decPool.Put(r.dec)
r.dec = nil
r.err = io.ErrClosedPipe
decPool.Put(r)
}
return nil
}
Expand Down Expand Up @@ -125,3 +126,10 @@ func (w *writer) ReadFrom(r io.Reader) (n int64, err error) {
}
return w.enc.ReadFrom(r)
}

// finalizeReader closes underlying resources managed by a reader.
func finalizeReader(r *reader) {
if r.dec != nil {
r.dec.Close()
}
}

0 comments on commit 027dc5f

Please sign in to comment.