Skip to content

Commit

Permalink
Merge pull request grpc#1871 from tbetbetbe/grpc_ruby_corrects_the_cancel_after_first_response_test
Browse files Browse the repository at this point in the history

Corrects the cancel_after_first_response test
  • Loading branch information
jtattermusch committed Jun 2, 2015
2 parents a6de798 + d62d345 commit 67a3c16
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 41 deletions.
4 changes: 3 additions & 1 deletion src/ruby/bin/interop/interop_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def cancel_after_begin
op = @stub.streaming_input_call(reqs, return_op: true)
op.cancel
assert_raises(GRPC::Cancelled) { op.execute }
assert(op.cancelled, 'call operation should be CANCELLED')
p 'OK: cancel_after_begin'
end

Expand All @@ -282,7 +283,8 @@ def cancel_after_first_response
ppp = PingPongPlayer.new(msg_sizes)
op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message
assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
op.execute.each { |r| ppp.queue.push(r) }
assert(op.cancelled, 'call operation should be CANCELLED')
p 'OK: cancel_after_first_response'
end

Expand Down
11 changes: 7 additions & 4 deletions src/ruby/lib/grpc/generic/active_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
extend Forwardable
Expand Down Expand Up @@ -129,6 +128,11 @@ def output_metadata
@output_metadata ||= {}
end

# Reports whether the call finished with a CANCELLED status.
#
# @return [Boolean] true when a status has been recorded on the call
#   and its code is Core::StatusCodes::CANCELLED, false otherwise
def cancelled
  current_status = @call.status
  return false if current_status.nil?
  current_status.code == Core::StatusCodes::CANCELLED
end

# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
Expand Down Expand Up @@ -162,6 +166,7 @@ def writes_done(assert_finished = true)
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
@call.status = batch_result.status
batch_result.check_status
end

Expand All @@ -178,6 +183,7 @@ def finished
@call.metadata.merge!(batch_result.status.metadata)
end
end
@call.status = batch_result.status
batch_result.check_status
end

Expand Down Expand Up @@ -410,9 +416,6 @@ def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end

# run_server_bidi orchestrates a BiDi stream processing on a server.
Expand Down
64 changes: 28 additions & 36 deletions src/ruby/lib/grpc/generic/bidi_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,9 @@ def initialize(call, q, marshal, unmarshal, deadline)
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
@enq_th = start_write_loop(requests)
@enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
each_queued_msg(&blk)
end

# Begins orchestration of the Bidi stream for a server generating replies.
Expand All @@ -98,9 +96,8 @@ def run_on_client(requests, &blk)
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
@enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop
@enq_th.join if @enq_th.alive?
write_loop(replys, is_client: false)
end

private
Expand All @@ -126,37 +123,32 @@ def each_queued_msg
end
end

# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop(requests, is_client: true)
Thread.new do # TODO: run on a thread pool
GRPC.logger.debug('bidi-write-loop: starting')
begin
write_tag = Object.new
count = 0
requests.each do |req|
GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil)
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
end
GRPC.logger.debug('bidi-write-loop: finished')
# Writes every request onto the call; on the client side, also
# half-closes the stream and waits for the server's status.
#
# Each request is marshalled via @marshal and sent with a SEND_MESSAGE
# batch operation on @call.  When is_client is true, a final batch both
# sends SEND_CLOSE_FROM_CLIENT and receives RECV_STATUS_ON_CLIENT; the
# received status is recorded on @call.status (so callers can observe
# e.g. cancellation) before check_status raises for non-OK statuses.
#
# NOTE(review): run from a dedicated thread during client bidi calls —
# presumably so reads are not blocked while requests are produced;
# confirm against the calling code.
#
# @param requests [Enumerable] the requests to marshal and send
# @param is_client [Boolean] when true, half-close and await the status
# @raise [StandardError] re-raises any failure after logging it
def write_loop(requests, is_client: true)
  GRPC.logger.debug('bidi-write-loop: starting')
  write_tag = Object.new
  count = 0
  requests.each do |req|
    GRPC.logger.debug("bidi-write-loop: #{count}")
    count += 1
    payload = @marshal.call(req)
    @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
                    SEND_MESSAGE => payload)
  end
  GRPC.logger.debug("bidi-write-loop: #{count} writes done")
  if is_client
    GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
    # Single batch: half-close and wait for the final status together,
    # then expose the status on @call before check_status may raise.
    batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
                                   SEND_CLOSE_FROM_CLIENT => nil,
                                   RECV_STATUS_ON_CLIENT => nil)
    @call.status = batch_result.status
    batch_result.check_status
    GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
  end
  GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
  # Log and re-raise so the failure is visible both in logs and to the
  # thread/caller driving the loop.
  GRPC.logger.warn('bidi-write-loop: failed')
  GRPC.logger.warn(e)
  raise e
end

# starts the read loop
Expand Down

0 comments on commit 67a3c16

Please sign in to comment.