Skip to content

Commit

Permalink
Merge pull request #66 from ncdc/unblock-stream-read-after-close-then…
Browse files Browse the repository at this point in the history
…-reset

Make sure stream.Read() unblocks for close + reset
  • Loading branch information
dmcgowan committed Mar 10, 2016
2 parents 106e140 + 49d5046 commit 449fdfc
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
67 changes: 67 additions & 0 deletions spdy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,73 @@ func TestClientConnectionStopsServingAfterGoAway(t *testing.T) {
<-readChan
}

func TestStreamReadUnblocksAfterCloseThenReset(t *testing.T) {
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error listening: %v", err)
}
listen := listener.Addr().String()

serverConns := make(chan *Connection, 1)
go func() {
conn, connErr := listener.Accept()
if connErr != nil {
t.Fatal(connErr)
}
serverSpdyConn, err := NewConnection(conn, true)
if err != nil {
t.Fatalf("Error creating server connection: %v", err)
}
go serverSpdyConn.Serve(NoOpStreamHandler)
serverConns <- serverSpdyConn
}()

conn, dialErr := net.Dial("tcp", listen)
if dialErr != nil {
t.Fatalf("Error dialing server: %s", dialErr)
}

spdyConn, spdyErr := NewConnection(conn, false)
if spdyErr != nil {
t.Fatalf("Error creating spdy connection: %s", spdyErr)
}
go spdyConn.Serve(NoOpStreamHandler)

stream, err := spdyConn.CreateStream(http.Header{}, nil, false)
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}
if err := stream.WaitTimeout(30 * time.Second); err != nil {
t.Fatalf("Timed out waiting for stream: %v", err)
}

readChan := make(chan struct{})
go func() {
_, err := ioutil.ReadAll(stream)
if err != nil {
t.Fatalf("Error reading stream: %v", err)
}
close(readChan)
}()

serverConn := <-serverConns
defer serverConn.Close()

if err := stream.Close(); err != nil {
t.Fatal(err)
}
if err := stream.Reset(); err != nil {
t.Fatal(err)
}

// make sure close followed by reset unblocks stream.Read()
select {
case <-readChan:
case <-time.After(10 * time.Second):
t.Fatal("Timed out waiting for stream read to unblock")
}
}

var authenticated bool

func authStreamHandler(stream *Stream) {
Expand Down
9 changes: 5 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (s *Stream) Reset() error {
}

func (s *Stream) resetStream() error {
// Always call closeRemoteChannels, even if s.finished is already true.
// This makes it so that stream.Close() followed by stream.Reset() allows
// stream.Read() to unblock.
s.closeRemoteChannels()

s.finishLock.Lock()
if s.finished {
s.finishLock.Unlock()
Expand All @@ -178,8 +183,6 @@ func (s *Stream) resetStream() error {
s.finished = true
s.finishLock.Unlock()

s.closeRemoteChannels()

resetFrame := &spdy.RstStreamFrame{
StreamId: s.streamId,
Status: spdy.Cancel,
Expand Down Expand Up @@ -320,7 +323,5 @@ func (s *Stream) closeRemoteChannels() {
case <-s.closeChan:
default:
close(s.closeChan)
s.dataLock.Lock()
defer s.dataLock.Unlock()
}
}

0 comments on commit 449fdfc

Please sign in to comment.