From 1171bca79222af3b10b6050d824cca37f614831f Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Mon, 9 Mar 2015 16:03:38 -0400 Subject: [PATCH] bump(docker/spdystream):e731c8f9f19ffd7e51a469a2de1580c1dfbb4fae Fixes #4882 --- Godeps/Godeps.json | 2 +- .../docker/spdystream/connection.go | 44 ++++--- .../github.com/docker/spdystream/spdy_test.go | 119 ++++++++++++++++-- .../github.com/docker/spdystream/stream.go | 4 - 4 files changed, 131 insertions(+), 38 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 67046acf22ffa..ae0450be5adb2 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -114,7 +114,7 @@ }, { "ImportPath": "github.com/docker/spdystream", - "Rev": "e9bf9912b85eec0ed6aaf317808a0eab25e3ca43" + "Rev": "e731c8f9f19ffd7e51a469a2de1580c1dfbb4fae" }, { "ImportPath": "github.com/elazarl/go-bindata-assetfs", diff --git a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go index 58261b2e0bc66..846b934a93637 100644 --- a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go +++ b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go @@ -31,6 +31,7 @@ type AuthHandler func(header http.Header, slot uint8, parent uint32) bool type idleAwareFramer struct { f *spdy.Framer conn *Connection + writeLock sync.Mutex resetChan chan struct{} setTimeoutChan chan time.Duration timeout time.Duration @@ -47,8 +48,9 @@ func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer { func (i *idleAwareFramer) monitor() { var ( - timer *time.Timer - expired <-chan time.Time + timer *time.Timer + expired <-chan time.Time + resetChan = i.resetChan ) Loop: for { @@ -67,7 +69,7 @@ Loop: timer.Reset(timeout) } } - case <-i.resetChan: + case <-resetChan: if timer != nil && i.timeout > 0 { timer.Reset(i.timeout) } @@ -87,12 +89,25 @@ Loop: if timer != nil { timer.Stop() } + i.writeLock.Lock() + close(resetChan) + i.resetChan = nil + i.writeLock.Unlock() break Loop } } + + // Drain resetChan + for _ = range resetChan { + } } func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error { + i.writeLock.Lock() + defer i.writeLock.Unlock() + if i.resetChan == nil { + return io.EOF + } err := i.f.WriteFrame(frame) if err != nil { return err @@ -109,15 +124,18 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) { return nil, err } + // resetChan should never be closed since it is only closed + // when the connection has closed its closeChan. This closure + // only occurs after all Reads have finished + // TODO (dmcgowan): refactor relationship into connection i.resetChan <- struct{}{} return frame, nil } type Connection struct { - conn net.Conn - framer *idleAwareFramer - writeLock sync.Mutex + conn net.Conn + framer *idleAwareFramer closeChan chan bool goneAway bool @@ -209,9 +227,7 @@ func (s *Connection) Ping() (time.Duration, error) { frame := &spdy.PingFrame{Id: pid} startTime := time.Now() - s.writeLock.Lock() writeErr := s.framer.WriteFrame(frame) - s.writeLock.Unlock() if writeErr != nil { return time.Duration(0), writeErr } @@ -512,8 +528,6 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error { func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error { if s.pingId&0x01 != frame.Id&0x01 { - s.writeLock.Lock() - defer s.writeLock.Unlock() return s.framer.WriteFrame(frame) } pingChan, pingOk := s.pingChans[frame.Id] @@ -663,9 +677,7 @@ func (s *Connection) Close() error { Status: spdy.GoAwayOK, } - s.writeLock.Lock() err := s.framer.WriteFrame(goAwayFrame) - s.writeLock.Unlock() if err != nil { return err } @@ -750,8 +762,6 @@ func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) CFHeader: spdy.ControlFrameHeader{Flags: flags}, } - s.writeLock.Lock() - defer s.writeLock.Unlock() return s.framer.WriteFrame(headerFrame) } @@ -767,8 +777,6 @@ func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) er CFHeader: spdy.ControlFrameHeader{Flags: flags}, } - s.writeLock.Lock() - defer s.writeLock.Unlock() return s.framer.WriteFrame(replyFrame) } @@ -778,8 +786,6 @@ func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.S Status: status, } - s.writeLock.Lock() - defer s.writeLock.Unlock() return s.framer.WriteFrame(resetFrame) } @@ -806,8 +812,6 @@ func (s *Connection) sendStream(stream *Stream, fin bool) error { CFHeader: spdy.ControlFrameHeader{Flags: flags}, } - s.writeLock.Lock() - defer s.writeLock.Unlock() return s.framer.WriteFrame(streamFrame) } diff --git a/Godeps/_workspace/src/github.com/docker/spdystream/spdy_test.go b/Godeps/_workspace/src/github.com/docker/spdystream/spdy_test.go index a9f28224aaf79..9c8fa131a7ebd 100644 --- a/Godeps/_workspace/src/github.com/docker/spdystream/spdy_test.go +++ b/Godeps/_workspace/src/github.com/docker/spdystream/spdy_test.go @@ -1,10 +1,12 @@ package spdystream import ( + "bufio" "bytes" "io" "net" "net/http" + "net/http/httptest" "sync" "testing" "time" @@ -322,8 +324,6 @@ func TestUnexpectedRemoteConnectionClosed(t *testing.T) { if e == nil || e != io.EOF { t.Fatalf("(%d) Expected to get an EOF stream error", tix) } - case <-time.After(500 * time.Millisecond): - t.Fatalf("(%d) Timeout waiting for stream closure", tix) } closeErr = conn.Close() @@ -381,8 +381,6 @@ func TestCloseNotification(t *testing.T) { var serverConn net.Conn select { case serverConn = <-serverConnChan: - case <-time.After(500 * time.Millisecond): - t.Fatal("Timed out waiting for connection closed notification") } err = serverConn.Close() @@ -522,11 +520,7 @@ func TestIdleNoData(t *testing.T) { go spdyConn.Serve(NoOpStreamHandler) spdyConn.SetIdleTimeout(10 * time.Millisecond) - select { - case <-spdyConn.CloseChan(): - case <-time.After(20 * time.Millisecond): - t.Fatal("Timed out waiting for idle connection closure") - } + <-spdyConn.CloseChan() closeErr := server.Close() if closeErr != nil { @@ -577,8 +571,6 @@ func TestIdleWithData(t *testing.T) { writesFinished := false - expired := time.NewTimer(200 * time.Millisecond) - Loop: for { select { @@ -589,8 +581,6 @@ Loop: t.Fatal("Connection closed before all writes finished") } break Loop - case <-expired.C: - t.Fatal("Timed out waiting for idle connection closure") } } @@ -784,6 +774,109 @@ func TestStreamResetWithDataRemaining(t *testing.T) { wg.Wait() } +type roundTripper struct { + conn net.Conn +} + +func (s *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + r := *req + req = &r + + conn, err := net.Dial("tcp", req.URL.Host) + if err != nil { + return nil, err + } + + err = req.Write(conn) + if err != nil { + return nil, err + } + + resp, err := http.ReadResponse(bufio.NewReader(conn), req) + if err != nil { + return nil, err + } + + s.conn = conn + + return resp, nil +} + +// see https://github.com/GoogleCloudPlatform/kubernetes/issues/4882 +func TestFramingAfterRemoteConnectionClosed(t *testing.T) { + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + streamCh := make(chan *Stream) + + w.WriteHeader(http.StatusSwitchingProtocols) + + netconn, _, _ := w.(http.Hijacker).Hijack() + conn, _ := NewConnection(netconn, true) + go conn.Serve(func(s *Stream) { + s.SendReply(http.Header{}, false) + streamCh <- s + }) + + stream := <-streamCh + io.Copy(stream, stream) + + closeChan := make(chan struct{}) + go func() { + stream.Reset() + conn.Close() + close(closeChan) + }() + + <-closeChan + })) + + server.Start() + defer server.Close() + + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("Error creating request: %s", err) + } + + rt := &roundTripper{} + client := &http.Client{Transport: rt} + + _, err = client.Do(req) + if err != nil { + t.Fatalf("unexpected error from client.Do: %s", err) + } + + conn, err := NewConnection(rt.conn, false) + go conn.Serve(NoOpStreamHandler) + + stream, err := conn.CreateStream(http.Header{}, nil, false) + if err != nil { + t.Fatalf("error creating client stream: %s", err) + } + + n, err := stream.Write([]byte("hello")) + if err != nil { + t.Fatalf("error writing to stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to write 5 bytes, but actually wrote %d", n) + } + + b := make([]byte, 5) + n, err = stream.Read(b) + if err != nil { + t.Fatalf("error reading from stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to read 5 bytes, but actually read %d", n) + } + if e, a := "hello", string(b[0:n]); e != a { + t.Fatalf("expected '%s', got '%s'", e, a) + } + + stream.Reset() + conn.Close() +} + var authenticated bool func authStreamHandler(stream *Stream) { diff --git a/Godeps/_workspace/src/github.com/docker/spdystream/stream.go b/Godeps/_workspace/src/github.com/docker/spdystream/stream.go index 1c5f79ae14033..45e65c1be8417 100644 --- a/Godeps/_workspace/src/github.com/docker/spdystream/stream.go +++ b/Godeps/_workspace/src/github.com/docker/spdystream/stream.go @@ -59,8 +59,6 @@ func (s *Stream) WriteData(data []byte, fin bool) error { Data: data, } - s.conn.writeLock.Lock() - defer s.conn.writeLock.Unlock() debugMessage("(%p) (%d) Writing data frame", s, s.streamId) return s.conn.framer.WriteFrame(dataFrame) } @@ -186,8 +184,6 @@ func (s *Stream) resetStream() error { StreamId: s.streamId, Status: spdy.Cancel, } - s.conn.writeLock.Lock() - defer s.conn.writeLock.Unlock() return s.conn.framer.WriteFrame(resetFrame) }