Skip to content

Commit

Permalink
Merge pull request grpc#1023 from vjpai/async
Browse files Browse the repository at this point in the history
Non-blocking Next method for C++ async completion queue
  • Loading branch information
yang-g committed Mar 17, 2015
2 parents d942720 + 7aadf46 commit 700bdeb
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 8 deletions.
20 changes: 16 additions & 4 deletions include/grpc++/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#ifndef GRPCXX_COMPLETION_QUEUE_H
#define GRPCXX_COMPLETION_QUEUE_H

#include <chrono>
#include <grpc++/impl/client_unary_call.h>

struct grpc_completion_queue;
Expand Down Expand Up @@ -75,10 +76,21 @@ class CompletionQueue {
explicit CompletionQueue(grpc_completion_queue *take);
~CompletionQueue();

// Blocking read from queue.
// Returns true if an event was received, false if the queue is ready
// for destruction.
bool Next(void **tag, bool *ok);
// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
enum NextStatus {SHUTDOWN, GOT_EVENT, TIMEOUT};

// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
NextStatus AsyncNext(void **tag, bool *ok,
std::chrono::system_clock::time_point deadline);

// Blocking (until deadline) read from queue.
// Returns false if the queue is ready for destruction, true if event
bool Next(void **tag, bool *ok) {
return (AsyncNext(tag,ok,
std::chrono::system_clock::time_point::max()) !=
SHUTDOWN);
}

// Shutdown has to be called, and the CompletionQueue can only be
// destructed when false is returned from Next().
Expand Down
15 changes: 11 additions & 4 deletions src/cpp/common/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,26 @@ class EventDeleter {
}
};

bool CompletionQueue::Next(void** tag, bool* ok) {
CompletionQueue::NextStatus
CompletionQueue::AsyncNext(void** tag, bool* ok,
std::chrono::system_clock::time_point deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;

gpr_timespec gpr_deadline;
Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
if (!ev) { /* got a NULL back because deadline passed */
return TIMEOUT;
}
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
return SHUTDOWN;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
return GOT_EVENT;
}
}
}
Expand Down
58 changes: 58 additions & 0 deletions test/cpp/end2end/async_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
EXPECT_EQ(tag(i), got_tag);
}

void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok,
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::time_point::max(),
CompletionQueue::NextStatus expected_outcome =
CompletionQueue::GOT_EVENT) {
bool ok;
void* got_tag;
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
if (expected_outcome == CompletionQueue::GOT_EVENT) {
EXPECT_EQ(expect_ok, ok);
EXPECT_EQ(tag(i), got_tag);
}
}

class AsyncEnd2endTest : public ::testing::Test {
protected:
AsyncEnd2endTest() : service_(&srv_cq_) {}
Expand Down Expand Up @@ -166,6 +180,50 @@ TEST_F(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}

// Test a simple RPC using the async version of Next
TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();

EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;

ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
response_reader(stub_->AsyncEcho(&cli_ctx, send_request,
&cli_cq_, tag(1)));

std::chrono::system_clock::time_point
time_now(std::chrono::system_clock::now()),
time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5));
verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);

service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));

verify_timed_ok(&srv_cq_, 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
verify_timed_ok(&cli_cq_, 1, true, time_limit);

send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
verify_timed_ok(&srv_cq_, 3, true);

response_reader->Finish(&recv_response, &recv_status, tag(4));
verify_timed_ok(&cli_cq_, 4, true);

EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());

}

// Two pings and a final pong.
TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
Expand Down

0 comments on commit 700bdeb

Please sign in to comment.