Skip to content

Commit

Permalink
port Reader to use Conn, drop sarama dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent b20b363 commit 894ff5e
Show file tree
Hide file tree
Showing 12 changed files with 764 additions and 135 deletions.
30 changes: 30 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (batch *Batch) close() (err error) {
}

if conn != nil {
conn.rdeadline.unsetConnReadDeadline()
conn.mutex.Lock()
conn.offset = batch.offset
conn.mutex.Unlock()
Expand All @@ -89,6 +90,9 @@ func (batch *Batch) close() (err error) {
// again will keep returning that error. All errors except io.EOF (indicating
// that the program consumed all messages from the batch) are also returned by
// Close.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
func (batch *Batch) Read(b []byte) (int, error) {
n := 0

Expand Down Expand Up @@ -128,6 +132,32 @@ func (batch *Batch) Read(b []byte) (int, error) {
return n, err
}

// ReadMessage reads and returns the next message from the batch.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
//
// The batch mutex is held for the duration of the decoding; the offset and
// time of the returned message are filled in after the lock is released.
func (batch *Batch) ReadMessage() (Message, error) {
	var msg Message

	// Each callback stores the freshly allocated bytes on msg and reports
	// how many bytes remain to be consumed.
	readKey := func(r *bufio.Reader, size int, nbytes int) (int, error) {
		key, remain, err := readNewBytes(r, size, nbytes)
		msg.Key = key
		return remain, err
	}
	readValue := func(r *bufio.Reader, size int, nbytes int) (int, error) {
		value, remain, err := readNewBytes(r, size, nbytes)
		msg.Value = value
		return remain, err
	}

	batch.mutex.Lock()
	offset, timestamp, err := batch.readMessage(readKey, readValue)
	batch.mutex.Unlock()

	msg.Offset = offset
	msg.Time = timestampToTime(timestamp)
	return msg, err
}

func (batch *Batch) readMessage(
key func(r *bufio.Reader, size int, nbytes int) (int, error),
val func(r *bufio.Reader, size int, nbytes int) (int, error),
Expand Down
44 changes: 28 additions & 16 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,6 @@ func (c *Conn) Offset() (offset int64, whence int) {
// offset, and 2 means relative to the last offset.
// The method returns the new absolute offset of the connection.
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
if offset < 0 {
return 0, fmt.Errorf("invalid negative offset (offset = %d)", offset)
}
switch whence {
case 0, 1, 2:
default:
Expand Down Expand Up @@ -250,8 +247,9 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
return offset, nil
}

// Read reads the message at the current offset of the connection, advancing
// the offset on success so the next call to Read produces the next message.
// Read reads the message at the current offset from the connection, advancing
// the offset on success so the next call to a read method will produce the next
// message.
// The method returns the number of bytes read, or an error if something went
// wrong.
//
Expand All @@ -260,22 +258,36 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
// be read and written by multiple goroutines, they could read duplicates, or
// messages may be seen by only some of the goroutines.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
//
// This method is provided to satisfy the net.Conn interface but is much less
// efficient than using the more general purpose ReadBatch method.
func (c *Conn) Read(b []byte) (int, error) {
if len(b) == 0 {
return 0, nil
}

batch := c.ReadBatch(1, len(b))
n, readErr := batch.Read(b)
closeErr := batch.Close()

if readErr == nil && closeErr != nil {
readErr = closeErr
}
n, err := batch.Read(b)
return n, coalesceErrors(silentEOF(err), batch.Close())
}

return n, readErr
// ReadMessage reads the message at the current offset from the connection,
// advancing the offset on success so the next call to a read method will
// produce the next message.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
//
// While it is safe to call ReadMessage concurrently from multiple goroutines
// it may be hard for the program to predict the results as the connection
// offset will be read and written by multiple goroutines, they could read
// duplicates, or messages may be seen by only some of the goroutines.
//
// An io.EOF reported while reading from the batch is silenced; any other read
// error takes precedence over an error returned when closing the batch.
//
// This method is provided for convenience purposes but is much less efficient
// than using the more general purpose ReadBatch method.
func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
	batch := c.ReadBatch(1, maxBytes)
	msg, readErr := batch.ReadMessage()
	// The batch is always closed, even when the read failed.
	closeErr := batch.Close()
	return msg, coalesceErrors(silentEOF(readErr), closeErr)
}

// ReadBatch reads a batch of messages from the kafka server. The method always
Expand Down
14 changes: 6 additions & 8 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// makeTopic returns a topic name made of the "kafka-go-" prefix followed by a
// pseudo-random 63-bit integer rendered as 16 zero-padded hexadecimal digits.
func makeTopic() string {
	const prefix = "kafka-go-"
	id := rand.Int63()
	return prefix + fmt.Sprintf("%016x", id)
}

func TestConn(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -167,10 +171,6 @@ func TestConn(t *testing.T) {
kafka = "localhost:9092"
)

makeTopic := func() string {
return fmt.Sprintf("kafka-go-%016x", rand.Int63())
}

for _, test := range tests {
testFunc := test.function
t.Run(test.scenario, func(t *testing.T) {
Expand Down Expand Up @@ -361,15 +361,13 @@ func testConnWriteBatchReadSequentially(t *testing.T, conn *Conn) {
t.Fatal(err)
}

b := make([]byte, 128)

for i := 0; i != 10; i++ {
n, err := conn.Read(b)
msg, err := conn.ReadMessage(128)
if err != nil {
t.Error(err)
continue
}
s := string(b[:n])
s := string(msg.Value)
if v, err := strconv.Atoi(s); err != nil {
t.Error(err)
} else if v != i {
Expand Down
29 changes: 26 additions & 3 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,27 @@ func (d *Dialer) dialContext(ctx context.Context, network string, address string
}).DialContext(ctx, network, address)
}

// DefaultDialer is the default dialer used when none is specified. It is
// configured with a 10 second timeout and with DualStack enabled.
var DefaultDialer = &Dialer{
	DualStack: true,
	Timeout:   10 * time.Second,
}

// Dial connects to address on the given network using DefaultDialer; it is a
// convenience wrapper for DefaultDialer.Dial.
func Dial(network string, address string) (*Conn, error) {
	conn, err := DefaultDialer.Dial(network, address)
	return conn, err
}

// DialContext connects to address on the given network using DefaultDialer,
// forwarding ctx to the dialer; it is a convenience wrapper for
// DefaultDialer.DialContext.
func DialContext(ctx context.Context, network string, address string) (*Conn, error) {
	conn, err := DefaultDialer.DialContext(ctx, network, address)
	return conn, err
}

// DialLeader forwards its arguments, including the topic and partition, to
// DefaultDialer.DialLeader; it is a convenience wrapper for that method.
func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error) {
	conn, err := DefaultDialer.DialLeader(ctx, network, address, topic, partition)
	return conn, err
}

// The Resolver interface is used as an abstraction to provide service discovery
// of the hosts of a kafka cluster.
type Resolver interface {
Expand All @@ -198,13 +219,15 @@ type Resolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

func sleep(ctx context.Context, duration time.Duration) {
func sleep(ctx context.Context, duration time.Duration) bool {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
return true
case <-ctx.Done():
return false
}
timer.Stop()
}

func backoff(attempt int, min time.Duration, max time.Duration) time.Duration {
Expand Down
14 changes: 10 additions & 4 deletions discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@ func discardInt64(r *bufio.Reader, sz int) (int, error) {
}

func discardString(r *bufio.Reader, sz int) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, len int16) (int, error) {
return discardN(r, sz, int(len))
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) {
if n < 0 {
return sz, nil
}
return discardN(r, sz, n)
})
}

func discardBytes(r *bufio.Reader, sz int) (int, error) {
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, len int32) (int, error) {
return discardN(r, sz, int(len))
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) {
if n < 0 {
return sz, nil
}
return discardN(r, sz, n)
})
}

Expand Down
51 changes: 35 additions & 16 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kafka

import "fmt"
import (
"fmt"
"io"
)

// Error represents the different error codes that may be returned by kafka.
type Error int
Expand Down Expand Up @@ -57,7 +60,7 @@ const (
InvalidProducerIDMapping Error = 49
InvalidTransactionTimeout Error = 50
ConcurrentTransactions Error = 51
TransactionCorrdinatorFenced Error = 52
TransactionCoordinatorFenced Error = 52
TransactionalIDAuthorizationFailed Error = 53
SecurityDisabled Error = 54
BrokerAuthorizationFailed Error = 55
Expand Down Expand Up @@ -175,29 +178,29 @@ func (e Error) Title() string {
case UnsupportedForMessageFormat:
return "Unsupported For Message Format"
case PolicyViolation:
return ""
return "Policy Violation"
case OutOfOrderSequenceNumber:
return ""
return "Out Of Order Sequence Number"
case DuplicateSequenceNumber:
return ""
return "Duplicate Sequence Number"
case InvalidProducerEpoch:
return ""
return "Invalid Producer Epoch"
case InvalidTransactionState:
return ""
return "Invalid Transaction State"
case InvalidProducerIDMapping:
return ""
return "Invalid Producer ID Mapping"
case InvalidTransactionTimeout:
return ""
return "Invalid Transaction Timeout"
case ConcurrentTransactions:
return ""
case TransactionCorrdinatorFenced:
return ""
return "Concurrent Transactions"
case TransactionCoordinatorFenced:
return "Transaction Coordinator Fenced"
case TransactionalIDAuthorizationFailed:
return ""
return "Transactional ID Authorization Failed"
case SecurityDisabled:
return ""
return "Security Disabled"
case BrokerAuthorizationFailed:
return ""
return "Broker Authorization Failed"
}
return ""
}
Expand Down Expand Up @@ -307,7 +310,7 @@ func (e Error) Description() string {
return "the transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms)"
case ConcurrentTransactions:
return "the producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"
case TransactionCorrdinatorFenced:
case TransactionCoordinatorFenced:
return "the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"
case TransactionalIDAuthorizationFailed:
return "the transactional ID authorization failed"
Expand All @@ -332,3 +335,19 @@ func isTemporary(err error) bool {
})
return ok && e.Temporary()
}

// silentEOF returns nil when err is exactly io.EOF and returns err unchanged
// otherwise. The comparison is by identity, so an error that merely wraps
// io.EOF is passed through as is.
func silentEOF(err error) error {
	if err != io.EOF {
		return err
	}
	return nil
}

// coalesceErrors returns the first non-nil error of errs, in argument order,
// or nil when every argument is nil (or when no argument is given). Errors
// after the first non-nil one are discarded.
func coalesceErrors(errs ...error) error {
	for i := range errs {
		if errs[i] == nil {
			continue
		}
		return errs[i]
	}
	return nil
}
Loading

0 comments on commit 894ff5e

Please sign in to comment.