Skip to content

Commit

Permalink
Merge pull request grpc#7640 from markdroth/handshaker_api
Browse files Browse the repository at this point in the history
Change handshaker API to support passing leftover bytes read between handshakers.
  • Loading branch information
kpayson64 authored Aug 15, 2016
2 parents 8d46873 + fddb3d3 commit 9c9d577
Show file tree
Hide file tree
Showing 22 changed files with 131 additions and 83 deletions.
19 changes: 13 additions & 6 deletions src/core/ext/transport/chttp2/client/insecure/channel_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,21 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
}

static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data,
grpc_channel_args *args,
gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
connector *c = user_data;
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
GPR_ASSERT(c->result->transport);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0);
c->result->channel_args = args;
if (error != GRPC_ERROR_NONE) {
grpc_channel_args_destroy(args);
gpr_free(read_buffer);
} else {
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
GPR_ASSERT(c->result->transport);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
read_buffer);
c->result->channel_args = args;
}
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
grpc_channel *channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(final_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);

grpc_exec_ctx_finish(&exec_ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
exec_ctx, c->args.channel_args, secure_endpoint, 1);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
c->result->channel_args =
grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1);
Expand All @@ -126,21 +125,23 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}

static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data,
grpc_channel_args *args,
gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
connector *c = user_data;
c->tmp_args = args;
if (error != GRPC_ERROR_NONE) {
gpr_free(read_buffer);
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
} else {
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
c->tmp_args = args;
grpc_channel_security_connector_do_handshake(
exec_ctx, c->security_connector, endpoint, c->args.deadline,
on_secure_handshake_done, c);
exec_ctx, c->security_connector, endpoint, read_buffer,
c->args.deadline, on_secure_handshake_done, c);
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ typedef struct server_connect_state {
} server_connect_state;

static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data,
grpc_channel_args *args,
gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
server_connect_state *state = user_data;
if (error != GRPC_ERROR_NONE) {
Expand All @@ -64,6 +65,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_error_free_string(error_str);
GRPC_ERROR_UNREF(error);
grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
gpr_free(read_buffer);
} else {
// Beware that the call to grpc_create_chttp2_transport() has to happen
// before grpc_tcp_server_destroy(). This is fine here, but similar code
Expand All @@ -75,7 +77,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_server_setup_transport(exec_ctx, state->server, transport,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer);
}
// Clean up.
grpc_channel_args_destroy(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
grpc_server_setup_transport(exec_ctx, state->state->server, transport,
state->accepting_pollset, args_copy);
grpc_channel_args_destroy(args_copy);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL);
} else {
/* We need to consume this here, because the server may already have
* gone away. */
Expand All @@ -128,17 +128,19 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
}

static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data,
grpc_channel_args *args,
gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
server_secure_connect *state = user_data;
if (error != GRPC_ERROR_NONE) {
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
GRPC_ERROR_UNREF(error);
grpc_channel_args_destroy(args);
gpr_free(read_buffer);
grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
grpc_channel_args_destroy(args);
state_unref(state->state);
gpr_free(state);
return;
Expand All @@ -150,8 +152,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
// into this function.
state->args = args;
grpc_server_security_connector_do_handshake(
exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
on_secure_handshake_done, state);
exec_ctx, state->state->sc, state->acceptor, endpoint, read_buffer,
state->deadline, on_secure_handshake_done, state);
}

static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
Expand Down
7 changes: 5 additions & 2 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -2546,9 +2546,12 @@ grpc_transport *grpc_create_chttp2_transport(

void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
gpr_slice_buffer *read_buffer) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */
gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
if (read_buffer != NULL) {
gpr_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
reading_action(exec_ctx, t, GRPC_ERROR_NONE);
}
4 changes: 3 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
grpc_endpoint *ep, int is_client);

/// Takes ownership of \a read_buffer, which (if non-NULL) contains
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
gpr_slice *slices, size_t nslices);
gpr_slice_buffer *read_buffer);

#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
24 changes: 15 additions & 9 deletions src/core/lib/channel/handshaker.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
gpr_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
deadline, acceptor, cb, user_data);
read_buffer, deadline, acceptor, cb,
user_data);
}

