Skip to content

Commit

Permalink
Merge pull request grpc#7669 from apolcyn/reduce_bidi_threads_ga
Browse files Browse the repository at this point in the history
remove dedicated thread for ruby bidi read loop
  • Loading branch information
kpayson64 authored Aug 15, 2016
2 parents 37f6ffa + dc3d561 commit e4947be
Showing 1 changed file with 36 additions and 59 deletions.
95 changes: 36 additions & 59 deletions src/ruby/lib/grpc/generic/bidi_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def initialize(call, marshal, unmarshal, metadata_received: false)
@call = call
@marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
@metadata_received = metadata_received
@reads_complete = false
Expand All @@ -81,8 +80,7 @@ def initialize(call, marshal, unmarshal, metadata_received: false)
def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop
each_queued_msg(&blk)
read_loop(&blk)
end

# Begins orchestration of the Bidi stream for a server generating replies.
Expand All @@ -97,8 +95,7 @@ def run_on_client(requests, op_notifier, &blk)
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
@loop_th = start_read_loop(is_client: false)
replys = gen_each_reply.call(read_loop(is_client: false))
write_loop(replys, is_client: false)
end

Expand Down Expand Up @@ -135,24 +132,6 @@ def read_using_run_batch
batch_result
end

# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
fail req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
end

def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
Expand Down Expand Up @@ -190,47 +169,45 @@ def write_loop(requests, is_client: true)
raise e
end

# starts the read loop
def start_read_loop(is_client: true)
Thread.new do
GRPC.logger.debug('bidi-read-loop: starting')
begin
count = 0
# queue the initial read before beginning the loop
loop do
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
batch_result = read_using_run_batch

# handle the next message
if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")

if is_client
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end

@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
# Provides an enumerator that yields results of remote reads
def read_loop(is_client: true)
return enum_for(:read_loop,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting')
begin
count = 0
# queue the initial read before beginning the loop
loop do
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
batch_result = read_using_run_batch

# handle the next message
if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")

if is_client
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end

# push the latest read onto the queue and continue reading
res = @unmarshal.call(batch_result.message)
@readq.push(res)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
rescue StandardError => e
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error

res = @unmarshal.call(batch_result.message)
yield res
end
GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
rescue StandardError => e
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
raise e
end
GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
end
end
end

0 comments on commit e4947be

Please sign in to comment.