Skip to content

Commit

Permalink
Don't try to read/write if connection is closed
Browse files Browse the repository at this point in the history
There appears to be an issue on OS X where the following sequence of
events occurs, leading to a deadlock:

1. Remote side closes its connection
2. Local side receives io.EOF trying to read the next frame
3. Local idleAwareFramer monitor goroutine exits, so nothing is
receiving from resetChan any more
4. Code using spdystream isn't aware of the EOF and tries to Reset() all
streams and Close() the connection
5. The attempt to write the stream reset frame does not return an error
when it logically should
6. The idleAwareFramer tries to send to the resetChan
7. In some instances, resetChan's buffer is full (because the monitor
isn't running any more)
8. We block here

To address this, add checks at the beginning of ReadFrame and WriteFrame
in idleAwareFramer to see if the connection's closeChan is closed. If
so, immediately return an error instead of trying to read/write using
the underlying framer.

Also remove some timeouts in the test code - these should be handled by
passing -test.timeout to the test executable instead.

Signed-off-by: Andy Goldstein <agoldste@redhat.com>
  • Loading branch information
Andy Goldstein committed Mar 9, 2015
1 parent e9bf991 commit b069eac
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 13 deletions.
12 changes: 12 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ Loop:
}

func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
select {
case <-i.conn.closeChan:
return errors.New("error writing frame: connection closed")
default:
}

err := i.f.WriteFrame(frame)
if err != nil {
return err
Expand All @@ -104,6 +110,12 @@ func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
}

func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
select {
case <-i.conn.closeChan:
return nil, errors.New("error reading frame: connection closed")
default:
}

frame, err := i.f.ReadFrame()
if err != nil {
return nil, err
Expand Down
119 changes: 106 additions & 13 deletions spdy_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package spdystream

import (
"bufio"
"bytes"
"io"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -322,8 +324,6 @@ func TestUnexpectedRemoteConnectionClosed(t *testing.T) {
if e == nil || e != io.EOF {
t.Fatalf("(%d) Expected to get an EOF stream error", tix)
}
case <-time.After(500 * time.Millisecond):
t.Fatalf("(%d) Timeout waiting for stream closure", tix)
}

closeErr = conn.Close()
Expand Down Expand Up @@ -381,8 +381,6 @@ func TestCloseNotification(t *testing.T) {
var serverConn net.Conn
select {
case serverConn = <-serverConnChan:
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for connection closed notification")
}

err = serverConn.Close()
Expand Down Expand Up @@ -522,11 +520,7 @@ func TestIdleNoData(t *testing.T) {
go spdyConn.Serve(NoOpStreamHandler)

spdyConn.SetIdleTimeout(10 * time.Millisecond)
select {
case <-spdyConn.CloseChan():
case <-time.After(20 * time.Millisecond):
t.Fatal("Timed out waiting for idle connection closure")
}
<-spdyConn.CloseChan()

closeErr := server.Close()
if closeErr != nil {
Expand Down Expand Up @@ -577,8 +571,6 @@ func TestIdleWithData(t *testing.T) {

writesFinished := false

expired := time.NewTimer(200 * time.Millisecond)

Loop:
for {
select {
Expand All @@ -589,8 +581,6 @@ Loop:
t.Fatal("Connection closed before all writes finished")
}
break Loop
case <-expired.C:
t.Fatal("Timed out waiting for idle connection closure")
}
}

Expand Down Expand Up @@ -784,6 +774,109 @@ func TestStreamResetWithDataRemaining(t *testing.T) {
wg.Wait()
}

type roundTripper struct {
conn net.Conn
}

func (s *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
r := *req
req = &r

conn, err := net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}

err = req.Write(conn)
if err != nil {
return nil, err
}

resp, err := http.ReadResponse(bufio.NewReader(conn), req)
if err != nil {
return nil, err
}

s.conn = conn

return resp, nil
}

// see https://github.com/GoogleCloudPlatform/kubernetes/issues/4882
func TestFramingAfterRemoteConnectionClosed(t *testing.T) {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamCh := make(chan *Stream)

w.WriteHeader(http.StatusSwitchingProtocols)

netconn, _, _ := w.(http.Hijacker).Hijack()
conn, _ := NewConnection(netconn, true)
go conn.Serve(func(s *Stream) {
s.SendReply(http.Header{}, false)
streamCh <- s
})

stream := <-streamCh
io.Copy(stream, stream)

closeChan := make(chan struct{})
go func() {
stream.Reset()
conn.Close()
close(closeChan)
}()

<-closeChan
}))

server.Start()
defer server.Close()

req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
t.Fatalf("Error creating request: %s", err)
}

rt := &roundTripper{}
client := &http.Client{Transport: rt}

_, err = client.Do(req)
if err != nil {
t.Fatalf("unexpected error from client.Do: %s", err)
}

conn, err := NewConnection(rt.conn, false)
go conn.Serve(NoOpStreamHandler)

stream, err := conn.CreateStream(http.Header{}, nil, false)
if err != nil {
t.Fatalf("error creating client stream: %s", err)
}

n, err := stream.Write([]byte("hello"))
if err != nil {
t.Fatalf("error writing to stream: %s", err)
}
if n != 5 {
t.Fatalf("Expected to write 5 bytes, but actually wrote %d", n)
}

b := make([]byte, 5)
n, err = stream.Read(b)
if err != nil {
t.Fatalf("error reading from stream: %s", err)
}
if n != 5 {
t.Fatalf("Expected to read 5 bytes, but actually read %d", n)
}
if e, a := "hello", string(b[0:n]); e != a {
t.Fatalf("expected '%s', got '%s'", e, a)
}

stream.Reset()
conn.Close()
}

var authenticated bool

func authStreamHandler(stream *Stream) {
Expand Down

0 comments on commit b069eac

Please sign in to comment.