Skip to content

Commit

Permalink
revisit wire corruption detection (segmentio#324)
Browse files Browse the repository at this point in the history
* revisit wire corruption detection

* PR comments
  • Loading branch information
Achille authored Oct 7, 2019
1 parent 5341c5f commit 5e37e04
Showing 1 changed file with 34 additions and 32 deletions.
66 changes: 34 additions & 32 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Conn struct {
// base network connection
conn net.Conn

// number of inflight requests on the connection.
inflight int32

// offset management (synchronized on the mutex field)
mutex sync.Mutex
offset int64
Expand Down Expand Up @@ -1236,6 +1239,18 @@ func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time
return c.do(&c.wdeadline, write, read)
}

func (c *Conn) enter() {
atomic.AddInt32(&c.inflight, +1)
}

func (c *Conn) leave() {
atomic.AddInt32(&c.inflight, -1)
}

func (c *Conn) concurrency() int {
return int(atomic.LoadInt32(&c.inflight))
}

func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
id, err := c.doRequest(d, write)
if err != nil {
Expand All @@ -1261,6 +1276,7 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
}

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
c.enter()
c.wlock.Lock()
c.correlationID++
id = c.correlationID
Expand All @@ -1272,67 +1288,53 @@ func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (i
// recoverable state so we're better off just giving up at this point to
// avoid any risk of corrupting the following operations.
c.conn.Close()
c.leave()
}

c.wlock.Unlock()
return
}

func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
// I applied exactly zero scientific process to choose this value,
// it seemed to worked fine in practice tho.
//
// My guess is 100 iterations where the goroutine gets descheduled
// by calling runtime.Gosched() may end up on a wait of ~10ms to ~1s
// (if the programs is heavily CPU bound and has lots of goroutines),
// so it should allow for bailing quickly without taking too much risk
// to get false positives.
const maxAttempts = 100
var lastID int32

for attempt := 0; attempt < maxAttempts; {
for {
var rsz int32
var rid int32

c.rlock.Lock()
deadline = d.setConnReadDeadline(c.conn)
rsz, rid, err = c.peekResponseSizeAndID()

if rsz, rid, err = c.peekResponseSizeAndID(); err != nil {
if err != nil {
d.unsetConnReadDeadline()
c.conn.Close()
c.rlock.Unlock()
return
break
}

if id == rid {
c.skipResponseSizeAndID()
size, lock = int(rsz-4), &c.rlock
return
// Don't unlock the read mutex to yield ownership to the caller.
break
}

if c.concurrency() == 1 {
// If the goroutine is the only one waiting on this connection it
// should be impossible to read a correlation id different from the
// one it expects. This is a sign that the data we are reading on
// the wire is corrupted and the connection needs to be closed.
err = io.ErrNoProgress
c.rlock.Unlock()
break
}

// Optimistically release the read lock if a response has already
// been received but the current operation is not the target for it.
c.rlock.Unlock()
runtime.Gosched()

// This check is a safety mechanism, if we make too many loop
// iterations and always draw the same id then we could be facing
// corrupted data on the wire, or the goroutine(s) sharing ownership
// of this connection may have panicked and therefore will not be able
// to participate in consuming bytes from the wire. To prevent entering
// an infinite loop which reads the same value over and over we bail
// with the uncommon io.ErrNoProgress error which should give a good
// enough signal about what is going wrong.
if rid != lastID {
attempt++
} else {
attempt = 0
}

lastID = rid
}

err = io.ErrNoProgress
c.leave()
return
}

Expand Down

0 comments on commit 5e37e04

Please sign in to comment.