Skip to content

Commit

Permalink
Merge pull request #39 from ncdc/idle-timeout
Browse files Browse the repository at this point in the history
Add connection idling
  • Loading branch information
dmcgowan committed Jan 29, 2015
2 parents 17e280a + 54b6d06 commit 81afc53
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 2 deletions.
43 changes: 41 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,46 @@ type StreamHandler func(stream *Stream)

type AuthHandler func(header http.Header, slot uint8, parent uint32) bool

type idleAwareFramer struct {
f *spdy.Framer
conn *Connection
}

func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
err := i.f.WriteFrame(frame)
if err != nil {
return err
}

if i.conn.idleTimeout > 0 {
i.conn.conn.SetDeadline(time.Now().Add(i.conn.idleTimeout))
}
return nil
}

func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
frame, err := i.f.ReadFrame()
if err != nil {
return nil, err
}

if i.conn.idleTimeout > 0 {
i.conn.conn.SetDeadline(time.Now().Add(i.conn.idleTimeout))
}
return frame, nil
}

type Connection struct {
conn net.Conn
framer *spdy.Framer
framer *idleAwareFramer
writeLock sync.Mutex

closeChan chan bool
goneAway bool
lastStreamChan chan<- *Stream
goAwayTimeout time.Duration
closeTimeout time.Duration
idleTimeout time.Duration

streamLock *sync.RWMutex
streamCond *sync.Cond
Expand All @@ -64,6 +94,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) {
if framerErr != nil {
return nil, framerErr
}
idleAwareFramer := &idleAwareFramer{f: framer}
var sid spdy.StreamId
var rid spdy.StreamId
var pid uint32
Expand All @@ -82,7 +113,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) {

session := &Connection{
conn: conn,
framer: framer,
framer: idleAwareFramer,

closeChan: make(chan bool),
goAwayTimeout: time.Duration(0),
Expand All @@ -99,6 +130,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) {

shutdownChan: make(chan error),
}
idleAwareFramer.conn = session

return session, nil
}
Expand Down Expand Up @@ -668,6 +700,13 @@ func (s *Connection) SetCloseTimeout(timeout time.Duration) {
s.closeTimeout = timeout
}

// SetIdleTimeout sets the amount of time the connection may sit idle before
// it is forcefully terminated.
func (s *Connection) SetIdleTimeout(timeout time.Duration) {
s.idleTimeout = timeout
s.conn.SetDeadline(time.Now().Add(s.idleTimeout))
}

func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
var flags spdy.ControlFlags
if fin {
Expand Down
97 changes: 97 additions & 0 deletions spdy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,103 @@ func TestCloseNotification(t *testing.T) {
}
}

func TestIdleNoData(t *testing.T) {
var wg sync.WaitGroup
server, listen, serverErr := runServer(&wg)
if serverErr != nil {
t.Fatalf("Error initializing server: %s", serverErr)
}

conn, dialErr := net.Dial("tcp", listen)
if dialErr != nil {
t.Fatalf("Error dialing server: %s", dialErr)
}

spdyConn, spdyErr := NewConnection(conn, false)
if spdyErr != nil {
t.Fatalf("Error creating spdy connection: %s", spdyErr)
}
go spdyConn.Serve(NoOpStreamHandler)

spdyConn.SetIdleTimeout(100 * time.Millisecond)
select {
case <-spdyConn.CloseChan():
case <-time.After(150 * time.Millisecond):
t.Fatal("Timed out waiting for idle connection closure")
}

closeErr := server.Close()
if closeErr != nil {
t.Fatalf("Error shutting down server: %s", closeErr)
}
wg.Wait()
}

func TestIdleWithData(t *testing.T) {
var wg sync.WaitGroup
server, listen, serverErr := runServer(&wg)
if serverErr != nil {
t.Fatalf("Error initializing server: %s", serverErr)
}

conn, dialErr := net.Dial("tcp", listen)
if dialErr != nil {
t.Fatalf("Error dialing server: %s", dialErr)
}

spdyConn, spdyErr := NewConnection(conn, false)
if spdyErr != nil {
t.Fatalf("Error creating spdy connection: %s", spdyErr)
}
go spdyConn.Serve(NoOpStreamHandler)

spdyConn.SetIdleTimeout(25 * time.Millisecond)

stream, err := spdyConn.CreateStream(http.Header{}, nil, false)
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}

writeCh := make(chan struct{})

go func() {
b := []byte{1, 2, 3, 4, 5}
for i := 0; i < 10; i++ {
_, err = stream.Write(b)
if err != nil {
t.Fatalf("Error writing to stream: %v", err)
}
time.Sleep(10 * time.Millisecond)
}
close(writeCh)
}()

writesFinished := false

expired := time.NewTimer(200 * time.Millisecond)

Loop:
for {
select {
case <-writeCh:
writesFinished = true
case <-spdyConn.CloseChan():
if !writesFinished {
t.Fatal("Connection closed before all writes finished")
}
break Loop
case <-expired.C:
t.Fatal("Timed out waiting for idle connection closure")
}
}

closeErr := server.Close()
if closeErr != nil {
t.Fatalf("Error shutting down server: %s", closeErr)
}
wg.Wait()
}

var authenticated bool

func authStreamHandler(stream *Stream) {
Expand Down

0 comments on commit 81afc53

Please sign in to comment.