Skip to content

Commit

Permalink
Merge pull request grpc#1661 from ctiller/really-really-finish-the-st…
Browse files Browse the repository at this point in the history
…ream

Send RST_STREAM when closing from the server without getting a client…
  • Loading branch information
yang-g committed May 20, 2015
2 parents edc54b5 + d17403c commit a42c1fe
Show file tree
Hide file tree
Showing 21 changed files with 915 additions and 128 deletions.
250 changes: 248 additions & 2 deletions Makefile

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/core/transport/chttp2/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ typedef struct {
gpr_uint8 send_ping_ack;
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
gpr_uint8 rst_stream;

gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
gpr_slice goaway_text;
gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;

#define GRPC_CHTTP2_FRAME_DATA 0
Expand Down
40 changes: 40 additions & 0 deletions src/core/transport/chttp2/frame_rst_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
*/

#include "src/core/transport/chttp2/frame_rst_stream.h"

#include <grpc/support/log.h>

#include "src/core/transport/chttp2/frame.h"

gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
Expand All @@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {

return slice;
}

grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
if (length != 4) {
gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
parser->byte = 0;
return GRPC_CHTTP2_PARSE_OK;
}

grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_rst_stream_parser *p = parser;

while (p->byte != 4 && cur != end) {
p->reason_bytes[p->byte] = *cur;
cur++;
p->byte++;
}

if (p->byte == 4) {
GPR_ASSERT(is_last);
state->rst_stream = 1;
state->rst_stream_reason =
(((gpr_uint32)p->reason_bytes[0]) << 24) |
(((gpr_uint32)p->reason_bytes[1]) << 16) |
(((gpr_uint32)p->reason_bytes[2]) << 8) |
(((gpr_uint32)p->reason_bytes[3]));
}

return GRPC_CHTTP2_PARSE_OK;
}
11 changes: 11 additions & 0 deletions src/core/transport/chttp2/frame_rst_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,18 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H

#include <grpc/support/slice.h>
#include "src/core/transport/chttp2/frame.h"

typedef struct {
gpr_uint8 byte;
gpr_uint8 reason_bytes[4];
} grpc_chttp2_rst_stream_parser;

gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);

grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);

#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
116 changes: 75 additions & 41 deletions src/core/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,13 @@ typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
} WRITE_STATE;
} write_state;

typedef enum {
DONT_SEND_CLOSED = 0,
SEND_CLOSED,
SEND_CLOSED_WITH_RST_STREAM
} send_closed;

