Skip to content

Commit

Permalink
Merge pull request grpc#2818 from ctiller/y12kdm3
Browse files Browse the repository at this point in the history
Add a test of non-blocking API behavior
  • Loading branch information
vjpai committed Aug 24, 2015
2 parents e180766 + b7e22b9 commit d2ea031
Show file tree
Hide file tree
Showing 28 changed files with 349 additions and 165 deletions.
3 changes: 1 addition & 2 deletions include/grpc++/impl/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,7 @@ class CallOpSet : public CallOpSetInterface,
template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
class SneakyCallOpSet GRPC_FINAL
: public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
Expand Down
16 changes: 11 additions & 5 deletions include/grpc++/impl/rpc_service_method.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler {
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
template <class T>
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
context->sent_initial_metadata_ = true;
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
ops->ServerSendStatus(context->trailing_metadata_, status);
}

void RunHandler(const HandlerParameter& param) GRPC_FINAL {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
Expand Down
17 changes: 12 additions & 5 deletions include/grpc++/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
// Add a listening port. Can be called multiple times.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
// Start the server.
bool Start();
bool Start(ServerCompletionQueue** cqs, size_t num_cqs);

void HandleQueueClosed();
void RunRpc();
Expand All @@ -112,7 +112,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();

bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
Expand All @@ -123,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
Expand Down Expand Up @@ -184,19 +186,24 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
Message* const request_;
};

class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag);
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);

bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;

private:
grpc_call_details call_details_;
};

class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;

template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
Expand All @@ -221,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag);
tag, true);
}

const int max_message_size_;
Expand Down
4 changes: 2 additions & 2 deletions include/grpc++/server_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class ServerBuilder {
void SetThreadPool(ThreadPoolInterface* thread_pool);

// Add a completion queue for handling asynchronous services
// Caller is required to keep this completion queue live until calling
// BuildAndStart()
// Caller is required to keep this completion queue live until
// the server is destroyed.
std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();

// Return a running server which is ready for processing rpcs.
Expand Down
2 changes: 2 additions & 0 deletions include/grpc++/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
}

private:
friend class ::grpc::Server;

void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }

Call call_;
Expand Down
7 changes: 3 additions & 4 deletions src/core/iomgr/pollset.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
Returns true if some work has been done, and false if the deadline
expired. */
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline);
Tries not to block past deadline. */
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline);

/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
Expand Down
2 changes: 1 addition & 1 deletion src/core/iomgr/pollset_multipoller_with_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN;
pfds[1].revents = 0;

poll_rv = poll(pfds, 2, timeout_ms);
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);

if (poll_rv < 0) {
if (errno != EINTR) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/iomgr/pollset_multipoller_with_poll_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
POLLOUT, &watchers[i]);
}

r = poll(pfds, pfd_count, timeout);
r = grpc_poll_function(pfds, pfd_count, timeout);

for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
Expand Down
15 changes: 6 additions & 9 deletions src/core/iomgr/pollset_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "src/core/iomgr/pollset_posix.h"

#include <errno.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Expand All @@ -57,6 +56,8 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

grpc_poll_function_type grpc_poll_function = poll;

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
Expand Down Expand Up @@ -89,6 +90,7 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
}

void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
/* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
Expand Down Expand Up @@ -168,14 +170,10 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}

int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
Expand Down Expand Up @@ -217,7 +215,6 @@ int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_mu_lock(&pollset->mu);
}
}
return 1;
}

void grpc_pollset_shutdown(grpc_pollset *pollset,
Expand Down Expand Up @@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,

/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
r = poll(pfd, nfds, timeout);
r = grpc_poll_function(pfd, nfds, timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);

if (fd) {
Expand Down
6 changes: 6 additions & 0 deletions src/core/iomgr/pollset_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H

#include <poll.h>

#include <grpc/support/sync.h>
#include "src/core/iomgr/wakeup_fd_posix.h"

Expand Down Expand Up @@ -118,4 +120,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);

/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;

#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
10 changes: 2 additions & 8 deletions src/core/iomgr/pollset_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,9 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}

int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
gpr_timespec now;
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
Expand All @@ -127,7 +122,6 @@ int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
if (added_worker) {
remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}

void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/security/google_default_credentials.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
Expand Down
16 changes: 14 additions & 2 deletions src/core/surface/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;

GPR_ASSERT(!reserved);

deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
Expand All @@ -196,12 +199,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
Expand Down Expand Up @@ -239,6 +245,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
gpr_timespec now;
int first_loop = 1;

GPR_ASSERT(!reserved);

deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
Expand Down Expand Up @@ -281,13 +290,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
del_plucker(cc, tag, &worker);
}
done:
Expand Down
Loading

0 comments on commit d2ea031

Please sign in to comment.