Skip to content

Commit

Permalink
Merge pull request grpc#1396 from ctiller/one-read
Browse files Browse the repository at this point in the history
Deliver data up from tcp immediately
  • Loading branch information
yang-g committed May 4, 2015
2 parents e901ff5 + 534874b commit 539af31
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 69 deletions.
156 changes: 89 additions & 67 deletions src/core/iomgr/tcp_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
int fd;
int iov_size; /* Number of slices to allocate per read attempt */
int finished_edge;
size_t slice_size;
gpr_refcount refcount;

Expand Down Expand Up @@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,

#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
static void grpc_tcp_continue_read(grpc_tcp *tcp) {
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
Expand All @@ -327,96 +327,116 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
gpr_slice *final_slices;
size_t final_nslices;

GPR_ASSERT(!tcp->finished_edge);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);

if (!success) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
return;
allocated_bytes = slice_state_append_blocks_into_iovec(
&read_state, iov, tcp->iov_size, tcp->slice_size);

msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = tcp->iov_size;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;

GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_MARK(RECVMSG_END, 0);

if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
* quick EAGAIN and saving a bunch of allocations. */
slice_state_remove_last(&read_state, read_bytes < 0
? allocated_bytes
: allocated_bytes - read_bytes);
}

/* TODO(klempner): Limit the amount we read at once. */
for (;;) {
allocated_bytes = slice_state_append_blocks_into_iovec(
&read_state, iov, iov_size, tcp->slice_size);

msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;

GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);

if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
* quick EAGAIN and saving a bunch of allocations. */
slice_state_remove_last(&read_state, read_bytes < 0
? allocated_bytes
: allocated_bytes - read_bytes);
}

if (read_bytes < 0) {
/* NB: After calling the user_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (slice_state_has_available(&read_state)) {
/* TODO(klempner): We should probably do the call into the application
without all this junk on the stack */
/* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
} else {
/* TODO(klempner): Log interesting errors */
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
if (read_bytes < 0) {
/* NB: After calling the user_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
tcp->iov_size /= 2;
}
return;
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) {
/* there were bytes already read: pass them up to the application */
/* TODO(klempner): We should probably do the call into the application
without all this junk on the stack */
/* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
tcp->finished_edge = 1;
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
/* We've consumed the edge, request a new one */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
} else {
/* TODO(klempner): Log interesting errors */
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
return;
} else if (iov_size < MAX_READ_IOVEC) {
++iov_size;
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) {
/* there were bytes already read: pass them up to the application */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
} else {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
}
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
GPR_ASSERT(slice_state_has_available(&read_state));
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
}

GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}

/* Read callback for a tcp endpoint. `arg` is the grpc_tcp being read from.
 *
 * Must not run while finished_edge is set (asserted below).
 * On success the actual read is delegated to grpc_tcp_continue_read().
 * On failure the pending read callback is told the endpoint shut down and
 * one reference on the endpoint is dropped. */
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
  grpc_tcp *endpoint = (grpc_tcp *)arg;

  GPR_ASSERT(!endpoint->finished_edge);

  if (success) {
    grpc_tcp_continue_read(endpoint);
    return;
  }

  /* Not successful: report shutdown with no slices, then release the ref. */
  call_read_cb(endpoint, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
  grpc_tcp_unref(endpoint);
}

/* Register `cb` (with `user_data`) as the endpoint's read callback.
 *
 * ep        - endpoint; cast to grpc_tcp
 * cb        - callback stored in tcp->read_cb
 * user_data - stored in tcp->read_user_data
 *
 * Asserts that no read callback is already registered and takes a
 * reference on the endpoint before arranging for the read to happen. */
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
/* Only one outstanding read callback is allowed at a time. */
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_user_data = user_data;
gpr_ref(&tcp->refcount);
/* NOTE(review): this unconditional call and the finished_edge branch below
 * both arm the fd with read_closure. This text looks like a rendered diff, so
 * this line may be the removed predecessor of the if/else - confirm against
 * the real file before relying on it. */
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
if (tcp->finished_edge) {
/* Flag set: clear it and ask the fd for a new read notification. */
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
/* Flag clear: schedule grpc_tcp_handle_read through the iomgr callback
 * queue instead of waiting on the fd. */
grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
}
}

#define MAX_WRITE_IOVEC 16
Expand Down Expand Up @@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_user_data = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
Expand Down
7 changes: 5 additions & 2 deletions src/core/surface/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h"
Expand Down Expand Up @@ -405,14 +406,14 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }

static int need_more_data(grpc_call *call) {
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
call->read_state != READ_STATE_STREAM_CLOSED);
call->read_state < READ_STATE_GOT_INITIAL_METADATA);
}

