Skip to content

Commit

Permalink
Fix client_crash_test, implement idempotency, fail_fast for C++
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Apr 1, 2016
1 parent 2fe814d commit 399b3c4
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 37 deletions.
36 changes: 24 additions & 12 deletions include/grpc++/impl/codegen/async_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
Expand Down Expand Up @@ -173,7 +174,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
finish_ops_.RecvMessage(response);

init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}

Expand Down Expand Up @@ -240,7 +242,8 @@ class ClientAsyncReaderWriter GRPC_FINAL
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}

Expand Down Expand Up @@ -305,7 +308,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
Expand All @@ -319,7 +323,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const W& msg, const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
Expand All @@ -336,7 +341,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
Expand Down Expand Up @@ -366,15 +372,17 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}

void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
Expand All @@ -385,7 +393,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
Expand Down Expand Up @@ -415,7 +424,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
Expand All @@ -429,7 +439,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
Expand All @@ -440,7 +451,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
Expand Down
9 changes: 6 additions & 3 deletions include/grpc++/impl/codegen/async_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class ClientAsyncResponseReader GRPC_FINAL
call_(channel->CreateCall(method, context, cq)),
collection_(new CallOpSetCollection) {
collection_->init_buf_.SetCollection(collection_);
collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
collection_->init_buf_.SendInitialMetadata(
context->send_initial_metadata_, context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
collection_->init_buf_.ClientSendClose();
Expand Down Expand Up @@ -122,15 +123,17 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}

void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
Expand Down
7 changes: 5 additions & 2 deletions include/grpc++/impl/codegen/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ class CallOpSendInitialMetadata {
CallOpSendInitialMetadata() : send_(false) {}

void SendInitialMetadata(
const std::multimap<grpc::string, grpc::string>& metadata) {
const std::multimap<grpc::string, grpc::string>& metadata,
uint32_t flags) {
send_ = true;
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
}
Expand All @@ -192,7 +194,7 @@ class CallOpSendInitialMetadata {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->flags = flags_;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
Expand All @@ -204,6 +206,7 @@ class CallOpSendInitialMetadata {
}

bool send_;
uint32_t flags_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
};
Expand Down
13 changes: 13 additions & 0 deletions include/grpc++/impl/codegen/client_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ class ClientContext {
deadline_ = deadline_tp.raw_time();
}

/// EXPERIMENTAL: Set this request to be idempotent
void set_idempotent(bool idempotent) { idempotent_ = idempotent; }

/// EXPERIMENTAL: Trigger fail-fast or not on this request
void set_fail_fast(bool fail_fast) { fail_fast_ = fail_fast; }

#ifndef GRPC_CXX0X_NO_CHRONO
/// Return the deadline for the client call.
std::chrono::system_clock::time_point deadline() {
Expand Down Expand Up @@ -328,9 +334,16 @@ class ClientContext {
grpc_call* call() { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);

uint32_t initial_metadata_flags() const {
return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) |
(fail_fast_ ? 0 : GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY);
}

grpc::string authority() { return authority_; }

bool initial_metadata_received_;
bool fail_fast_;
bool idempotent_;
std::shared_ptr<Channel> channel_;
grpc::mutex mu_;
grpc_call* call_;
Expand Down
3 changes: 2 additions & 1 deletion include/grpc++/impl/codegen/client_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
if (!status.ok()) {
return status;
}
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();
Expand Down
29 changes: 18 additions & 11 deletions include/grpc++/impl/codegen/method_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ namespace grpc {
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
RpcMethodHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
const RequestType*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}

void RunHandler(const HandlerParameter& param) GRPC_FINAL {
Expand All @@ -63,7 +63,8 @@ class RpcMethodHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
Expand All @@ -87,7 +88,8 @@ class ClientStreamingHandler : public MethodHandler {
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReader<RequestType>*, ResponseType*)> func,
ServerReader<RequestType>*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}

Expand All @@ -100,7 +102,8 @@ class ClientStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
Expand All @@ -122,7 +125,8 @@ class ServerStreamingHandler : public MethodHandler {
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func,
ServerWriter<ResponseType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}

Expand All @@ -138,7 +142,8 @@ class ServerStreamingHandler : public MethodHandler {

CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
Expand Down Expand Up @@ -170,7 +175,8 @@ class BidiStreamingHandler : public MethodHandler {

CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
Expand All @@ -191,7 +197,8 @@ class UnknownMethodHandler : public MethodHandler {
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);
Expand Down
2 changes: 2 additions & 0 deletions include/grpc++/impl/codegen/server_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class ServerContext {

void set_call(grpc_call* call);

uint32_t initial_metadata_flags() const { return 0; }

CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;
Expand Down
24 changes: 16 additions & 8 deletions include/grpc++/impl/codegen/sync_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
Expand Down Expand Up @@ -190,7 +191,8 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);

CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
Expand Down Expand Up @@ -268,7 +270,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
Expand Down Expand Up @@ -334,7 +337,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
Expand All @@ -361,7 +365,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
Expand All @@ -374,7 +379,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
Expand All @@ -397,7 +403,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
Expand All @@ -417,7 +424,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
Expand Down
Loading

0 comments on commit 399b3c4

Please sign in to comment.