Skip to content

Commit

Permalink
Merge pull request grpc#10583 from ctiller/server_start
Browse files Browse the repository at this point in the history
Threading robustness
  • Loading branch information
ctiller authored Apr 14, 2017
2 parents b81fb79 + a103f7b commit 9575627
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 19 deletions.
4 changes: 1 addition & 3 deletions include/grpc++/impl/codegen/server_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ class ServerInterface : public CallHook {
/// caller is required to keep all completion queues live until the server is
/// destroyed.
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;

virtual void ShutdownInternal(gpr_timespec deadline) = 0;

Expand Down
4 changes: 1 addition & 3 deletions include/grpc++/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
/// caller is required to keep all completion queues live until the server is
/// destroyed.
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
bool Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;

void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;

Expand Down
41 changes: 35 additions & 6 deletions src/core/lib/surface/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h"
Expand Down Expand Up @@ -211,6 +212,11 @@ struct grpc_server {
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */

/* startup synchronization: flag is protected by mu_global, signals whether
we are doing the listener start routine or not */
bool starting;
gpr_cv starting_cv;

registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
Expand Down Expand Up @@ -388,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(exec_ctx, server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_cv_destroy(&server->starting_cv);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
Expand Down Expand Up @@ -1030,6 +1037,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {

gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
gpr_cv_init(&server->starting_cv);

/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
Expand Down Expand Up @@ -1086,8 +1094,22 @@ void *grpc_server_register_method(
return m;
}

static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
grpc_error *error) {
grpc_server *server = s;
for (listener *l = server->listeners; l; l = l->next) {
l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
}

gpr_mu_lock(&server->mu_global);
server->starting = false;
gpr_cv_signal(&server->starting_cv);
gpr_mu_unlock(&server->mu_global);

server_unref(exec_ctx, server);
}

void grpc_server_start(grpc_server *server) {
listener *l;
size_t i;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

Expand Down Expand Up @@ -1121,10 +1143,11 @@ void grpc_server_start(grpc_server *server) {
(size_t)server->max_requested_calls_per_cq, server);
}

for (l = server->listeners; l; l = l->next) {
l->start(&exec_ctx, server, l->arg, server->pollsets,
server->pollset_count);
}
server_ref(server);
server->starting = true;
grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
grpc_executor_scheduler),
GRPC_ERROR_NONE);

grpc_exec_ctx_finish(&exec_ctx);
}
Expand Down Expand Up @@ -1258,8 +1281,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
(server, cq, tag));

/* lock, and gather up some stuff to do */
/* wait for startup to be finished: locks mu_global */
gpr_mu_lock(&server->mu_global);
while (server->starting) {
gpr_cv_wait(&server->starting_cv, &server->mu_global,
gpr_inf_future(GPR_CLOCK_REALTIME));
}

/* stay locked, and gather up some stuff to do */
grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
Expand Down
5 changes: 1 addition & 4 deletions src/cpp/server/server_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}

auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) {
if (added_port) server->Shutdown();
return nullptr;
}
server->Start(cqs_data, cqs_.size());

for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ int Server::AddListeningPort(const grpc::string& addr,
return port;
}

bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
global_callbacks_->PreServerStart(this);
started_ = true;
Expand Down Expand Up @@ -543,8 +543,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}

return true;
}

void Server::ShutdownInternal(gpr_timespec deadline) {
Expand Down

0 comments on commit 9575627

Please sign in to comment.