typedef struct {
stream *head;
Expand Down Expand Up @@ -267,6 +273,7 @@ struct transport {
grpc_chttp2_window_update_parser window_update;
grpc_chttp2_settings_parser settings;
grpc_chttp2_ping_parser ping;
grpc_chttp2_rst_stream_parser rst_stream;
} simple_parsers;

/* goaway */
Expand Down Expand Up @@ -312,8 +319,8 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
WRITE_STATE write_state;
gpr_uint8 send_closed;
write_state write_state;
send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;

Expand Down Expand Up @@ -937,7 +944,11 @@ static int prepare_write(transport *t) {

if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
s->send_closed = 1;
if (!t->is_client && !s->read_closed) {
s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
s->send_closed = SEND_CLOSED;
}
}
if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
Expand Down Expand Up @@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) {

while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
if (s->send_closed) {
if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR));
}
if (s->send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
Expand All @@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->write_state = WRITE_STATE_SENT_CLOSE;
if (1||!s->cancelled) {
maybe_finish_read(t, s);
if (!t->is_client) {
s->read_closed = 1;
}
maybe_finish_read(t, s);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
Expand Down Expand Up @@ -1214,12 +1229,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
if (s->outgoing_sopb) {
schedule_nuke_sopb(t, s->outgoing_sopb);
s->outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
schedule_cb(t, s->send_done_closure, 0);
if (error_code != GRPC_CHTTP2_NO_ERROR) {
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
if (s->outgoing_sopb) {
schedule_nuke_sopb(t, s->outgoing_sopb);
s->outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
schedule_cb(t, s->send_done_closure, 0);
}
}
if (s->cancelled) {
send_rst = 0;
Expand All @@ -1228,31 +1245,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);

gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
if (!optional_message) {
switch (local_status) {
case GRPC_STATUS_CANCELLED:
add_incoming_metadata(
t, s, grpc_mdelem_from_strings(t->metadata_context,
"grpc-message", "Cancelled"));
break;
default:
break;
}
} else {
if (error_code != GRPC_CHTTP2_NO_ERROR) {
/* synthesize a status if we don't believe we'll get one */
gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
grpc_mdelem_from_metadata_strings(
t->metadata_context,
grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
grpc_mdstr_ref(optional_message)));
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
if (!optional_message) {
switch (local_status) {
case GRPC_STATUS_CANCELLED:
add_incoming_metadata(
t, s, grpc_mdelem_from_strings(t->metadata_context,
"grpc-message", "Cancelled"));
break;
default:
break;
}
} else {
add_incoming_metadata(
t, s,
grpc_mdelem_from_metadata_strings(
t->metadata_context,
grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
grpc_mdstr_ref(optional_message)));
}
add_metadata_batch(t, s);
}
add_metadata_batch(t, s);
maybe_finish_read(t, s);
}
maybe_finish_read(t, s);
}
if (!id) send_rst = 0;
if (send_rst) {
Expand Down Expand Up @@ -1527,6 +1547,19 @@ static int init_ping_parser(transport *t) {
return ok;
}

static int init_rst_stream_parser(transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream,
t->incoming_frame_size,
t->incoming_frame_flags);
if (!ok) {
drop_connection(t);
}
t->parser = grpc_chttp2_rst_stream_parser_parse;
t->parser_data = &t->simple_parsers.rst_stream;
return ok;
}

static int init_goaway_parser(transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
Expand Down Expand Up @@ -1581,12 +1614,7 @@ static int init_frame_parser(transport *t) {
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
/* TODO(ctiller): actually parse the reason */
cancel_stream_id(
t, t->incoming_stream_id,
grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
GRPC_CHTTP2_CANCEL, 0);
return init_skip_frame(t, 0);
return init_rst_stream_parser(t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
Expand Down Expand Up @@ -1650,6 +1678,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
}
if (st.rst_stream) {
cancel_stream_id(
t, t->incoming_stream_id,
grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
st.rst_stream_reason, 0);
}
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==
Expand Down
28 changes: 16 additions & 12 deletions src/node/test/end_to_end_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,20 +286,24 @@ describe('end-to-end', function() {
assert.ifError(err);
assert(response['send metadata']);
assert.strictEqual(response.read.toString(), requests[0]);
var end_batch = {};
end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
'metadata': {},
'code': grpc.status.OK,
'details': status_text
};
end_batch[grpc.opType.RECV_MESSAGE] = true;
server_call.startBatch(end_batch, function(err, response) {
var snd_batch = {};
snd_batch[grpc.opType.RECV_MESSAGE] = true;
server_call.startBatch(snd_batch, function(err, response) {
assert.ifError(err);
assert(response['send status']);
assert(!response.cancelled);
assert.strictEqual(response.read.toString(), requests[1]);
done();
var end_batch = {};
end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
'metadata': {},
'code': grpc.status.OK,
'details': status_text
};
server_call.startBatch(end_batch, function(err, response) {
assert.ifError(err);
assert(response['send status']);
assert(!response.cancelled);
done();
});
});
});
});
Expand Down
14 changes: 9 additions & 5 deletions test/core/end2end/cq_verifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,15 @@ static gpr_slice merge_slices(gpr_slice *slices, size_t nslices) {
}

static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) {
gpr_slice a =
merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
int ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));
gpr_slice a;
int ok;

if (!bb) return 0;

a = merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));
gpr_slice_unref(a);
gpr_slice_unref(b);
return ok;
Expand Down
1 change: 1 addition & 0 deletions test/core/end2end/gen_build_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True),
'request_with_large_metadata': default_test_options,
'request_with_payload': default_test_options,
'server_finishes_request': default_test_options,
'simple_delayed_request': default_test_options,
'simple_request': default_test_options,
'simple_request_with_high_initial_sequence_number': default_test_options,
Expand Down
23 changes: 15 additions & 8 deletions test/core/end2end/tests/invoke_large_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,30 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));

cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);

op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
op = ops;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));

cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);

cq_expect_completion(v_client, tag(1), 1);
Expand Down
3 changes: 1 addition & 2 deletions test/core/end2end/tests/max_message_length.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ static void test_max_message_length(grpc_end2end_test_config config) {
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);

GPR_ASSERT(status == GRPC_STATUS_CANCELLED);
GPR_ASSERT(0 == strcmp(details, "Cancelled"));
GPR_ASSERT(status != GRPC_STATUS_OK);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
GPR_ASSERT(was_cancelled == 1);
Expand Down
Loading

0 comments on commit a42c1fe

Please sign in to comment.