Skip to content

Commit

Permalink
Move closeChan outside of data lock
Browse files Browse the repository at this point in the history
Signed-off-by: Derek McGowan <derek@mcgstyle.net>
  • Loading branch information
dmcgowan committed Jan 2, 2015
1 parent 6daa279 commit 8239520
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 49 deletions.
50 changes: 21 additions & 29 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,16 @@ func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
}

stream := &Stream{
streamId: frame.StreamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: frame.Headers,
finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
replyCond: sync.NewCond(new(sync.Mutex)),
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
shutdownChan: make(chan struct{}),
streamId: frame.StreamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: frame.Headers,
finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
replyCond: sync.NewCond(new(sync.Mutex)),
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
}
if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
close(stream.dataChan)
Expand Down Expand Up @@ -413,15 +412,9 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
stream.dataLock.RLock()
select {
case <-stream.closeChan:
break
default:
debugMessage("(%p) (%d) Data frame send chan", stream, stream.streamId)
select {
case stream.dataChan <- frame.Data:
debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
case <-stream.shutdownChan:
debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
}
debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
case stream.dataChan <- frame.Data:
debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
}
stream.dataLock.RUnlock()
}
Expand Down Expand Up @@ -500,15 +493,14 @@ func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool)
}

stream := &Stream{
streamId: streamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: headers,
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
shutdownChan: make(chan struct{}),
streamId: streamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: headers,
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
}

debugMessage("(%p) (%p) Create stream", s, stream)
Expand Down
24 changes: 4 additions & 20 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type Stream struct {
replyCond *sync.Cond
replied bool
closeChan chan bool

shutdownLock sync.Mutex
shutdownChan chan struct{} // closed when Reset is called (no more R/W).
}

// WriteData writes data to stream, sending a dataframe per call
Expand Down Expand Up @@ -170,16 +167,6 @@ func (s *Stream) Close() error {
func (s *Stream) Reset() error {
s.conn.removeStream(s)

// only close it once.
s.shutdownLock.Lock()
select {
case <-s.shutdownChan:
// already was closed.
default:
close(s.shutdownChan)
}
s.shutdownLock.Unlock()

s.finishLock.Lock()
if s.finished {
s.finishLock.Unlock()
Expand All @@ -188,14 +175,11 @@ func (s *Stream) Reset() error {
s.finished = true
s.finishLock.Unlock()

close(s.closeChan)

// Ensure no frame handlers are attempting to send data
s.dataLock.Lock()
select {
case <-s.closeChan:
break
default:
close(s.dataChan)
close(s.closeChan)
}
close(s.dataChan)
s.dataLock.Unlock()

resetFrame := &spdy.RstStreamFrame{
Expand Down

0 comments on commit 8239520

Please sign in to comment.