Skip to content

Commit

Permalink
Merge pull request grpc#10440 from danzh2010/udpserver
Browse files Browse the repository at this point in the history
Modify  udp_server to postpone shutdown_fd() during shutdown.
  • Loading branch information
yang-g authored Apr 14, 2017
2 parents 940b2bd + 2d38b05 commit 52ff44f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
38 changes: 29 additions & 9 deletions src/core/lib/iomgr/udp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ struct grpc_udp_listener {
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
// To be called when corresponding QuicGrpcServer closes all active
// connections.
grpc_closure orphan_fd_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
// True if orphan_cb is trigered.
bool orphan_notified;

struct grpc_udp_listener *next;
};
Expand Down Expand Up @@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
return s;
}

static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) {
grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error));
}

static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
// No-op.
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
Expand Down Expand Up @@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {

grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);

/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);

if (!sp->orphan_notified) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
Expand All @@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Server destroyed"));
grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd,
grpc_schedule_on_exec_ctx);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
sp->orphan_notified = true;
}
gpr_mu_unlock(&s->mu);
} else {
Expand Down Expand Up @@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
sp->orphan_notified = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
Expand Down
4 changes: 3 additions & 1 deletion src/core/lib/iomgr/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,

/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx,
grpc_fd *emfd, void *user_data);
grpc_fd *emfd,
grpc_closure *shutdown_fd_callback,
void *user_data);

/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args);
Expand Down
14 changes: 7 additions & 7 deletions test/core/iomgr/udp_server_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) {
}

static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
void *user_data) {
grpc_closure *closure, void *user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
g_number_of_orphan_calls++;
Expand Down Expand Up @@ -228,9 +228,9 @@ static void test_no_op_with_port_and_start(void) {
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);

/* The server had a single FD, which is orphaned once in *
* deactivated_all_ports, and once in grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 2);
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
}

static void test_receive(int number_of_clients) {
Expand Down Expand Up @@ -297,9 +297,9 @@ static void test_receive(int number_of_clients) {
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);

/* The server had a single FD, which is orphaned once in *
* deactivated_all_ports, and once in grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 2);
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);

/* The write callback should have fired a few times. */
GPR_ASSERT(g_number_of_writes > 0);
Expand Down

0 comments on commit 52ff44f

Please sign in to comment.