Skip to content

Commit

Permalink
Merge github.com:grpc/grpc into propagate
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Aug 5, 2015
2 parents 9974078 + 183c9f7 commit a600d58
Show file tree
Hide file tree
Showing 19 changed files with 124 additions and 20 deletions.
10 changes: 10 additions & 0 deletions include/grpc++/server_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ class ServerContext {

const struct census_context* census_context() const;

// Async only. Has to be called before the rpc starts.
// Returns the tag in completion queue when the rpc finishes.
// IsCancelled() can then be called to check whether the rpc was cancelled.
void AsyncNotifyWhenDone(void* tag) {
has_notify_when_done_tag_ = true;
async_notify_when_done_tag_ = tag;
}

private:
friend class ::grpc::testing::InteropContextInspector;
friend class ::grpc::Server;
Expand Down Expand Up @@ -165,6 +173,8 @@ class ServerContext {
void set_call(grpc_call* call);

CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;

gpr_timespec deadline_;
grpc_call* call_;
Expand Down
2 changes: 1 addition & 1 deletion include/grpc/grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
REQUIRES: server not started */
int grpc_server_add_http2_port(grpc_server *server, const char *addr);
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr);

/** Start a server - tells all listeners to start listening */
void grpc_server_start(grpc_server *server);
Expand Down
2 changes: 1 addition & 1 deletion src/core/surface/server_chttp2.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static void destroy(grpc_server *server, void *tcpp) {
grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server);
}

int grpc_server_add_http2_port(grpc_server *server, const char *addr) {
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
size_t i;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/server/insecure_server_credentials.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
public:
int AddPortToServer(const grpc::string& addr,
grpc_server* server) GRPC_OVERRIDE {
return grpc_server_add_http2_port(server, addr.c_str());
return grpc_server_add_insecure_http2_port(server, addr.c_str());
}
};
} // namespace
Expand Down
23 changes: 21 additions & 2 deletions src/cpp/server/server_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,23 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}

void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;

bool CheckCancelled(CompletionQueue* cq);

void set_tag(void* tag) {
has_tag_ = true;
tag_ = tag;
}

void Unref();

private:
bool has_tag_;
void* tag_;
grpc::mutex mu_;
int refs_;
bool finalized_;
Expand Down Expand Up @@ -90,25 +97,34 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
bool ret = false;
if (has_tag_) {
*tag = tag_;
ret = true;
}
if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
}
return false;
return ret;
}

// ServerContext body

ServerContext::ServerContext()
: completion_op_(nullptr),
has_notify_when_done_tag_(false),
async_notify_when_done_tag_(nullptr),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {}

ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
: completion_op_(nullptr),
has_notify_when_done_tag_(false),
async_notify_when_done_tag_(nullptr),
deadline_(deadline),
call_(nullptr),
cq_(nullptr),
Expand All @@ -133,6 +149,9 @@ ServerContext::~ServerContext() {
void ServerContext::BeginCompletionOp(Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
}

Expand Down
2 changes: 1 addition & 1 deletion src/csharp/ext/grpc_csharp_ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ grpcsharp_server_create(grpc_completion_queue *cq,

GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
return grpc_server_add_http2_port(server, addr);
return grpc_server_add_insecure_http2_port(server, addr);
}

GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
Expand Down
4 changes: 2 additions & 2 deletions src/node/ext/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ NAN_METHOD(Server::AddHttp2Port) {
grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials();
int port;
if (creds == NULL) {
port = grpc_server_add_http2_port(server->wrapped_server,
*NanUtf8String(args[0]));
port = grpc_server_add_insecure_http2_port(server->wrapped_server,
*NanUtf8String(args[0]));
} else {
port = grpc_server_add_secure_http2_port(server->wrapped_server,
*NanUtf8String(args[0]),
Expand Down
2 changes: 1 addition & 1 deletion src/php/ext/grpc/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ PHP_METHOD(Server, addHttp2Port) {
"add_http2_port expects a string", 1 TSRMLS_CC);
return;
}
RETURN_LONG(grpc_server_add_http2_port(server->wrapped, addr));
RETURN_LONG(grpc_server_add_insecure_http2_port(server->wrapped, addr));
}

PHP_METHOD(Server, addSecureHttp2Port) {
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio/grpc/_adapter/_c/types/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ PyObject *pygrpc_Server_add_http2_port(
port = grpc_server_add_secure_http2_port(
self->c_serv, addr, creds->c_creds);
} else {
port = grpc_server_add_http2_port(self->c_serv, addr);
port = grpc_server_add_insecure_http2_port(self->c_serv, addr);
}
return PyInt_FromLong(port);

Expand Down
3 changes: 2 additions & 1 deletion src/ruby/ext/grpc/rb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
} else if (rb_creds == Qnil) {
recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port));
recvd_port =
grpc_server_add_insecure_http2_port(s->wrapped, StringValueCStr(port));
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add port %s to server, not sure why",
Expand Down
4 changes: 2 additions & 2 deletions test/core/end2end/dualstack_socket_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq = grpc_completion_queue_create();
server = grpc_server_create(NULL);
grpc_server_register_completion_queue(server, cq);
GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) >
0);
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
server, server_hostport)) > 0);
if (port == 0) {
port = got_port;
} else {
Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/fixtures/chttp2_fullstack.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
}
f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}

Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/fixtures/chttp2_fullstack_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void chttp2_init_server_fullstack_compression(grpc_end2end_test_fixture *f,
}
f->server = grpc_server_create(ffd->server_args_compression);
grpc_server_register_completion_queue(f->server, f->cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}

Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/fixtures/chttp2_fullstack_uds_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
}
f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
}
f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}

Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/fixtures/chttp2_fullstack_with_poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
}
f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}

Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/multiple_server_queues_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int main(int argc, char **argv) {
cq2 = grpc_completion_queue_create();
server = grpc_server_create(NULL);
grpc_server_register_completion_queue(server, cq1);
grpc_server_add_http2_port(server, "[::]:0");
grpc_server_add_insecure_http2_port(server, "[::]:0");
grpc_server_register_completion_queue(server, cq2);
grpc_server_start(server);
grpc_server_shutdown_and_notify(server, cq2, NULL);
Expand Down
2 changes: 1 addition & 1 deletion test/core/fling/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ int main(int argc, char **argv) {
grpc_server_credentials_release(ssl_creds);
} else {
server = grpc_server_create(NULL);
GPR_ASSERT(grpc_server_add_http2_port(server, addr));
GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr));
}
grpc_server_register_completion_queue(server, cq);
grpc_server_start(server);
Expand Down
74 changes: 74 additions & 0 deletions test/cpp/end2end/async_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,80 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);
EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
}

// Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
ResetStub();

EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;

ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));

srv_ctx.AsyncNotifyWhenDone(tag(5));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));

Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());

cli_ctx.TryCancel();
Verifier().Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());

response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, false).Verify(cq_.get());

EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}

// Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_F(AsyncEnd2endTest, ServerCheckDone) {
ResetStub();

EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;

ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));

srv_ctx.AsyncNotifyWhenDone(tag(5));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));

Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());

send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier().Expect(3, true).Verify(cq_.get());
Verifier().Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());

response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify(cq_.get());

EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}

} // namespace
} // namespace testing
} // namespace grpc
Expand Down

0 comments on commit a600d58

Please sign in to comment.