Skip to content

Commit

Permalink
Fix shutdown semantics.
Browse files Browse the repository at this point in the history
Document what they should be, ensure they're triggered, and fix what was broken.
  • Loading branch information
ctiller committed Feb 18, 2015
1 parent e1b97b6 commit aea2fc0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 16 deletions.
10 changes: 7 additions & 3 deletions include/grpc/grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,19 @@ void grpc_server_start(grpc_server *server);

/* Begin shutting down a server.
After completion, no new calls or connections will be admitted.
Existing calls will be allowed to complete. */
Existing calls will be allowed to complete.
Shutdown is idempotent. */
void grpc_server_shutdown(grpc_server *server);

/* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when
there are no more calls being serviced. */
there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);

/* Destroy a server.
Forcefully cancels all existing calls. */
Forcefully cancels all existing calls.
Implies grpc_server_shutdown() if one was not previously performed. */
void grpc_server_destroy(grpc_server *server);

#ifdef __cplusplus
Expand Down
37 changes: 25 additions & 12 deletions src/core/surface/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ struct grpc_server {
size_t requested_call_capacity;

gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
void *shutdown_tag;
size_t num_shutdown_tags;
void **shutdown_tags;

call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data;
Expand Down Expand Up @@ -206,6 +206,7 @@ static void server_unref(grpc_server *server) {
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
gpr_free(server->requested_calls);
gpr_free(server->shutdown_tags);
gpr_free(server);
}
}
Expand Down Expand Up @@ -407,15 +408,17 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
int i;
size_t i;

gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(chand->server, elem->call_data, i);
}
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < chand->server->num_shutdown_tags; i++) {
grpc_cq_end_server_shutdown(chand->server->cq,
chand->server->shutdown_tags[i]);
}
}
gpr_mu_unlock(&chand->server->mu);

Expand Down Expand Up @@ -572,6 +575,13 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,

/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1));
server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
}
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
Expand All @@ -597,12 +607,9 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server->requested_call_count = 0;

server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
if (server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
if (server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_server_shutdown(server->cq, server->shutdown_tags[i]);
}
}
gpr_mu_unlock(&server->mu);
Expand Down Expand Up @@ -653,6 +660,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
gpr_mu_lock(&server->mu);
if (!server->shutdown) {
gpr_mu_unlock(&server->mu);
grpc_server_shutdown(server);
gpr_mu_lock(&server->mu);
}

for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) {

static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
/* don't shutdown, just destroy, to tickle this code edge */
grpc_server_destroy(f->server);
f->server = NULL;
}
Expand Down

0 comments on commit aea2fc0

Please sign in to comment.