Skip to content

Commit

Permalink
Merge github.com:grpc/grpc into churn-churn-churn-the-api-gently-down…
Browse files Browse the repository at this point in the history
…-the-stream
  • Loading branch information
ctiller committed May 8, 2015
2 parents 41af13e + e1b5f9b commit 2be6095
Show file tree
Hide file tree
Showing 11 changed files with 1,225 additions and 240 deletions.
190 changes: 178 additions & 12 deletions Makefile

Large diffs are not rendered by default.

74 changes: 72 additions & 2 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,24 @@
"gpr"
]
},
{
"name": "async_unary_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_unary_ping_pong_test.cc"
],
"deps": [
"qps",
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "channel_arguments_test",
"build": "test",
Expand Down Expand Up @@ -1999,6 +2017,22 @@
"gpr"
]
},
{
"name": "mock_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/mock_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "pubsub_client",
"build": "do_not_build",
Expand Down Expand Up @@ -2070,12 +2104,12 @@
]
},
{
"name": "qps_smoke_test",
"name": "qps_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/smoke_test.cc"
"test/cpp/qps/qps_test.cc"
],
"deps": [
"qps",
Expand Down Expand Up @@ -2124,6 +2158,42 @@
"gpr"
]
},
{
"name": "sync_streaming_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_streaming_ping_pong_test.cc"
],
"deps": [
"qps",
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "sync_unary_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_unary_ping_pong_test.cc"
],
"deps": [
"qps",
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "thread_pool_test",
"build": "test",
Expand Down
13 changes: 12 additions & 1 deletion include/grpc++/async_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,19 @@
#include <grpc/support/log.h>

namespace grpc {

template <class R>
class ClientAsyncResponseReaderInterface {
public:
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;

};

template <class R>
class ClientAsyncResponseReader GRPC_FINAL {
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
Expand Down
98 changes: 67 additions & 31 deletions include/grpc++/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,14 @@ class WriterInterface {
};

template <class R>
class ClientReader GRPC_FINAL : public ClientStreamingInterface,
public ReaderInterface<R> {
class ClientReaderInterface : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
};

template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
ClientReader(ChannelInterface* channel, const RpcMethod& method,
Expand All @@ -111,7 +117,7 @@ class ClientReader GRPC_FINAL : public ClientStreamingInterface,
GPR_ASSERT(cq_.Pluck(&buf));
}

virtual bool Read(R* msg) GRPC_OVERRIDE {
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
Expand All @@ -121,7 +127,7 @@ class ClientReader GRPC_FINAL : public ClientStreamingInterface,
return cq_.Pluck(&buf) && buf.got_message;
}

virtual Status Finish() GRPC_OVERRIDE {
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(context_, &status);
Expand All @@ -137,8 +143,14 @@ class ClientReader GRPC_FINAL : public ClientStreamingInterface,
};

template <class W>
class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
public WriterInterface<W> {
class ClientWriterInterface : public ClientStreamingInterface,
public WriterInterface<W> {
public:
virtual bool WritesDone() = 0;
};

template <class W>
class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
Expand All @@ -152,22 +164,22 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
cq_.Pluck(&buf);
}

virtual bool Write(const W& msg) GRPC_OVERRIDE {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
}

virtual bool WritesDone() {
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
}

// Read the final response and wait for the final status.
virtual Status Finish() GRPC_OVERRIDE {
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
Expand All @@ -186,9 +198,16 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface,

// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
class ClientReaderWriterInterface : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
virtual bool WritesDone() = 0;
};

template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
// Blocking create a stream.
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
Expand All @@ -213,7 +232,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientStreamingInterface,
GPR_ASSERT(cq_.Pluck(&buf));
}

virtual bool Read(R* msg) GRPC_OVERRIDE {
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
Expand All @@ -223,21 +242,21 @@ class ClientReaderWriter GRPC_FINAL : public ClientStreamingInterface,
return cq_.Pluck(&buf) && buf.got_message;
}

virtual bool Write(const W& msg) GRPC_OVERRIDE {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
}

virtual bool WritesDone() {
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
}

virtual Status Finish() GRPC_OVERRIDE {
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(context_, &status);
Expand Down Expand Up @@ -267,7 +286,7 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
call_->cq()->Pluck(&buf);
}

virtual bool Read(R* msg) GRPC_OVERRIDE {
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
Expand All @@ -294,7 +313,7 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
call_->cq()->Pluck(&buf);
}

virtual bool Write(const W& msg) GRPC_OVERRIDE {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
Expand Down Expand Up @@ -327,14 +346,14 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
call_->cq()->Pluck(&buf);
}

virtual bool Read(R* msg) GRPC_OVERRIDE {
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && buf.got_message;
}

virtual bool Write(const W& msg) GRPC_OVERRIDE {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
Expand Down Expand Up @@ -380,8 +399,12 @@ class AsyncWriterInterface {
};

template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
};

template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
Expand Down Expand Up @@ -431,8 +454,14 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncStreamingInterface,
};

template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
public AsyncWriterInterface<W> {
class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
virtual void WritesDone(void* tag) = 0;
};

template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
Expand All @@ -459,7 +488,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
call_.PerformOps(&write_buf_);
}

void WritesDone(void* tag) {
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
Expand Down Expand Up @@ -488,9 +517,16 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,

// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientAsyncReaderWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public:
virtual void WritesDone(void* tag) = 0;
};

template <class W, class R>
class ClientAsyncReaderWriter GRPC_FINAL
: public ClientAsyncReaderWriterInterface<W, R> {
public:
ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
Expand Down Expand Up @@ -524,7 +560,7 @@ class ClientAsyncReaderWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
call_.PerformOps(&write_buf_);
}

void WritesDone(void* tag) {
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
Expand Down Expand Up @@ -671,13 +707,13 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
call_.PerformOps(&meta_buf_);
}

virtual void Read(R* msg, void* tag) GRPC_OVERRIDE {
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
}

virtual void Write(const W& msg, void* tag) GRPC_OVERRIDE {
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
Expand Down
Loading

0 comments on commit 2be6095

Please sign in to comment.