Skip to content

Commit

Permalink
Keep LB policy alive during high freq of resolver updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dgquintas committed Apr 13, 2017
1 parent 7f2a15a commit 956f700
Showing 1 changed file with 66 additions and 11 deletions.
77 changes: 66 additions & 11 deletions src/core/ext/filters/client_channel/client_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,19 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
if (chand->lb_policy != NULL) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
} else if (state == GRPC_CHANNEL_SHUTDOWN) {
/* cancel all picks */
grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
/* mask= */ 0, /* check= */ 0,
GRPC_ERROR_REF(error));
}
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
Expand Down Expand Up @@ -346,6 +351,37 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}

// Wrap a closure associated with \a lb_policy. The associated callback (\a
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
// scheduling \a wrapped_closure.
typedef struct wrapped_on_pick_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;

/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;

/* The policy instance related to the closure */
grpc_lb_policy *lb_policy;

/* heap memory to be freed upon closure execution. Usually this arg. */
void *free_when_done;
} wrapped_on_pick_closure_arg;

// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free
// arg->free_when_done (usually \a arg itself).
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
wrapped_on_pick_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GPR_ASSERT(wc_arg->lb_policy != NULL);
GPR_ASSERT(wc_arg->free_when_done != NULL);
grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
gpr_free(wc_arg->free_when_done);
}

static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
Expand Down Expand Up @@ -1031,11 +1067,30 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
const bool result = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);

// Wrap the user-provided callback in order to hold a strong reference to
// the LB policy for the duration of the pick.
wrapped_on_pick_closure_arg *w_on_pick_arg =
gpr_zalloc(sizeof(*w_on_pick_arg));
grpc_closure_init(&w_on_pick_arg->wrapper_closure,
wrapped_on_pick_closure_cb, w_on_pick_arg,
grpc_schedule_on_exec_ctx);
w_on_pick_arg->wrapped_closure = on_ready;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
w_on_pick_arg->lb_policy = lb_policy;
w_on_pick_arg->free_when_done = w_on_pick_arg;
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
&w_on_pick_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
"pick_subchannel_wrapping");
gpr_free(w_on_pick_arg->free_when_done);
}
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return result;
return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
Expand Down

0 comments on commit 956f700

Please sign in to comment.