Skip to content

Commit

Permalink
Merge pull request grpc#2750 from ctiller/propagate
Browse files Browse the repository at this point in the history
Core-supported context propagation
  • Loading branch information
yang-g committed Aug 6, 2015
2 parents 2ba9e5d + 44912d7 commit e5b0459
Show file tree
Hide file tree
Showing 60 changed files with 6,510 additions and 1,196 deletions.
1,987 changes: 1,788 additions & 199 deletions Makefile

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@
"name": "grpc_test_util_base",
"headers": [
"test/core/end2end/cq_verifier.h",
"test/core/end2end/fixtures/proxy.h",
"test/core/iomgr/endpoint_tests.h",
"test/core/security/oauth2_utils.h",
"test/core/util/grpc_profiler.h",
Expand All @@ -339,6 +340,7 @@
],
"src": [
"test/core/end2end/cq_verifier.c",
"test/core/end2end/fixtures/proxy.c",
"test/core/iomgr/endpoint_tests.c",
"test/core/security/oauth2_utils.c",
"test/core/util/grpc_profiler.c",
Expand Down
38 changes: 33 additions & 5 deletions include/grpc/grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ typedef enum grpc_call_error {
GRPC_CALL_ERROR_INVALID_FLAGS,
/** invalid metadata was passed to this call */
GRPC_CALL_ERROR_INVALID_METADATA,
/** invalid message was passed to this call */
GRPC_CALL_ERROR_INVALID_MESSAGE,
/** completion queue for notification has not been registered with the
server */
GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE
Expand Down Expand Up @@ -308,8 +310,8 @@ typedef struct grpc_op {
value, or reuse it in a future op. */
grpc_metadata_array *recv_initial_metadata;
/** ownership of the byte buffer is moved to the caller; the caller must
call
grpc_byte_buffer_destroy on this value, or reuse it in a future op. */
call grpc_byte_buffer_destroy on this value, or reuse it in a future op.
*/
grpc_byte_buffer **recv_message;
struct {
/** ownership of the array is with the caller, but ownership of the
Expand Down Expand Up @@ -351,6 +353,26 @@ typedef struct grpc_op {
} data;
} grpc_op;

/* Propagation bits: this can be bitwise or-ed to form propagation_mask for
* grpc_call */
/** Propagate deadline */
#define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1)
/** Propagate census context */
#define GRPC_PROPAGATE_CENSUS_STATS_CONTEXT ((gpr_uint32)2)
#define GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT ((gpr_uint32)4)
/** Propagate cancellation */
#define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)8)

/* Default propagation mask: clients of the core API are encouraged to encode
deltas from this in their implementations... ie write:
GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
propagation. Doing so gives flexibility in the future to define new
propagation types that are default inherited or not. */
#define GRPC_PROPAGATE_DEFAULTS \
((gpr_uint32)(( \
0xffff | GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_STATS_CONTEXT | \
GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT | GRPC_PROPAGATE_CANCELLATION)))

/** Initialize the grpc library.
It is not safe to call any other grpc functions before calling this.
Expand Down Expand Up @@ -430,8 +452,13 @@ void grpc_channel_watch_connectivity_state(

/** Create a call given a grpc_channel, in order to call 'method'. All
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function. */
live through the invocation of this function.
If parent_call is non-NULL, it must be a server-side call. It will be used
to propagate properties from the server call to this new client call.
*/
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline);
Expand All @@ -442,8 +469,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,

/** Create a call given a handle returned from grpc_channel_register_call */
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_completion_queue *completion_queue,
void *registered_call_handle, gpr_timespec deadline);
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline);

/** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
Expand Down
99 changes: 93 additions & 6 deletions src/core/surface/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ typedef enum {
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
Expand Down Expand Up @@ -176,6 +178,8 @@ struct grpc_call {
gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions;
/** flag indicating that cancellation is inherited */
gpr_uint8 cancellation_is_inherited;

/* flags with bits corresponding to write states allowing us to determine
what was sent */
Expand Down Expand Up @@ -267,6 +271,11 @@ struct grpc_call {

/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];

/** siblings: children of the same parent form a list, and this list is protected under
parent->mu */
grpc_call *sibling_next;
grpc_call *sibling_prev;
};

#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
Expand All @@ -290,7 +299,9 @@ static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);

grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
Expand All @@ -306,9 +317,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
if (cq) {
if (cq != NULL) {
GRPC_CQ_INTERNAL_REF(cq, "bind");
}
call->parent = parent_call;
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
Expand Down Expand Up @@ -347,6 +359,46 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!parent_call->is_client);

gpr_mu_lock(&parent_call->mu);