//
Expand Down Expand Up @@ -143,16 +145,17 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
// handshakers together.
static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
grpc_channel_args* args, void* user_data,
grpc_channel_args* args,
gpr_slice_buffer* read_buffer, void* user_data,
grpc_error* error) {
grpc_handshake_manager* mgr = user_data;
GPR_ASSERT(mgr->state != NULL);
GPR_ASSERT(mgr->state->index < mgr->count);
// If we got an error, skip all remaining handshakers and invoke the
// caller-supplied callback immediately.
if (error != GRPC_ERROR_NONE) {
mgr->state->final_cb(exec_ctx, endpoint, args, mgr->state->final_user_data,
error);
mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer,
mgr->state->final_user_data, error);
return;
}
grpc_handshaker_done_cb cb = call_next_handshaker;
Expand All @@ -163,9 +166,9 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
user_data = mgr->state->final_user_data;
}
// Invoke handshaker.
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index],
endpoint, args, mgr->state->deadline,
mgr->state->acceptor, cb, user_data);
grpc_handshaker_do_handshake(
exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args,
read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data);
++mgr->state->index;
// If this is the last handshaker, clean up state.
if (mgr->state->index == mgr->count) {
Expand All @@ -180,10 +183,12 @@ void grpc_handshake_manager_do_handshake(
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer));
gpr_slice_buffer_init(read_buffer);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done
// callback with the passed-in endpoint.
cb(exec_ctx, endpoint, args_copy, user_data, GRPC_ERROR_NONE);
cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE);
} else {
GPR_ASSERT(mgr->state == NULL);
mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
Expand All @@ -192,6 +197,7 @@ void grpc_handshake_manager_do_handshake(
mgr->state->acceptor = acceptor;
mgr->state->final_cb = cb;
mgr->state->final_user_data = user_data;
call_next_handshaker(exec_ctx, endpoint, args_copy, mgr, GRPC_ERROR_NONE);
call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr,
GRPC_ERROR_NONE);
}
}
9 changes: 7 additions & 2 deletions src/core/lib/channel/handshaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/codegen/time.h>
#include <grpc/support/slice_buffer.h>

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
Expand All @@ -56,10 +57,11 @@
typedef struct grpc_handshaker grpc_handshaker;

/// Callback type invoked when a handshaker is done.
/// Takes ownership of \a args.
/// Takes ownership of \a args and \a read_buffer.
typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
grpc_channel_args* args,
gpr_slice_buffer* read_buffer,
void* user_data, grpc_error* error);

struct grpc_handshaker_vtable {
Expand All @@ -72,10 +74,12 @@ struct grpc_handshaker_vtable {

/// Performs handshaking. When finished, calls \a cb with \a user_data.
/// Takes ownership of \a args.
/// Takes ownership of \a read_buffer, which contains leftover bytes read
/// from the endpoint by the previous handshaker.
/// \a acceptor will be NULL for client-side handshakers.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_endpoint* endpoint, grpc_channel_args* args,
gpr_timespec deadline,
gpr_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
};
Expand All @@ -101,6 +105,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
gpr_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
Expand Down
8 changes: 6 additions & 2 deletions src/core/lib/http/httpcli_security_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
gpr_slice_buffer *read_buffer,
gpr_timespec deadline,
grpc_security_handshake_done_cb cb,
void *user_data) {
Expand All @@ -69,6 +70,7 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
tsi_result result = TSI_OK;
tsi_handshaker *handshaker;
if (c->handshaker_factory == NULL) {
gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
return;
}
Expand All @@ -77,10 +79,12 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
tsi_result_to_string(result));
gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
nonsecure_endpoint, deadline, cb, user_data);
nonsecure_endpoint, read_buffer, deadline, cb,
user_data);
}
}

Expand Down Expand Up @@ -183,7 +187,7 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
grpc_channel_security_connector_do_handshake(
exec_ctx, sc, tcp, deadline, on_secure_transport_setup_done, c);
exec_ctx, sc, tcp, NULL, deadline, on_secure_transport_setup_done, c);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}

Expand Down
9 changes: 7 additions & 2 deletions src/core/lib/security/transport/handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,9 @@ static void on_timeout(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_do_security_handshake(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector, bool is_client_side,
grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
grpc_security_handshake_done_cb cb, void *user_data) {
grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
gpr_timespec deadline, grpc_security_handshake_done_cb cb,
void *user_data) {
grpc_security_connector_handshake_list *handshake_node;
grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake));
memset(h, 0, sizeof(grpc_security_handshake));
Expand All @@ -346,6 +347,10 @@ void grpc_do_security_handshake(
gpr_slice_buffer_init(&h->left_overs);
gpr_slice_buffer_init(&h->outgoing);
gpr_slice_buffer_init(&h->incoming);
if (read_buffer != NULL) {
gpr_slice_buffer_move_into(read_buffer, &h->incoming);
gpr_free(read_buffer);
}
if (!is_client_side) {
grpc_server_security_connector *server_connector =
(grpc_server_security_connector *)connector;
Expand Down
7 changes: 4 additions & 3 deletions src/core/lib/security/transport/handshake.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/security/transport/security_connector.h"

/* Calls the callback upon completion. Takes owership of handshaker. */
/* Calls the callback upon completion. Takes owership of handshaker and
* read_buffer. */
void grpc_do_security_handshake(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector, bool is_client_side,
grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
grpc_security_handshake_done_cb cb, void *user_data);
grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);

void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake);

Expand Down
Loading

0 comments on commit 9c9d577

Please sign in to comment.