Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prep work for enabling caching #7862

Merged
merged 11 commits into from
Sep 1, 2016
Next Next commit
prep work for enabling caching
Added new header grpc-payload-bin
Added new channel arg for setting max payload size
Ability to create a GET request in client filter
Ability to parse the payload from header in server filter.
  • Loading branch information
makdharma committed Aug 24, 2016
commit 178f4bc24da2b1d4d9f52c010be5a1f63d1c4224
6 changes: 5 additions & 1 deletion include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,14 @@ typedef enum grpc_call_error {
#define GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST (0x00000010u)
/** Signal that the call should not return UNAVAILABLE before it has started */
#define GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY (0x00000020u)
/** Signal that the call is cacheable. GRPC is free to use GET verb */
#define GRPC_INITIAL_METADATA_CACHEABLE_REQUEST (0x00000040u)

/** Mask of all valid flags */
#define GRPC_INITIAL_METADATA_USED_MASK \
(GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY)
GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY | \
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST )

/** A single metadata element */
typedef struct grpc_metadata {
Expand Down
58 changes: 52 additions & 6 deletions src/core/lib/channel/http_client_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef struct call_data {
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
grpc_linked_mdelem payload_bin;

grpc_metadata_batch *recv_initial_metadata;

Expand All @@ -64,6 +65,7 @@ typedef struct call_data {
typedef struct channel_data {
grpc_mdelem *static_scheme;
grpc_mdelem *user_agent;
size_t max_payload_size_for_get;
} channel_data;

typedef struct {
Expand Down Expand Up @@ -134,17 +136,43 @@ static void hc_mutate_op(grpc_call_element *elem,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;

/* Decide which HTTP VERB to use */
grpc_mdelem *method = GRPC_MDELEM_METHOD_POST;
Copy link
Member

@ctiller ctiller Aug 31, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole block (up until line 261) could probably go into an if (op->send_initial_metadata) right?
That way we'd skip doing some complicated processing on other code paths (like, say, recv_initial_metadata)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

if ((op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
op->send_message != NULL &&
op->send_message->length < channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET;
} else if (op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
method = GRPC_MDELEM_METHOD_PUT;
}

if (method == GRPC_MDELEM_METHOD_GET) {
gpr_slice slice;
/* TODO (makdharma): extend code for messages with multiple slices */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be completed before submission

if (grpc_byte_stream_next(NULL, op->send_message, &slice,
op->send_message->length, NULL)) {
grpc_mdelem *payload_bin = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_GRPC_PAYLOAD_BIN,
grpc_mdstr_from_buffer(GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice)));
grpc_metadata_batch_add_tail(op->send_initial_metadata,
&calld->payload_bin, payload_bin);
} else {
gpr_log(GPR_ERROR, "send_message could not be read");
}
op->send_message = NULL;
}

if (op->send_initial_metadata != NULL) {
grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
grpc_metadata_batch_add_head(
op->send_initial_metadata, &calld->method,
op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
? GRPC_MDELEM_METHOD_PUT
: GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
method);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
Expand Down Expand Up @@ -210,6 +238,22 @@ static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
return GRPC_MDELEM_SCHEME_HTTP;
}

static size_t max_payload_size_from_args(const grpc_channel_args *args) {
if (args != NULL) {
for (size_t i = 0; i < args->num_args; ++i) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) {
if (args->args[i].type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET);
} else {
return args->args[i].value.integer;
}
}
}
}
return kMaxPayloadSizeForGet;
}

static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args,
const char *transport_name) {
gpr_strvec v;
Expand Down Expand Up @@ -267,6 +311,8 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!args->is_last);
GPR_ASSERT(args->optional_transport != NULL);
chand->static_scheme = scheme_from_args(args->channel_args);
chand->max_payload_size_for_get =
max_payload_size_from_args(args->channel_args);
chand->user_agent = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_USER_AGENT,
user_agent_from_args(args->channel_args,
Expand Down
4 changes: 4 additions & 0 deletions src/core/lib/channel/http_client_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ extern const grpc_channel_filter grpc_http_client_filter;
/* Channel arg to override the http2 :scheme header */
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"

/* Channel arg to determine maximum size of payload eligable for GET request */
#define GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET "grpc.max_payload_size_for_get"
static const size_t kMaxPayloadSizeForGet = 2048;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to implementation file (no need for this to be exposed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to expose this channel arg. It is the only way a client app can prevent GET requests at run-time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the kMaxPayloadSizeForGet needn't be exposed though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping on this one.


#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */
69 changes: 66 additions & 3 deletions src/core/lib/channel/http_server_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,28 @@ typedef struct call_data {
uint8_t seen_scheme;
uint8_t seen_te_trailers;
uint8_t seen_authority;
uint8_t seen_payload_bin;
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;

grpc_metadata_batch *recv_initial_metadata;
bool *recv_idempotent_request;
/** Closure to call when finished with the hs_on_recv hook */
grpc_closure *on_done_recv;
/** Closure to call when we retrieve read message from the payload-bin header
*/
grpc_closure *recv_message_ready;
grpc_closure *on_complete;
grpc_byte_stream **pp_recv_message;
gpr_slice_buffer read_slice_buffer;
grpc_slice_buffer_stream read_stream;

/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_closure hs_on_recv;
grpc_closure hs_on_complete;
grpc_closure hs_recv_message_ready;
} call_data;

typedef struct channel_data { uint8_t unused; } channel_data;
Expand All @@ -76,14 +87,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {

/* Check if it is one of the headers we care about. */
if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST ||
md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP ||
md == GRPC_MDELEM_SCHEME_HTTPS ||
md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_METHOD_GET ||
md == GRPC_MDELEM_SCHEME_HTTP || md == GRPC_MDELEM_SCHEME_HTTPS ||
md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
/* swallow it */
if (md == GRPC_MDELEM_METHOD_POST) {
calld->seen_method = 1;
*calld->recv_idempotent_request = false;
} else if (md == GRPC_MDELEM_METHOD_PUT) {
} else if (md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_METHOD_GET) {
calld->seen_method = 1;
*calld->recv_idempotent_request = true;
} else if (md->key == GRPC_MDSTR_SCHEME) {
Expand Down Expand Up @@ -137,6 +148,16 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
calld->seen_authority = 1;
return authority;
} else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) {
/* Retrieve the payload from the value of the 'grpc-internal-payload-bin'
header field */
calld->seen_payload_bin = 1;
gpr_slice_buffer_init(&calld->read_slice_buffer);
gpr_slice_buffer_add(&calld->read_slice_buffer,
gpr_slice_ref(md->value->slice));
grpc_slice_buffer_stream_init(&calld->read_stream,
&calld->read_slice_buffer, 0);
return NULL;
} else {
return md;
}
Expand Down Expand Up @@ -189,6 +210,33 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_UNREF(err);
}

static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
/* Call recv_message_ready if we got the payload via the header field */
if (calld->seen_payload_bin && calld->recv_message_ready != NULL) {
*calld->pp_recv_message = (grpc_byte_stream *)&calld->read_stream;
calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
err);
calld->recv_message_ready = NULL;
}
calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, err);
}

static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_payload_bin) {
/* do nothing. This is probably a GET request, and payload will be returned
in hs_on_complete callback. */
} else {
calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
err);
}
}

static void hs_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
Expand All @@ -203,6 +251,11 @@ static void hs_mutate_op(grpc_call_element *elem,
GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
}

if (op->on_complete) {
calld->on_complete = op->on_complete;
op->on_complete = &calld->hs_on_complete;
}

if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
GPR_ASSERT(op->recv_idempotent_request != NULL);
Expand All @@ -211,6 +264,14 @@ static void hs_mutate_op(grpc_call_element *elem,
calld->on_done_recv = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->hs_on_recv;
}

if (op->recv_message) {
calld->recv_message_ready = op->recv_message_ready;
calld->pp_recv_message = op->recv_message;
if (op->recv_message_ready) {
op->recv_message_ready = &calld->hs_recv_message_ready;
}
}
}

static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
Expand All @@ -232,6 +293,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
return GRPC_ERROR_NONE;
}

Expand Down
Loading