Skip to content

Commit

Permalink
Merge pull request grpc#6667 from kpayson64/ruby_eventfd_bug
Browse files Browse the repository at this point in the history
Fixed ruby fd bug
  • Loading branch information
jtattermusch committed Jun 2, 2016
2 parents d06b482 + dce1ee6 commit f78b822
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 50 deletions.
82 changes: 42 additions & 40 deletions src/ruby/ext/grpc/rb_call.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,14 @@ static VALUE sym_message;
static VALUE sym_status;
static VALUE sym_cancelled;

/* hash_all_calls is a hash of Call address -> reference count that is used to
* track the creation and destruction of rb_call instances.
*/
static VALUE hash_all_calls;

/* Destroys a Call. */
static void grpc_rb_call_destroy(void *p) {
grpc_call *call = NULL;
VALUE ref_count = Qnil;
grpc_call* call = NULL;
if (p == NULL) {
return;
};
call = (grpc_call *)p;

ref_count = rb_hash_aref(hash_all_calls, OFFT2NUM((VALUE)call));
if (ref_count == Qnil) {
return; /* No longer in the hash, so already deleted */
} else if (NUM2UINT(ref_count) == 1) {
rb_hash_delete(hash_all_calls, OFFT2NUM((VALUE)call));
grpc_call_destroy(call);
} else {
rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)call),
UINT2NUM(NUM2UINT(ref_count) - 1));
}
call = (grpc_call *)p;
grpc_call_destroy(call);
}

static size_t md_ary_datasize(const void *p) {
Expand All @@ -151,7 +135,7 @@ static const rb_data_type_t grpc_rb_md_ary_data_type = {
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
0,
0,
#endif
};

Expand All @@ -163,12 +147,7 @@ static const rb_data_type_t grpc_call_data_type = {
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
/* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
* grpc_rb_call_destroy
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
0,
RUBY_TYPED_FREE_IMMEDIATELY
#endif
};

Expand All @@ -190,6 +169,11 @@ const char *grpc_call_error_detail_of(grpc_call_error err) {
static VALUE grpc_rb_call_cancel(VALUE self) {
grpc_call *call = NULL;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
//This call has been closed
return Qnil;
}

TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
err = grpc_call_cancel(call, NULL);
if (err != GRPC_CALL_OK) {
Expand All @@ -200,11 +184,29 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
return Qnil;
}

/* Releases the c-level resources associated with a call
Once a call has been closed, no further requests can be
processed.
*/
static VALUE grpc_rb_call_close(VALUE self) {
grpc_call *call = NULL;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
if(call != NULL) {
grpc_call_destroy(call);
RTYPEDDATA_DATA(self) = NULL;
}
return Qnil;
}

/* Called to obtain the peer that this call is connected to. */
static VALUE grpc_rb_call_get_peer(VALUE self) {
VALUE res = Qnil;
grpc_call *call = NULL;
char *peer = NULL;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
return Qnil;
}
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
peer = grpc_call_get_peer(call);
res = rb_str_new2(peer);
Expand All @@ -218,6 +220,10 @@ static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
grpc_call *call = NULL;
VALUE res = Qnil;
grpc_auth_context *ctx = NULL;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
return Qnil;
}
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);

ctx = grpc_call_auth_context(call);
Expand Down Expand Up @@ -323,6 +329,10 @@ static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
grpc_call *call = NULL;
grpc_call_credentials *creds;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
return Qnil;
}
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
creds = grpc_rb_get_wrapped_call_credentials(credentials);
err = grpc_call_set_credentials(call, creds);
Expand Down Expand Up @@ -731,7 +741,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
}
tag = Object.new
timeout = 10
call.start_batch(cqueue, tag, timeout, ops)
call.start_batch(cq, tag, timeout, ops)
Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
Expand All @@ -749,6 +759,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
unsigned write_flag = 0;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
return Qnil;
}
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);

/* Validate the ops args, adding them to a ruby array */
Expand Down Expand Up @@ -888,6 +902,7 @@ void Init_grpc_call() {
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
Expand Down Expand Up @@ -925,11 +940,6 @@ void Init_grpc_call() {
"BatchResult", "send_message", "send_metadata", "send_close",
"send_status", "message", "metadata", "status", "cancelled", NULL);

/* The hash for reference counting calls, to ensure they can't be destroyed
* more than once */
hash_all_calls = rb_hash_new();
rb_define_const(grpc_rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls);

Init_grpc_error_codes();
Init_grpc_op_codes();
Init_grpc_write_flags();
Expand All @@ -944,16 +954,8 @@ grpc_call *grpc_rb_get_wrapped_call(VALUE v) {

/* Obtains the wrapped object for a given call */
VALUE grpc_rb_wrap_call(grpc_call *c) {
VALUE obj = Qnil;
if (c == NULL) {
return Qnil;
}
obj = rb_hash_aref(hash_all_calls, OFFT2NUM((VALUE)c));
if (obj == Qnil) { /* Not in the hash add it */
rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c), UINT2NUM(1));
} else {
rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c),
UINT2NUM(NUM2UINT(obj) + 1));
}
return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c);
}
13 changes: 13 additions & 0 deletions src/ruby/ext/grpc/rb_completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ static rb_data_type_t grpc_rb_completion_queue_data_type = {
#endif
};

