Skip to content

Commit

Permalink
more review feedback addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
makdharma committed Aug 17, 2016
1 parent 1b8deaa commit f8f8f5a
Showing 1 changed file with 83 additions and 47 deletions.
130 changes: 83 additions & 47 deletions src/core/ext/transport/cronet/transport/cronet_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"

Expand All @@ -59,18 +60,13 @@
/* TODO (makdharma): Hook up into the wider tracing mechanism */
int grpc_cronet_trace = 0;

enum OP_RESULT {
enum e_op_result {
ACTION_TAKEN_WITH_CALLBACK,
ACTION_TAKEN_NO_CALLBACK,
NO_ACTION_POSSIBLE
};

/* Used for printing debug */
const char *op_result_string[] = {"ACTION_TAKEN_WITH_CALLBACK",
"ACTION_TAKEN_NO_CALLBACK",
"NO_ACTION_POSSIBLE"};

enum OP_ID {
enum e_op_id {
OP_SEND_INITIAL_METADATA = 0,
OP_SEND_MESSAGE,
OP_SEND_TRAILING_METADATA,
Expand All @@ -87,22 +83,7 @@ enum OP_ID {
OP_NUM_OPS
};

const char *op_id_string[] = {"OP_SEND_INITIAL_METADATA",
"OP_SEND_MESSAGE",
"OP_SEND_TRAILING_METADATA",
"OP_RECV_MESSAGE",
"OP_RECV_INITIAL_METADATA",
"OP_RECV_TRAILING_METADATA",
"OP_CANCEL_ERROR",
"OP_ON_COMPLETE",
"OP_FAILED",
"OP_SUCCEEDED",
"OP_CANCELED",
"OP_RECV_MESSAGE_AND_ON_COMPLETE",
"OP_READ_REQ_MADE",
"OP_NUM_OPS"};

/* Cronet callbacks */
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */

static void on_request_headers_sent(cronet_bidirectional_stream *);
static void on_response_headers_received(
Expand Down Expand Up @@ -134,6 +115,8 @@ struct grpc_cronet_transport {
};
typedef struct grpc_cronet_transport grpc_cronet_transport;

/* TODO (makdharma): reorder structure for memory efficiency per
http://www.catb.org/esr/structure-packing/#_structure_reordering: */
struct read_state {
/* vars to store data coming from server */
char *read_buffer;
Expand Down Expand Up @@ -204,14 +187,61 @@ struct stream_obj {
};
typedef struct stream_obj stream_obj;

static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);

/*
Utility function to translate enum into string for printing
*/
static const char *op_result_string(enum e_op_result i) {
switch (i) {
case ACTION_TAKEN_WITH_CALLBACK:
return "ACTION_TAKEN_WITH_CALLBACK";
case ACTION_TAKEN_NO_CALLBACK:
return "ACTION_TAKEN_NO_CALLBACK";
case NO_ACTION_POSSIBLE:
return "NO_ACTION_POSSIBLE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

static const char *op_id_string(enum e_op_id i) {
switch (i) {
case OP_SEND_INITIAL_METADATA:
return "OP_SEND_INITIAL_METADATA";
case OP_SEND_MESSAGE:
return "OP_SEND_MESSAGE";
case OP_SEND_TRAILING_METADATA:
return "OP_SEND_TRAILING_METADATA";
case OP_RECV_MESSAGE:
return "OP_RECV_MESSAGE";
case OP_RECV_INITIAL_METADATA:
return "OP_RECV_INITIAL_METADATA";
case OP_RECV_TRAILING_METADATA:
return "OP_RECV_TRAILING_METADATA";
case OP_CANCEL_ERROR:
return "OP_CANCEL_ERROR";
case OP_ON_COMPLETE:
return "OP_ON_COMPLETE";
case OP_FAILED:
return "OP_FAILED";
case OP_SUCCEEDED:
return "OP_SUCCEEDED";
case OP_CANCELED:
return "OP_CANCELED";
case OP_RECV_MESSAGE_AND_ON_COMPLETE:
return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
case OP_READ_REQ_MADE:
return "OP_READ_REQ_MADE";
case OP_NUM_OPS:
return "OP_NUM_OPS";
}
}

/*
Add a new stream op to op storage.
*/
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
gpr_mu_lock(&s->mu);
struct op_storage *storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
Expand All @@ -220,6 +250,7 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
memset(&new_op->state, 0, sizeof(new_op->state));
new_op->s = s;
new_op->done = false;
gpr_mu_lock(&s->mu);
new_op->next = storage->head;
storage->head = new_op;
storage->num_pending_ops++;
Expand Down Expand Up @@ -271,9 +302,9 @@ static void execute_from_storage(stream_obj *s) {
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
enum OP_RESULT result = execute_stream_op(&exec_ctx, curr);
enum e_op_result result = execute_stream_op(&exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string[result]);
op_result_string(result));
/* if this op is done, then remove it and free memory */
if (curr->done) {
struct op_and_state *next = curr->next;
Expand Down Expand Up @@ -372,8 +403,7 @@ static void on_response_headers_received(
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
unsigned int i = 0;
for (i = 0; i < headers->count; i++) {
for (size_t i = 0; i < headers->count; i++) {
grpc_chttp2_incoming_metadata_buffer_add(
&s->state.rs.initial_metadata,
grpc_mdelem_from_metadata_strings(
Expand Down Expand Up @@ -439,8 +469,7 @@ static void on_response_trailers_received(
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
unsigned int i = 0;
for (i = 0; i < trailers->count; i++) {
for (size_t i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
grpc_chttp2_incoming_metadata_buffer_add(
Expand All @@ -460,10 +489,10 @@ static void on_response_trailers_received(
*/
static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
char **pp_write_buffer,
int *p_write_buffer_size) {
size_t *p_write_buffer_size) {
gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
size_t length = GPR_SLICE_LENGTH(slice);
*p_write_buffer_size = (int)length + GRPC_HEADER_SIZE_IN_BYTES;
*p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
/* This is freed in the on_write_completed callback */
char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
*pp_write_buffer = write_buffer;
Expand Down Expand Up @@ -500,7 +529,8 @@ static void convert_metadata_to_cronet_headers(

/* Walk the linked list again, this time copying the header fields.
s->num_headers can be less than num_headers_available, as some headers
are not used for cronet
are not used for cronet.
TODO (makdharma): Eliminate need to traverse the LL second time for perf.
*/
curr = head;
int num_headers = 0;
Expand All @@ -509,12 +539,12 @@ static void convert_metadata_to_cronet_headers(
curr = curr->next;
const char *key = grpc_mdstr_as_c_string(mdelem->key);
const char *value = grpc_mdstr_as_c_string(mdelem->value);
if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
strcmp(key, ":authority") == 0) {
if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
mdelem->key == GRPC_MDSTR_AUTHORITY) {
/* Cronet populates these fields on its own */
continue;
}
if (strcmp(key, ":path") == 0) {
if (mdelem->key == GRPC_MDSTR_PATH) {
/* Create URL by appending :path value to the hostname */
gpr_asprintf(pp_url, "https://%s%s", host, value);
continue;
Expand Down Expand Up @@ -546,13 +576,14 @@ static int parse_grpc_header(const uint8_t *data) {
*/
static bool op_can_be_run(grpc_transport_stream_op *curr_op,
struct op_state *stream_state,
struct op_state *op_state, enum OP_ID op_id) {
struct op_state *op_state, enum e_op_id op_id) {
bool result = true;
/* When call is canceled, every op can be run, except under following
conditions
*/
if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED]) {
bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
if (is_canceled_of_failed) {
if (op_id == OP_SEND_INITIAL_METADATA) result = false;
if (op_id == OP_SEND_MESSAGE) result = false;
if (op_id == OP_SEND_TRAILING_METADATA) result = false;
Expand Down Expand Up @@ -678,17 +709,20 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
!stream_state->state_callback_received[OP_SEND_MESSAGE])
result = false;
}
CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string[op_id],
CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
result ? "YES" : "NO");
return result;
}

static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
/*
TODO (makdharma): Break down this function in smaller chunks for readability.
*/
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
grpc_transport_stream_op *stream_op = &oas->op;
struct stream_obj *s = oas->s;
struct op_state *stream_state = &s->state;
enum OP_RESULT result = NO_ACTION_POSSIBLE;
enum e_op_result result = NO_ACTION_POSSIBLE;
if (stream_op->send_initial_metadata &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_INITIAL_METADATA)) {
Expand Down Expand Up @@ -743,19 +777,21 @@ static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
stream_op->send_message->length, NULL);
/* Check that compression flag is OFF. We don't support compression yet. */
gpr_log(GPR_ERROR, "Compression is not supported");
GPR_ASSERT(stream_op->send_message->flags == 0);
gpr_slice_buffer_add(&write_slice_buffer, slice);
gpr_log(GPR_ERROR, "Empty request is not supported");
GPR_ASSERT(write_slice_buffer.count ==
1); /* Empty request not handled yet */
if (write_slice_buffer.count > 0) {
int write_buffer_size;
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size);
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
s->cbs, stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
write_buffer_size, false);
(int)write_buffer_size, false);
result = ACTION_TAKEN_WITH_CALLBACK;
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
Expand Down

0 comments on commit f8f8f5a

Please sign in to comment.