Skip to content

Commit

Permalink
Merge pull request grpc#588 from yang-g/c++api
Browse files Browse the repository at this point in the history
Async client api change. Add a ClientAsyncResponseReader.
  • Loading branch information
ctiller committed Feb 18, 2015
2 parents 1ff680a + 68bc778 commit 8a287d1
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 128 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3009,6 +3009,7 @@ LIBGRPC++_SRC = \
src/cpp/util/time.cc \

PUBLIC_HEADERS_CXX += \
include/grpc++/async_unary_call.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
include/grpc++/client_context.h \
Expand Down
1 change: 1 addition & 0 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@
"build": "all",
"language": "c++",
"public_headers": [
"include/grpc++/async_unary_call.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/client_context.h",
Expand Down
144 changes: 144 additions & 0 deletions include/grpc++/async_unary_call.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef __GRPCPP_ASYNC_UNARY_CALL_H__
#define __GRPCPP_ASYNC_UNARY_CALL_H__

#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>

namespace grpc {
template <class R>
class ClientAsyncResponseReader final {
public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const google::protobuf::Message& request, void* tag)
: context_(context),
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
}

void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);

meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}

void Finish(R* msg, Status* status, void* tag) {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(msg);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}


private:
ClientContext* context_ = nullptr;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};

template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}

void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);

meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}

void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

private:
void BindCall(Call* call) override { call_ = *call; }

Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};

} // namespace grpc

#endif // __GRPCPP_ASYNC_UNARY_CALL_H__
4 changes: 4 additions & 0 deletions include/grpc++/client_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ template <class W>
class ClientAsyncWriter;
template <class R, class W>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;

class ClientContext {
public:
Expand Down Expand Up @@ -119,6 +121,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;

grpc_call *call() { return call_; }
void set_call(grpc_call *call) {
Expand Down
7 changes: 0 additions & 7 deletions include/grpc++/impl/client_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ class CompletionQueue;
class RpcMethod;
class Status;

// Wrapper that begins an asynchronous unary call
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);

// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
Expand Down
54 changes: 0 additions & 54 deletions include/grpc++/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -550,60 +550,6 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
CallOpBuffer finish_buf_;
};

// TODO(yangg) Move out of stream.h
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}

void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);

meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}

void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

private:
void BindCall(Call* call) override { call_ = *call; }

Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};

template <class W, class R>
class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
Expand Down
22 changes: 13 additions & 9 deletions src/compiler/cpp_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
"class RpcService;\n"
"class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
"template <class OutMessage> class ClientAsyncResponseReader;\n");
temp.append(
"template <class OutMessage> class ServerAsyncResponseWriter;\n");
}
Expand Down Expand Up @@ -160,7 +162,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
}

std::string GetSourceIncludes() {
return "#include <grpc++/channel_interface.h>\n"
return "#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/channel_interface.h>\n"
"#include <grpc++/impl/client_unary_call.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/rpc_service_method.h>\n"
Expand All @@ -181,9 +184,9 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
Expand Down Expand Up @@ -378,14 +381,15 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"context, request, response);\n"
"}\n\n");
printer->Print(*vars,
"void $Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, ::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" ::grpc::AsyncUnaryCall(channel(),"
" return new ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, response, status, cq, tag);\n"
"context, request, tag);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
Expand Down
26 changes: 0 additions & 26 deletions src/cpp/client/client_unary_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,4 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
}

class ClientAsyncRequest final : public CallOpBuffer {
public:
void FinalizeResult(void **tag, bool *status) override {
CallOpBuffer::FinalizeResult(tag, status);
delete this;
}
};

void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag) {
ClientAsyncRequest *buf = new ClientAsyncRequest;
buf->Reset(tag);
Call call(channel->CreateCall(method, context, cq));
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
buf->AddRecvInitialMetadata(context);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
buf->AddClientRecvStatus(context, status);
call.PerformOps(buf);
}

} // namespace grpc
Loading

0 comments on commit 8a287d1

Please sign in to comment.