Skip to content

Commit

Permalink
Complete chaincode execution on stream termination
Browse files Browse the repository at this point in the history
When the chaincode stream terminated with an error, ProcessStream would
return an error but did not do anything to release go routines waiting
for chaincode execution to complete. This would delay the release of the
transaction simulator lock until the execute timeout occurred.

This commit provides a "streamDone" signal back to the execute loop that
is used to complete execution when the chaincode stream terminates.

FAB-16610 #done

Change-Id: I9ba0b3549015a49ae40e8e10b5326e8a8681d128
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Nov 7, 2019
1 parent cd97bc3 commit fbb4c89
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
25 changes: 20 additions & 5 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type Handler struct {
UUIDGenerator UUIDGenerator
// AppConfig is used to retrieve the application config for a channel
AppConfig ApplicationConfigRetriever
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics

// state holds the current handler state. It will be created, established, or
// ready.
Expand All @@ -135,8 +137,10 @@ type Handler struct {
chatStream ccintf.ChaincodeStream
// errChan is used to communicate errors from the async send to the receive loop
errChan chan error
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics
// mutex is used to serialze the stream closed chan.
mutex sync.Mutex
// streamDoneChan is closed when the chaincode stream terminates.
streamDoneChan chan struct{}
}

// handleMessage is called by ProcessStream to dispatch messages.
Expand Down Expand Up @@ -342,9 +346,20 @@ func (h *Handler) deregister() {
h.Registry.Deregister(h.chaincodeID)
}

func (h *Handler) streamDone() <-chan struct{} {
h.mutex.Lock()
defer h.mutex.Unlock()
return h.streamDoneChan
}

func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
defer h.deregister()

h.mutex.Lock()
h.streamDoneChan = make(chan struct{})
h.mutex.Unlock()
defer close(h.streamDoneChan)

h.chatStream = stream
h.errChan = make(chan error, 1)

Expand Down Expand Up @@ -1198,9 +1213,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, namespace stri
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
h.Metrics.ExecuteTimeouts.With(
"chaincode", h.chaincodeID,
).Add(1)
h.Metrics.ExecuteTimeouts.With("chaincode", h.chaincodeID).Add(1)
case <-h.streamDone():
err = errors.New("chaincode stream terminated")
}

return ccresp, err
Expand Down
8 changes: 8 additions & 0 deletions core/chaincode/handler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@ func SetHandlerChaincodeID(h *Handler, chaincodeID string) {
func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) {
h.chatStream = chatStream
}

func StreamDone(h *Handler) <-chan struct{} {
return h.streamDone()
}

func SetStreamDoneChan(h *Handler, ch chan struct{}) {
h.streamDoneChan = ch
}
36 changes: 34 additions & 2 deletions core/chaincode/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2539,6 +2539,23 @@ var _ = Describe("Handler", func() {
})
})

Context("when the chaincode stream terminates", func() {
It("returns an error", func() {
streamDoneChan := make(chan struct{})
chaincode.SetStreamDoneChan(handler, streamDoneChan)

errCh := make(chan error, 1)
go func() {
_, err := handler.Execute(txParams, "chaincode-name", incomingMessage, time.Hour)
errCh <- err
}()
Consistently(errCh).ShouldNot(Receive())

close(streamDoneChan)
Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated")))
})
})

Context("when execute times out", func() {
It("returns an error", func() {
errCh := make(chan error, 1)
Expand Down Expand Up @@ -2716,6 +2733,22 @@ var _ = Describe("Handler", func() {
Eventually(fakeChatStream.RecvCallCount).Should(Equal(100))
})

It("manages the stream done channel", func() {
releaseChan := make(chan struct{})
fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) {
<-releaseChan
return nil, errors.New("cc-went-away")
}
go handler.ProcessStream(fakeChatStream)
Eventually(fakeChatStream.RecvCallCount).Should(Equal(1))

streamDoneChan := chaincode.StreamDone(handler)
Consistently(streamDoneChan).ShouldNot(Receive())

close(releaseChan)
Eventually(streamDoneChan).Should(BeClosed())
})

Context("when receive fails with an io.EOF", func() {
BeforeEach(func() {
fakeChatStream.RecvReturns(nil, io.EOF)
Expand Down Expand Up @@ -2817,8 +2850,7 @@ var _ = Describe("Handler", func() {
Context("when an async error is sent", func() {
var (
incomingMessage *pb.ChaincodeMessage

recvChan chan *pb.ChaincodeMessage
recvChan chan *pb.ChaincodeMessage
)

BeforeEach(func() {
Expand Down

0 comments on commit fbb4c89

Please sign in to comment.