if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
parent_call->send_deadline.clock_type),
parent_call->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
/* TODO(ctiller): This should change to use the appropriate census start_op
* call. */
if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
parent_call->context[GRPC_CONTEXT_TRACING].value,
NULL);
} else {
GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
}
if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
}

if (parent_call->first_child == NULL) {
parent_call->first_child = call;
call->sibling_next = call->sibling_prev = call;
} else {
call->sibling_next = parent_call->first_child;
call->sibling_prev = parent_call->first_child->sibling_prev;
call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call;
}

gpr_mu_unlock(&parent_call->mu);
}
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) {
set_deadline_alarm(call, send_deadline);
Expand Down Expand Up @@ -404,6 +456,20 @@ void grpc_call_internal_ref(grpc_call *c) {
static void destroy_call(void *call, int ignored_success) {
size_t i;
grpc_call *c = call;
grpc_call *parent = c->parent;
if (parent) {
gpr_mu_lock(&parent->mu);
if (call == parent->first_child) {
parent->first_child = c->sibling_next;
if (c == parent->first_child) {
parent->first_child = NULL;
}
c->sibling_prev->sibling_next = c->sibling_next;
c->sibling_next->sibling_prev = c->sibling_prev;
}
gpr_mu_unlock(&parent->mu);
GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
}
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
Expand Down Expand Up @@ -870,6 +936,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {

static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
grpc_call *child_call;
grpc_call *next_child_call;
size_t i;
GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
lock(call);
Expand Down Expand Up @@ -903,6 +971,19 @@ static void call_on_done_recv(void *pc, int success) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
call->cancel_alarm |= call->have_alarm;
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call);
GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
Expand Down Expand Up @@ -1283,9 +1364,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
grpc_alarm_init(&call->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC));
call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}

/* we offset status by a small amount when storing it into transport metadata
Expand Down Expand Up @@ -1377,7 +1458,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
}
}
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0) {
0 &&
!call->is_client) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
Expand Down Expand Up @@ -1465,6 +1547,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
if (!are_write_flags_valid(op->flags)) {
return GRPC_CALL_ERROR_INVALID_FLAGS;
}
if (op->data.send_message == NULL) {
return GRPC_CALL_ERROR_INVALID_MESSAGE;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
Expand Down Expand Up @@ -1514,6 +1599,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
req->data.recv_metadata->count = 0;
req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
Expand Down Expand Up @@ -1545,6 +1631,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
req->data.recv_metadata->count = 0;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
Expand Down
4 changes: 3 additions & 1 deletion src/core/surface/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ typedef struct {
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
void *user_data);

grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
Expand Down
19 changes: 12 additions & 7 deletions src/core/surface/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ char *grpc_channel_get_target(grpc_channel *channel) {
}

static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
int num_metadata = 0;
Expand All @@ -158,16 +159,18 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[num_metadata++] = authority_mdelem;
}

return grpc_call_create(channel, cq, NULL, send_metadata,
num_metadata, deadline);
return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL,
send_metadata, num_metadata, deadline);
}

grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call_internal(
channel, cq,
channel, parent_call, propagation_mask, cq,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0)),
Expand Down Expand Up @@ -195,11 +198,13 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
}

grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_completion_queue *completion_queue,
void *registered_call_handle, gpr_timespec deadline) {
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline) {
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
channel, completion_queue, GRPC_MDELEM_REF(rc->path),
channel, parent_call, propagation_mask, completion_queue,
GRPC_MDELEM_REF(rc->path),
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}

Expand Down
5 changes: 5 additions & 0 deletions src/core/surface/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}

void grpc_cq_begin_op(grpc_completion_queue *cc) {
#ifndef NDEBUG
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown_called);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
#endif
gpr_ref(&cc->pending_events);
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/surface/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,8 @@ static void accept_stream(void *cd, grpc_transport *transport,
const void *transport_server_data) {
channel_data *chand = cd;
/* create a call */
grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}

static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
Expand Down
20 changes: 10 additions & 10 deletions src/cpp/client/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
const char* host_str = host_.empty() ? NULL : host_.c_str();
auto c_call =
method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(c_channel_, cq->cq(),
method.channel_tag(),
context->raw_deadline())
: grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
context->authority().empty()
? host_str
: context->authority().c_str(),
context->raw_deadline());
auto c_call = method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(
c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
method.channel_tag(), context->raw_deadline())
: grpc_channel_create_call(
c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
method.name(), context->authority().empty()
? host_str
: context->authority().c_str(),
context->raw_deadline());
grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this());
Expand Down
Loading

0 comments on commit e5b0459

Please sign in to comment.