Skip to content

Commit

Permalink
Merge pull request #43 from ncdc/fix-idle-data-races
Browse files Browse the repository at this point in the history
Fix races in idling
  • Loading branch information
dmcgowan committed Feb 11, 2015
2 parents 13f2d13 + 101f994 commit 29e1da2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 56 deletions.
82 changes: 40 additions & 42 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,71 +29,71 @@ type StreamHandler func(stream *Stream)
type AuthHandler func(header http.Header, slot uint8, parent uint32) bool

type idleAwareFramer struct {
f *spdy.Framer
conn *Connection
expired *time.Timer
startCh chan struct{}
f *spdy.Framer
conn *Connection
resetChan chan struct{}
setTimeoutChan chan time.Duration
timeout time.Duration
}

func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
iaf := &idleAwareFramer{
f: framer,
startCh: make(chan struct{}, 1),
f: framer,
resetChan: make(chan struct{}, 2),
setTimeoutChan: make(chan time.Duration),
}
go iaf.monitor()
return iaf
}

func (i *idleAwareFramer) monitor() {
// wait for a non-zero timeout
<-i.startCh

var (
timer *time.Timer
expired <-chan time.Time
)
Loop:
for {
select {
case <-i.startCh:
// no-op
case <-i.expired.C:
case timeout := <-i.setTimeoutChan:
i.timeout = timeout
if timeout == 0 {
if timer != nil {
timer.Stop()
}
} else {
if timer == nil {
timer = time.NewTimer(timeout)
expired = timer.C
} else {
timer.Reset(timeout)
}
}
case <-i.resetChan:
if timer != nil && i.timeout > 0 {
timer.Reset(i.timeout)
}
case <-expired:
for _, stream := range i.conn.streams {
stream.Reset()
}
i.conn.Close()
break Loop
case <-i.conn.closeChan:
if timer != nil {
timer.Stop()
}
break Loop
}
}
}

func (i *idleAwareFramer) setTimeout(timeout time.Duration) {
switch {
case timeout == 0:
if i.expired != nil {
i.expired.Stop()
}
case timeout > 0:
// TODO there may be a race condition with multiple goroutines calling this,
// as there's no lock around i.expired. A lock would probably result in
// decreased performance, but that needs to be explored.
if i.expired == nil {
i.expired = time.NewTimer(timeout)
// tell the monitor it can start waiting for a timeout
i.startCh <- struct{}{}
} else {
i.expired.Reset(timeout)
}
}
}

func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
err := i.f.WriteFrame(frame)
if err != nil {
return err
}

if i.conn.idleTimeout > 0 {
i.setTimeout(i.conn.idleTimeout)
}
i.resetChan <- struct{}{}

return nil
}

Expand All @@ -103,9 +103,8 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
return nil, err
}

if i.conn.idleTimeout > 0 {
i.setTimeout(i.conn.idleTimeout)
}
i.resetChan <- struct{}{}

return frame, nil
}

Expand All @@ -119,7 +118,6 @@ type Connection struct {
lastStreamChan chan<- *Stream
goAwayTimeout time.Duration
closeTimeout time.Duration
idleTimeout time.Duration

streamLock *sync.RWMutex
streamCond *sync.Cond
Expand Down Expand Up @@ -183,6 +181,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) {
shutdownChan: make(chan error),
}
idleAwareFramer.conn = session
go idleAwareFramer.monitor()

return session, nil
}
Expand Down Expand Up @@ -730,8 +729,7 @@ func (s *Connection) SetCloseTimeout(timeout time.Duration) {
// SetIdleTimeout sets the amount of time the connection may sit idle before
// it is forcefully terminated.
func (s *Connection) SetIdleTimeout(timeout time.Duration) {
s.idleTimeout = timeout
s.framer.setTimeout(timeout)
s.framer.setTimeoutChan <- timeout
}

func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
Expand Down
23 changes: 9 additions & 14 deletions spdy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,23 +345,20 @@ func TestCloseNotification(t *testing.T) {
}
listen := listener.Addr().String()

var serverConn net.Conn
var serverSpdyConn *Connection
var err error
closeChan := make(chan struct{}, 1)
serverConnChan := make(chan net.Conn)
go func() {
serverConn, err = listener.Accept()
serverConn, err := listener.Accept()
if err != nil {
t.Fatalf("Error accepting: %v", err)
}

serverSpdyConn, err = NewConnection(serverConn, true)
serverSpdyConn, err := NewConnection(serverConn, true)
if err != nil {
t.Fatalf("Error creating server connection: %v", err)
}
go serverSpdyConn.Serve(NoOpStreamHandler)
<-serverSpdyConn.CloseChan()
close(closeChan)
serverConnChan <- serverConn
}()

conn, dialErr := net.Dial("tcp", listen)
Expand All @@ -376,13 +373,14 @@ func TestCloseNotification(t *testing.T) {
go spdyConn.Serve(NoOpStreamHandler)

// close client conn
err = conn.Close()
err := conn.Close()
if err != nil {
t.Fatalf("Error closing client connection: %v", err)
}

var serverConn net.Conn
select {
case <-closeChan:
case serverConn = <-serverConnChan:
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for connection closed notification")
}
Expand Down Expand Up @@ -567,16 +565,13 @@ func TestHalfClosedIdleTimeout(t *testing.T) {
}
listen := listener.Addr().String()

var serverConn net.Conn
var serverSpdyConn *Connection
var err error
go func() {
serverConn, err = listener.Accept()
serverConn, err := listener.Accept()
if err != nil {
t.Fatalf("Error accepting: %v", err)
}

serverSpdyConn, err = NewConnection(serverConn, true)
serverSpdyConn, err := NewConnection(serverConn, true)
if err != nil {
t.Fatalf("Error creating server connection: %v", err)
}
Expand Down

0 comments on commit 29e1da2

Please sign in to comment.