Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make grpc_tcp_listener private. #4680

Merged
merged 14 commits into from
Jan 25, 2016
Prev Previous commit
Next Next commit
Add shutdown_starting callbacks to tcp_server.
tcp_server_posix_test illustrates how this can be used to implement a
weak referencing mechanism.
  • Loading branch information
daniel-j-born committed Jan 15, 2016
commit 9c12bc252e7bb8cdfe051884862e17d4850f3ab0
7 changes: 7 additions & 0 deletions src/core/iomgr/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H
#define GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H

#include "src/core/iomgr/closure.h"
#include "src/core/iomgr/endpoint.h"

/* Forward decl of grpc_tcp_server */
Expand Down Expand Up @@ -90,6 +91,12 @@ int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
/* Ref s and return s. */
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);

/* shutdown_starting is called when ref count has reached zero and the server is
about to be destroyed. The server will be deleted after it returns. Calling
grpc_tcp_server_ref() from it has no effect. */
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting);

/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
Expand Down
20 changes: 19 additions & 1 deletion src/core/iomgr/tcp_server_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ struct grpc_tcp_server {
grpc_tcp_listener *tail;
unsigned nports;

/* List of closures passed to shutdown_starting_add(). */
grpc_closure_list shutdown_starting;

/* shutdown callback */
grpc_closure *shutdown_complete;

Expand All @@ -144,6 +147,8 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
Expand Down Expand Up @@ -585,13 +590,26 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
return s;
}

void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
gpr_mu_unlock(&s->mu);
}

void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_exec_ctx_finish(&local_exec_ctx);
tcp_server_destroy(exec_ctx, s);
}
}
Expand Down
20 changes: 19 additions & 1 deletion src/core/iomgr/tcp_server_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ struct grpc_tcp_server {
grpc_tcp_listener *head;
grpc_tcp_listener *tail;

/* List of closures passed to shutdown_starting_add(). */
grpc_closure_list shutdown_starting;

/* shutdown callback */
grpc_closure *shutdown_complete;
};
Expand All @@ -108,6 +111,8 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->on_accept_cb_arg = NULL;
s->head = NULL;
s->tail = NULL;
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
return s;
}
Expand Down Expand Up @@ -135,6 +140,13 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
return s;
}

void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
gpr_mu_unlock(&s->mu);
}

static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
int immediately_done = 0;
grpc_tcp_listener *sp;
Expand All @@ -158,11 +170,17 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {

void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_exec_ctx_finish(&local_exec_ctx);
tcp_server_destroy(exec_ctx, s);
}
}
Expand Down
52 changes: 48 additions & 4 deletions test/core/iomgr/tcp_server_posix_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,54 @@ typedef struct on_connect_result {
int server_fd;
} on_connect_result;

void on_connect_result_init(on_connect_result *result) {
typedef struct server_weak_ref {
grpc_tcp_server *server;

/* arg is this server_weak_ref. */
grpc_closure server_shutdown;
} server_weak_ref;

static on_connect_result g_result = {NULL, 0, 0, -1};

static void on_connect_result_init(on_connect_result *result) {
result->server = NULL;
result->port_index = 0;
result->fd_index = 0;
result->server_fd = -1;
}

void on_connect_result_set(on_connect_result *result,
const grpc_tcp_server_acceptor *acceptor) {
static void on_connect_result_set(on_connect_result *result,
const grpc_tcp_server_acceptor *acceptor) {
result->server = grpc_tcp_server_ref(acceptor->from_server);
result->port_index = acceptor->port_index;
result->fd_index = acceptor->fd_index;
result->server_fd = grpc_tcp_server_port_fd(
result->server, acceptor->port_index, acceptor->fd_index);
}

static on_connect_result g_result = {NULL, 0, 0, -1};

static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg,
int success) {
server_weak_ref *weak_ref = arg;
weak_ref->server = NULL;
}

static void server_weak_ref_init(server_weak_ref *weak_ref) {
weak_ref->server = NULL;
grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown,
weak_ref);
}

/* Make weak_ref->server_shutdown a shutdown_starting cb on server.
grpc_tcp_server promises that the server object will live until
weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
should be held until server_weak_ref_set() returns to avoid a race where the
server is deleted before the shutdown_starting cb is added. */
static void server_weak_ref_set(server_weak_ref *weak_ref,
grpc_tcp_server *server) {
grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
weak_ref->server = server;
}

static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server_acceptor *acceptor) {
Expand Down Expand Up @@ -182,6 +213,8 @@ static void test_connect(unsigned n) {
grpc_tcp_server *s = grpc_tcp_server_create(NULL);
grpc_pollset *pollsets[1];
unsigned i;
server_weak_ref weak_ref;
server_weak_ref_init(&weak_ref);
LOG_TEST("test_connect");
gpr_log(GPR_INFO, "clients=%d", n);
memset(&addr, 0, sizeof(addr));
Expand Down Expand Up @@ -242,6 +275,9 @@ static void test_connect(unsigned n) {
GPR_ASSERT(result.port_index == 0);
GPR_ASSERT(result.fd_index < svr_fd_count);
GPR_ASSERT(result.server == s);
if (weak_ref.server == NULL) {
server_weak_ref_set(&weak_ref, result.server);
}
grpc_tcp_server_unref(&exec_ctx, result.server);

on_connect_result_init(&result);
Expand All @@ -256,7 +292,15 @@ static void test_connect(unsigned n) {
grpc_tcp_server_unref(&exec_ctx, result.server);
}

/* Weak ref to server valid until final unref. */
GPR_ASSERT(weak_ref.server != NULL);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0);

grpc_tcp_server_unref(&exec_ctx, s);

/* Weak ref lost. */
GPR_ASSERT(weak_ref.server == NULL);

grpc_exec_ctx_finish(&exec_ctx);
}

Expand Down