static void unlock(grpc_call *call) {
Expand Down Expand Up @@ -685,6 +686,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
size_t i;
GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0);
lock(call);
call->receiving = 0;
if (success) {
Expand Down Expand Up @@ -729,6 +731,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);

GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0);
}

static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
Expand Down
1 change: 1 addition & 0 deletions src/core/surface/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void grpc_init(void) {
grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("batch", &grpc_trace_batch);
grpc_security_pre_init();
grpc_iomgr_init();
Expand Down
29 changes: 29 additions & 0 deletions src/core/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <stdio.h>
#include <string.h>

#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
Expand Down Expand Up @@ -64,6 +65,7 @@
#define CLIENT_CONNECT_STRLEN 24

int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;

typedef struct transport transport;
typedef struct stream stream;
Expand All @@ -74,6 +76,12 @@ typedef struct stream stream;
else \
stmt

/* Trace a flow-control window change when grpc_flowctl_trace is non-zero.
 *
 * t     - transport, forwarded to flowctl_trace()
 * obj   - pointer to the object owning the window; (obj)->dir_window is read
 * dir   - bare token naming the window; it is stringified for the log and
 *         pasted with `_window` to form the field name
 * id    - id forwarded to flowctl_trace()
 * delta - change forwarded to flowctl_trace()
 *
 * Expands to an if/else statement so that nothing is evaluated when tracing
 * is off; use it as a statement followed by `;`.
 * Every argument is parenthesized so that expression arguments (casts, unary
 * minus, pointer arithmetic) keep their intended precedence. */
#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
  if (!grpc_flowctl_trace)                    \
    ;                                         \
  else                                        \
    flowctl_trace((t), #dir, (obj)->dir##_window, (id), (delta))

/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
typedef enum {
Expand Down Expand Up @@ -382,6 +390,12 @@ static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s);

/* Log one flow-control window adjustment at debug level.
 *
 * Called through the FLOWCTL_TRACE macro, which only invokes it when
 * grpc_flowctl_trace is non-zero.
 *
 * t      - transport the trace belongs to; only its address is printed
 * flow   - name of the window being adjusted (FLOWCTL_TRACE passes the
 *          stringified direction token)
 * window - window value before the adjustment
 * id     - id printed in the log line; FLOWCTL_TRACE call sites pass 0 when
 *          tracing the transport's own window and s->id for a stream
 * delta  - signed change being applied to the window
 *
 * Output format: "HTTP:FLOW:<t>:<id>:<flow>: <window> + <delta> = <sum>". */
static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
                          gpr_uint32 id, gpr_int32 delta) {
  /* %p requires a void * argument and id is unsigned, so cast the pointer
     and print the id with %u instead of %d. */
  gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%u:%s: %d + %d = %d", (void *)t, id, flow,
          window, delta, window + delta);
}

/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
Expand Down Expand Up @@ -772,6 +786,8 @@ static void unlock(transport *t) {
grpc_stream_op_buffer nuke_now;
const grpc_transport_callbacks *cb = t->cb;

GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0);

grpc_sopb_init(&nuke_now);
if (t->nuke_later_sopb.nops) {
grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
Expand Down Expand Up @@ -820,6 +836,8 @@ static void unlock(transport *t) {
/* finally unlock */
gpr_mu_unlock(&t->mu);

GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0);

/* perform some callbacks if necessary */
for (i = 0; i < num_goaways; i++) {
cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
Expand Down Expand Up @@ -850,6 +868,8 @@ static void unlock(transport *t) {
grpc_sopb_destroy(&nuke_now);

gpr_free(goaways);

GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0);
}

/*
Expand Down Expand Up @@ -896,6 +916,8 @@ static int prepare_write(transport *t) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;

Expand Down Expand Up @@ -924,6 +946,7 @@ static int prepare_write(transport *t) {
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
}
Expand All @@ -933,6 +956,7 @@ static int prepare_write(transport *t) {
window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}

Expand Down Expand Up @@ -1259,6 +1283,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
return GRPC_CHTTP2_CONNECTION_ERROR;
}

FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size;

Expand Down Expand Up @@ -1608,6 +1634,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
s->outgoing_window += st.initial_window_update;
if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) {
Expand All @@ -1626,6 +1653,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
Expand All @@ -1640,6 +1668,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
drop_connection(t);
} else {
FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
t->outgoing_window += st.window_update;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/core/transport/chttp2_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "src/core/transport/transport.h"

extern int grpc_http_trace;
extern int grpc_flowctl_trace;

void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
Expand Down

0 comments on commit 539af31

Please sign in to comment.