/* Releases the c-level resources associated with a completion queue */
static VALUE grpc_rb_completion_queue_close(VALUE self) {
grpc_completion_queue* cq = grpc_rb_get_wrapped_completion_queue(self);
grpc_rb_completion_queue_destroy(cq);
RTYPEDDATA_DATA(self) = NULL;
return Qnil;
}

/* Allocates a completion queue. */
static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
Expand Down Expand Up @@ -212,6 +220,11 @@ void Init_grpc_completion_queue() {
this func, so no separate initialization step is necessary. */
rb_define_alloc_func(grpc_rb_cCompletionQueue,
grpc_rb_completion_queue_alloc);

/* close: Provides a way to close the underlying file descriptor without
waiting for ruby garbage collection. */
rb_define_method(grpc_rb_cCompletionQueue, "close",
grpc_rb_completion_queue_close, 0);
}

/* Gets the wrapped completion queue from the ruby wrapper */
Expand Down
2 changes: 1 addition & 1 deletion src/ruby/ext/grpc/rb_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void Init_grpc_c() {
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
grpc_rb_sNewServerRpc =
rb_struct_define("NewServerRpc", "method", "host",
"deadline", "metadata", "call", NULL);
"deadline", "metadata", "call", "cq", NULL);
grpc_rb_sStatus =
rb_struct_define("Status", "code", "details", "metadata", NULL);
sym_code = ID2SYM(rb_intern("code"));
Expand Down
6 changes: 3 additions & 3 deletions src/ruby/ext/grpc/rb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
err = grpc_server_request_call(
s->wrapped, &call, &st.details, &st.md_ary,
grpc_rb_get_wrapped_completion_queue(cqueue),
grpc_rb_get_wrapped_completion_queue(cqueue),
grpc_rb_get_wrapped_completion_queue(s->mark),
ROBJECT(tag_new));
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
Expand All @@ -244,7 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
return Qnil;
}

ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
ev = grpc_rb_completion_queue_pluck_event(s->mark, tag_new, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st);
return Qnil;
Expand All @@ -262,7 +262,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
rb_str_new2(st.details.host),
rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
INT2NUM(deadline.tv_nsec)),
grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), NULL);
grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), cqueue, NULL);
grpc_request_call_stack_cleanup(&st);
return result;
}
Expand Down
4 changes: 3 additions & 1 deletion src/ruby/lib/grpc/generic/active_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def self.client_invoke(call, q, metadata = {})
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# the call. This queue will be closed on call completion.
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
Expand Down Expand Up @@ -191,6 +191,8 @@ def finished
@call.status = batch_result.status
op_is_done
batch_result.check_status
@call.close
@cq.close
end

# remote_send sends a request to the remote endpoint.
Expand Down
20 changes: 20 additions & 0 deletions src/ruby/lib/grpc/generic/bidi_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
@readq = Queue.new
@unmarshal = unmarshal
@metadata_tag = metadata_tag
@reads_complete = false
@writes_complete = false
@complete = false
@done_mutex = Mutex.new
end

# Begins orchestration of the Bidi stream for a client sending requests.
Expand Down Expand Up @@ -115,6 +119,16 @@ def notify_done
@op_notifier.notify(self)
end

# signals that a bidi operation is complete (read + write)
def finished
@done_mutex.synchronize do
return unless @reads_complete && @writes_complete && !@complete
@call.close
@cq.close
@complete = true
end
end

# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
Expand Down Expand Up @@ -163,12 +177,16 @@ def write_loop(requests, is_client: true)
SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
notify_done
@writes_complete = true
finished
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
notify_done
@writes_complete = true
finished
raise e
end

Expand Down Expand Up @@ -212,6 +230,8 @@ def start_read_loop(is_client: true)
@readq.push(e) # let each_queued_msg terminate with this error
end
GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
end
end
end
Expand Down
12 changes: 7 additions & 5 deletions src/ruby/lib/grpc/generic/rpc_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def available?(an_rpc)
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
Expand All @@ -366,7 +366,7 @@ def implemented?(an_rpc)
return an_rpc if rpc_descs.key?(mth)
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
Expand All @@ -377,7 +377,8 @@ def loop_handle_server_calls
loop_tag = Object.new
while running_state == :running
begin
an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
comp_queue = Core::CompletionQueue.new
an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE)
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
Expand Down Expand Up @@ -416,15 +417,16 @@ def new_active_server_call(an_rpc)
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE,
SEND_INITIAL_METADATA => connect_md)

return nil unless available?(an_rpc)
return nil unless implemented?(an_rpc)

# Create the ActiveCall
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
c = ActiveCall.new(an_rpc.call, @cq,
c = ActiveCall.new(an_rpc.call, an_rpc.cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
an_rpc.deadline)
mth = an_rpc.method.to_sym
Expand Down

0 comments on commit f78b822

Please sign in to comment.