Skip to content

Commit

Permalink
add CommitMessages documentation (segmentio#58)
Browse files Browse the repository at this point in the history
* add CommitMessages documentation

* Revisit sync commits (segmentio#59)

* revisit sync commits

* drop commit request chan capacity

* remove error prone check

* shorter code
  • Loading branch information
achille-roussel authored Jan 10, 2018
1 parent 61718eb commit 7619f5f
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 82 deletions.
39 changes: 39 additions & 0 deletions commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kafka

// A commit represents the instruction of publishing an update of the last
// offset read by a program for a topic and partition.
type commit struct {
topic string
partition int
offset int64
}

// makeCommit builds a commit value from a message, the resulting commit takes
// its topic, partition, and offset from the message.
func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset,
}
}

// makeCommits generates a slice of commits from a list of messages, it extracts
// the topic, partition, and offset of each message and builds the corresponding
// commit slice.
func makeCommits(msgs ...Message) []commit {
commits := make([]commit, len(msgs))

for i, m := range msgs {
commits[i] = makeCommit(m)
}

return commits
}

// commitRequest is the data type exchanged between the CommitMessages method
// and internals of the reader's implementation.
type commitRequest struct {
commits []commit
errch chan<- error
}
151 changes: 70 additions & 81 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Reader struct {
cancel context.CancelFunc
stop context.CancelFunc
done chan struct{}
commits chan []Message
commits chan commitRequest
version int64 // version holds the generation of the spawned readers
offset int64
lag int64
Expand All @@ -96,10 +96,12 @@ type Reader struct {
stats readerStats
}

// useConsumerGroup indicates the Reader is part of a consumer group
func (r *Reader) useConsumerGroup() bool {
return r.config.GroupID != ""
}
// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }

// membership returns the group generationID and memberID of the reader.
//
Expand Down Expand Up @@ -676,20 +678,16 @@ func (r *Reader) commitOffsetsWithRetry(conn offsetCommitter, offsetStash offset
type offsetStash map[string]map[int]int64

// merge updates the offsetStash with the offsets from the provided messages
func (o offsetStash) merge(msgs ...Message) {
if o == nil {
return
}

for _, m := range msgs {
offsetsByPartition, ok := o[m.Topic]
func (o offsetStash) merge(commits []commit) {
for _, c := range commits {
offsetsByPartition, ok := o[c.topic]
if !ok {
offsetsByPartition = map[int]int64{}
o[m.Topic] = offsetsByPartition
o[c.topic] = offsetsByPartition
}

if offset, ok := offsetsByPartition[m.Partition]; !ok || m.Offset > offset {
offsetsByPartition[m.Partition] = m.Offset
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
offsetsByPartition[c.partition] = c.offset
}
}
}
Expand All @@ -701,35 +699,19 @@ func (o offsetStash) reset() {
}
}

// isEmpty returns true if the offsetStash contains no entries
func (o offsetStash) isEmpty() bool {
return len(o) == 0
}

// commitLoopImmediate handles each commit synchronously
func (r *Reader) commitLoopImmediate(conn offsetCommitter, stop <-chan struct{}) {
offsetsByTopicAndPartition := offsetStash{}

for {
select {
case <-stop:
return

case msgs, ok := <-r.commits:
if !ok {
r.withErrorLogger(func(l *log.Logger) {
l.Println("reader commit channel unexpectedly closed")
})
return
}

offsetsByTopicAndPartition := offsetStash{}
offsetsByTopicAndPartition.merge(msgs...)

if err := r.commitOffsetsWithRetry(conn, offsetsByTopicAndPartition, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l *log.Logger) {
l.Printf("unable to commit offset: %v", err)
})
return
}
case req := <-r.commits:
offsetsByTopicAndPartition.merge(req.commits)
req.errch <- r.commitOffsetsWithRetry(conn, offsetsByTopicAndPartition, defaultCommitRetries)
offsetsByTopicAndPartition.reset()
}
}
}
Expand All @@ -740,40 +722,25 @@ func (r *Reader) commitLoopInterval(conn offsetCommitter, stop <-chan struct{})
ticker := time.NewTicker(r.config.HeartbeatInterval)
defer ticker.Stop()

defer func() {
// commits any outstanding offsets on close
if err := r.commitOffsetsWithRetry(conn, r.offsetStash, defaultCommitRetries); err == nil {
commit := func() {
if err := r.commitOffsetsWithRetry(conn, r.offsetStash, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l *log.Logger) { l.Print(err) })
} else {
r.offsetStash.reset()
}
}()
}

for {
select {
case <-stop:
commit()
return

case <-ticker.C:
if len(r.offsetStash) == 0 {
continue
}

if err := r.commitOffsetsWithRetry(conn, r.offsetStash, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l *log.Logger) {
l.Printf("unable to commit offset: %v", err)
})
return
}
r.offsetStash.reset()

case msgs, ok := <-r.commits:
if !ok {
r.withErrorLogger(func(l *log.Logger) {
l.Println("reader commit channel unexpectedly closed")
})
return
}
commit()

r.offsetStash.merge(msgs...)
case req := <-r.commits:
r.offsetStash.merge(req.commits)
}
}
}
Expand Down Expand Up @@ -1106,7 +1073,7 @@ func NewReader(config ReaderConfig) *Reader {
msgs: make(chan readerMessage, config.QueueCapacity),
cancel: func() {},
done: make(chan struct{}),
commits: make(chan []Message),
commits: make(chan commitRequest),
stop: stop,
offset: firstOffset,
stctx: stctx,
Expand Down Expand Up @@ -1247,6 +1214,47 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
}
}

// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
if !r.useConsumerGroup() {
return errNotAvailable
}

var errch <-chan error
var sync = r.useSyncCommits()
var creq = commitRequest{
commits: makeCommits(msgs...),
}

if sync {
ch := make(chan error, 1)
errch, creq.errch = ch, ch
}

select {
case r.commits <- creq:
case <-ctx.Done():
return ctx.Err()
case <-r.stctx.Done():
// This context is used to ensure we don't allow commits after the
// reader was closed.
return io.ErrClosedPipe
}

if !sync {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errch:
return err
}
}

// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
Expand Down Expand Up @@ -1501,25 +1509,6 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
}
}

func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
if len(msgs) == 0 {
return nil
}

if !r.useConsumerGroup() {
return errNotAvailable
}

select {
case <-ctx.Done():
return ctx.Err()
case <-r.stctx.Done():
return r.stctx.Err()
case r.commits <- msgs:
return nil
}
}

// A reader reads messages from kafka and produces them on its channels, it's
// used as an way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
Expand Down
2 changes: 1 addition & 1 deletion reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ func TestOffsetStash(t *testing.T) {

for label, test := range tests {
t.Run(label, func(t *testing.T) {
test.Given.merge(test.Messages...)
test.Given.merge(makeCommits(test.Messages...))
if !reflect.DeepEqual(test.Expected, test.Given) {
t.Errorf("expected %v; got %v", test.Expected, test.Given)
}
Expand Down

0 comments on commit 7619f5f

Please sign in to comment.