Skip to content

Commit

Permalink
Isolation level (segmentio#224)
Browse files Browse the repository at this point in the history
* Add ReadBatchWithConfig

* Add isolation level parameter to writeRequest

* Use golang convention for contants' names

* Rename ReadBatchWithConfig

* Rename BatchReadConfig

* Inline config variable

* Add docs for new methods and structures
  • Loading branch information
VictorDenisov authored and achille-roussel committed Mar 5, 2019
1 parent f7759d3 commit d19f52f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
49 changes: 39 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,24 @@ type ConnConfig struct {
Partition int
}

// ReadBatchConfig is a configuration object used for reading batches of messages.
type ReadBatchConfig struct {
MinBytes int
MaxBytes int

// IsolationLevel controls the visibility of transactional records.
// ReadUncommitted makes all records visible. With ReadCommitted only
// non-transactional and committed records are visible.
IsolationLevel IsolationLevel
}

type IsolationLevel int8

const (
ReadUncommitted IsolationLevel = 0
ReadCommitted IsolationLevel = 1
)

var (
// DefaultClientID is the default value used as ClientID of kafka
// connections.
Expand Down Expand Up @@ -658,17 +676,27 @@ func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
// gives the minimum and maximum number of bytes that it wants to receive from
// the kafka server.
func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
return c.ReadBatchWith(ReadBatchConfig{
MinBytes: minBytes,
MaxBytes: maxBytes,
})
}

// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

var adjustedDeadline time.Time
var maxFetch = int(c.fetchMaxBytes)

if minBytes < 0 || minBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", minBytes, maxFetch)}
if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
}
if maxBytes < 0 || maxBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", maxBytes, maxFetch)}
if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
}
if minBytes > maxBytes {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", minBytes, maxBytes)}
if cfg.MinBytes > cfg.MaxBytes {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
}

offset, err := c.Seek(c.Offset())
Expand All @@ -689,9 +717,10 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
c.topic,
c.partition,
offset,
minBytes,
maxBytes+int(c.fetchMinSize),
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
int8(cfg.IsolationLevel),
)
default:
return writeFetchRequestV2(
Expand All @@ -701,8 +730,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
c.topic,
c.partition,
offset,
minBytes,
maxBytes+int(c.fetchMinSize),
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
)
}
Expand Down
4 changes: 2 additions & 2 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func writeFetchRequestV2(w *bufio.Writer, correlationID int32, clientID, topic s
return w.Flush()
}

func writeFetchRequestV5(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
func writeFetchRequestV5(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v5),
Expand All @@ -196,7 +196,7 @@ func writeFetchRequestV5(w *bufio.Writer, correlationID int32, clientID, topic s
writeInt32(w, milliseconds(maxWait))
writeInt32(w, int32(minBytes))
writeInt32(w, int32(maxBytes))
writeInt8(w, int8(0)) // isolation level 0 - read uncommitted
writeInt8(w, isolationLevel) // isolation level 0 - read uncommitted

// topic array
writeArrayLen(w, 1)
Expand Down

0 comments on commit d19f52f

Please sign in to comment.