package kafka

import (
	"bufio"
	"errors"
	"io"
	"sync"
	"time"
)

// A Batch is an iterator over a sequence of messages fetched from a kafka
// server.
//
// Batches are created by calling (*Conn).ReadBatch. They hold an internal lock
// on the connection, which is released when the batch is closed. Failing to
// call a batch's Close method will likely result in a dead-lock when trying to
// use the connection.
//
// Batches are safe to use concurrently from multiple goroutines.
type Batch struct {
	mutex         sync.Mutex
	conn          *Conn
	lock          *sync.Mutex
	msgs          *messageSetReader
	deadline      time.Time
	throttle      time.Duration
	topic         string
	partition     int
	offset        int64
	highWaterMark int64
	err           error
	// The last offset in the batch.
	//
	// We use lastOffset to skip offsets that have been compacted away.
	//
	// We store lastOffset because we get lastOffset when we read a new message
	// but only try to handle compaction when we receive an EOF. However, when
	// we get an EOF we do not get the lastOffset. So there is a mismatch
	// between when we receive it and when we need to use it.
	lastOffset int64
}

// Throttle gives the throttling duration applied by the kafka server on the
// connection.
func (batch *Batch) Throttle() time.Duration {
	return batch.throttle
}

// HighWaterMark returns the current highest watermark in a partition.
func (batch *Batch) HighWaterMark() int64 {
	return batch.highWaterMark
}

// Partition returns the batch partition.
func (batch *Batch) Partition() int {
	return batch.partition
}

// Offset returns the offset of the next message in the batch.
func (batch *Batch) Offset() int64 {
	batch.mutex.Lock()
	offset := batch.offset
	batch.mutex.Unlock()
	return offset
}

// Close closes the batch, releasing the connection lock and returning an error
// if reading the batch failed for any reason.
func (batch *Batch) Close() error {
	batch.mutex.Lock()
	err := batch.close()
	batch.mutex.Unlock()
	return err
}

func (batch *Batch) close() (err error) {
	conn := batch.conn
	lock := batch.lock

	batch.conn = nil
	batch.lock = nil

	if batch.msgs != nil {
		batch.msgs.discard()
	}

	if err = batch.err; errors.Is(batch.err, io.EOF) {
		err = nil
	}

	if conn != nil {
		conn.rdeadline.unsetConnReadDeadline()
		conn.mutex.Lock()
		conn.offset = batch.offset
		conn.mutex.Unlock()

		if err != nil {
			var kafkaError Error
			if !errors.As(err, &kafkaError) && !errors.Is(err, io.ErrShortBuffer) {
				conn.Close()
			}
		}
	}

	if lock != nil {
		lock.Unlock()
	}

	return
}

// Err returns a non-nil error if the batch is broken. This is the same error
// that would be returned by Read, ReadMessage or Close (except in the case of
// io.EOF which is never returned by Close).
//
// This method is useful when building retry mechanisms for (*Conn).ReadBatch;
// the program can check whether the batch carried an error before attempting
// to read the first message.
//
// Note that checking errors on a batch is optional; calling Read or ReadMessage
// is always valid and can be used to either read a message or an error in cases
// where that's convenient.
func (batch *Batch) Err() error { return batch.err }
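// The helper below is a minimal usage sketch, not part of the package API. It
// illustrates the lifecycle described above: obtain a batch from
// (*Conn).ReadBatch, optionally check Err before reading, drain message values
// until io.EOF, and always Close to release the connection lock. The fetch
// sizes and the handle callback are illustrative assumptions.
func drainBatchValues(conn *Conn, buf []byte, handle func([]byte)) error {
	batch := conn.ReadBatch(10e3, 1e6) // hypothetical min/max fetch sizes

	if err := batch.Err(); err != nil {
		// The batch was broken before the first read; Close still has to be
		// called to release the connection lock, and reports the same error.
		batch.Close()
		return err
	}

	for {
		n, err := batch.Read(buf)
		if err != nil {
			// io.EOF means the batch was fully consumed; any other error is
			// also reported by Close below.
			break
		}
		handle(buf[:n])
	}

	return batch.Close()
}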
// Read reads the value of the next message from the batch into b, returning the
// number of bytes read, or an error if the next message couldn't be read.
//
// If an error is returned the batch cannot be used anymore and calling Read
// again will keep returning that error. All errors except io.EOF (indicating
// that the program consumed all messages from the batch) are also returned by
// Close.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
func (batch *Batch) Read(b []byte) (int, error) {
	n := 0

	batch.mutex.Lock()
	offset := batch.offset

	_, _, _, err := batch.readMessage(
		func(r *bufio.Reader, size int, nbytes int) (int, error) {
			if nbytes < 0 {
				return size, nil
			}
			return discardN(r, size, nbytes)
		},
		func(r *bufio.Reader, size int, nbytes int) (int, error) {
			if nbytes < 0 {
				return size, nil
			}
			// Make sure there are enough bytes for the message value; return
			// errShortRead if the message is truncated.
			if nbytes > size {
				return size, errShortRead
			}
			n = nbytes // return value
			if nbytes > cap(b) {
				nbytes = cap(b)
			}
			if nbytes > len(b) {
				b = b[:nbytes]
			}
			nbytes, err := io.ReadFull(r, b[:nbytes])
			if err != nil {
				return size - nbytes, err
			}
			return discardN(r, size-nbytes, n-nbytes)
		},
	)

	if err == nil && n > len(b) {
		n, err = len(b), io.ErrShortBuffer
		batch.err = io.ErrShortBuffer
		batch.offset = offset // rollback
	}

	batch.mutex.Unlock()

	return n, err
}

// ReadMessage reads and returns the next message from the batch.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
func (batch *Batch) ReadMessage() (Message, error) {
	msg := Message{}
	batch.mutex.Lock()

	var offset, timestamp int64
	var headers []Header
	var err error

	offset, timestamp, headers, err = batch.readMessage(
		func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
			msg.Key, remain, err = readNewBytes(r, size, nbytes)
			return
		},
		func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
			msg.Value, remain, err = readNewBytes(r, size, nbytes)
			return
		},
	)
	// A batch may start before the requested offset, so skip messages
	// until the requested offset is reached.
	for batch.conn != nil && offset < batch.conn.offset {
		if err != nil {
			break
		}
		offset, timestamp, headers, err = batch.readMessage(
			func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
				msg.Key, remain, err = readNewBytes(r, size, nbytes)
				return
			},
			func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
				msg.Value, remain, err = readNewBytes(r, size, nbytes)
				return
			},
		)
	}

	batch.mutex.Unlock()

	msg.Topic = batch.topic
	msg.Partition = batch.partition
	msg.Offset = offset
	msg.HighWaterMark = batch.highWaterMark
	msg.Time = makeTime(timestamp)
	msg.Headers = headers

	return msg, err
}
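// A minimal sketch of draining a batch with ReadMessage rather than Read, for
// callers that accept the per-message key/value allocations. This helper is
// hypothetical and not part of the package API; the fetch sizes are
// illustrative assumptions.
func readBatchMessages(conn *Conn) ([]Message, error) {
	batch := conn.ReadBatch(10e3, 1e6) // hypothetical min/max fetch sizes

	var msgs []Message
	for {
		msg, err := batch.ReadMessage()
		if err != nil {
			// io.EOF marks the end of the batch; Close suppresses it and
			// reports any other error while releasing the connection lock.
			if closeErr := batch.Close(); closeErr != nil {
				return msgs, closeErr
			}
			if errors.Is(err, io.EOF) {
				return msgs, nil
			}
			return msgs, err
		}
		msgs = append(msgs, msg)
	}
}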
func (batch *Batch) readMessage(
	key func(*bufio.Reader, int, int) (int, error),
	val func(*bufio.Reader, int, int) (int, error),
) (offset int64, timestamp int64, headers []Header, err error) {
	if err = batch.err; err != nil {
		return
	}

	var lastOffset int64
	offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
	switch {
	case err == nil:
		batch.offset = offset + 1
		batch.lastOffset = lastOffset
	case errors.Is(err, errShortRead):
		// As an "optimization" kafka truncates the returned response after
		// producing MaxBytes, which could then cause the code to return
		// errShortRead.
		err = batch.msgs.discard()
		switch {
		case err != nil:
			// Since io.EOF is used by the batch to indicate that there are
			// no more messages to consume, it is crucial that any io.EOF errors
			// on the underlying connection are repackaged. Otherwise, the
			// caller can't tell the difference between a batch that was fully
			// consumed and a batch whose connection is in an error state.
			batch.err = dontExpectEOF(err)
		case batch.msgs.remaining() == 0:
			// Because we use the adjusted deadline we could end up returning
			// before the actual deadline occurred. This is necessary because
			// otherwise timing out the connection for real could leave it in
			// an unpredictable state, which would require closing it.
			// This design decision was made to maximize the chances of keeping
			// the connection open, the trade-off being to lose precision on
			// the read deadline management.
			err = checkTimeoutErr(batch.deadline)
			batch.err = err

			// Checks the following:
			// - `batch.err` for a "success" from the previous timeout check
			// - `batch.msgs.lengthRemain` to ensure that this EOF is not due
			//   to MaxBytes truncation
			// - `batch.lastOffset` to ensure that the message format contains
			//   `lastOffset`
			if errors.Is(batch.err, io.EOF) && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
				// Log compaction can create batches that end with compacted
				// records, so the normal strategy of incrementing the "next"
				// offset as records are read doesn't work: the compacted
				// records are "missing" and never get "read".
				//
				// In order to reliably reach the next non-compacted offset we
				// jump past the saved lastOffset.
				batch.offset = batch.lastOffset + 1
			}
		}
	default:
		// Since io.EOF is used by the batch to indicate that there are
		// no more messages to consume, it is crucial that any io.EOF errors
		// on the underlying connection are repackaged. Otherwise, the
		// caller can't tell the difference between a batch that was fully
		// consumed and a batch whose connection is in an error state.
		batch.err = dontExpectEOF(err)
	}

	return
}

func checkTimeoutErr(deadline time.Time) (err error) {
	if !deadline.IsZero() && time.Now().After(deadline) {
		err = RequestTimedOut
	} else {
		err = io.EOF
	}
	return
}
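// Sketch of recovering from io.ErrShortBuffer on Read: because Read rolls the
// batch offset back to the failed message and close propagates that offset to
// the connection without closing it, the caller can grow its buffer and fetch
// a new batch to retry the same message. This helper is hypothetical and not
// part of the package API; the fetch sizes and the doubling growth policy are
// illustrative assumptions (it also assumes a non-empty initial buffer).
func readValueGrowing(conn *Conn, buf []byte) ([]byte, int, error) {
	for {
		batch := conn.ReadBatch(10e3, 1e6) // hypothetical min/max fetch sizes
		n, err := batch.Read(buf)
		closeErr := batch.Close()

		if errors.Is(err, io.ErrShortBuffer) {
			// The value did not fit; double the buffer and retry the same
			// message from the rolled-back offset with a fresh batch.
			buf = make([]byte, 2*len(buf))
			continue
		}
		if err != nil {
			return buf, n, err
		}
		return buf, n, closeErr
	}
}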