Skip to content

Commit

Permalink
Fix possible data loss after goaway received
Browse files Browse the repository at this point in the history
All frames are processed by workers running in goroutines. If 1 worker's
queue is backed up with data frames and another worker receives and
processes a goaway frame, it's a race as to whether or not the workers
will be able to process the pending data frames before the streams are
removed from the streams map.

This fix processes the goaway frame synchronously in Serve() instead of
in a worker goroutine. We now wait for all the workers to drain their
frame queues before tearing down the streams.
  • Loading branch information
Andy Goldstein committed Sep 2, 2015
1 parent b2c3287 commit dc558d3
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,29 @@ func (s *Connection) Ping() (time.Duration, error) {
// which are needed to fully initiate connections. Both clients and servers
// should call Serve in a separate goroutine before creating streams.
func (s *Connection) Serve(newHandler StreamHandler) {
// use a WaitGroup to wait for all frames to be drained after receiving
// go-away.
var wg sync.WaitGroup

// Partition queues to ensure stream frames are handled
// by the same worker, ensuring order is maintained
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
for i := 0; i < FRAME_WORKERS; i++ {
frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)

// Ensure frame queue is drained when connection is closed
go func(frameQueue *PriorityFrameQueue) {
<-s.closeChan
frameQueue.Drain()
}(frameQueues[i])

go s.frameHandler(frameQueues[i], newHandler)
wg.Add(1)
go func(frameQueue *PriorityFrameQueue) {
// let the WaitGroup know this worker is done
defer wg.Done()

s.frameHandler(frameQueue, newHandler)
}(frameQueues[i])
}

var partitionRoundRobin int
Expand All @@ -283,7 +294,7 @@ func (s *Connection) Serve(newHandler StreamHandler) {
if err != io.EOF {
fmt.Errorf("frame read error: %s", err)
} else {
debugMessage("EOF received")
debugMessage("(%q) EOF received", s)
}
break
}
Expand Down Expand Up @@ -317,9 +328,9 @@ func (s *Connection) Serve(newHandler StreamHandler) {
partition = partitionRoundRobin
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
case *spdy.GoAwayFrame:
priority = 0
partition = partitionRoundRobin
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
// handle the go-away frame synchronously here, then exit the loop
s.handleGoAwayFrame(frame)
break
default:
priority = 7
partition = partitionRoundRobin
Expand All @@ -328,6 +339,9 @@ func (s *Connection) Serve(newHandler StreamHandler) {
frameQueues[partition].Push(readFrame, priority)
}
close(s.closeChan)
// wait for all frame handler workers to indicate they've drained their queues
// before closing remote channels and emptying s.streams
wg.Wait()

s.streamCond.L.Lock()
// notify streams that they're now closed, which will
Expand Down Expand Up @@ -514,12 +528,12 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
stream, streamOk := s.getStream(frame.StreamId)
if !streamOk {
debugMessage("Data frame gone away for %d", frame.StreamId)
debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
// Stream has already gone away
return nil
}
if !stream.replied {
debugMessage("Data frame not replied %d", frame.StreamId)
debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
// No reply received...Protocol error?
return nil
}
Expand Down Expand Up @@ -871,7 +885,7 @@ func (s *Connection) addStream(stream *Stream) {
func (s *Connection) removeStream(stream *Stream) {
s.streamCond.L.Lock()
delete(s.streams, stream.streamId)
debugMessage("Stream removed, broadcasting: %d", stream.streamId)
debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
s.streamCond.Broadcast()
s.streamCond.L.Unlock()
}
Expand Down

0 comments on commit dc558d3

Please sign in to comment.