Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add_udp_server_2
Browse files Browse the repository at this point in the history
  • Loading branch information
rjshade committed Aug 13, 2015
2 parents 00436a2 + c49a48b commit 5c45b94
Show file tree
Hide file tree
Showing 106 changed files with 1,155 additions and 563 deletions.
2 changes: 2 additions & 0 deletions include/grpc++/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;

class ChannelInterface;
class ClientContext;
Expand Down Expand Up @@ -138,6 +139,7 @@ class CompletionQueue : public GrpcLibrary {
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
Expand Down
8 changes: 8 additions & 0 deletions include/grpc++/impl/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class CallOpSendInitialMetadata {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
}
Expand Down Expand Up @@ -206,6 +207,7 @@ class CallOpSendMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = write_options_.flags();
op->reserved = NULL;
op->data.send_message = send_buf_;
// Flags are per-message: clear them after use.
write_options_.Clear();
Expand Down Expand Up @@ -248,6 +250,7 @@ class CallOpRecvMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
}

Expand Down Expand Up @@ -313,6 +316,7 @@ class CallOpGenericRecvMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
}

Expand Down Expand Up @@ -350,6 +354,7 @@ class CallOpClientSendClose {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) { send_ = false; }

Expand Down Expand Up @@ -383,6 +388,7 @@ class CallOpServerSendStatus {
op->data.send_status_from_server.status_details =
send_status_details_.empty() ? nullptr : send_status_details_.c_str();
op->flags = 0;
op->reserved = NULL;
}

void FinishOp(bool* status, int max_message_size) {
Expand Down Expand Up @@ -416,6 +422,7 @@ class CallOpRecvInitialMetadata {
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) {
if (recv_initial_metadata_ == nullptr) return;
Expand Down Expand Up @@ -453,6 +460,7 @@ class CallOpClientRecvStatus {
op->data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
op->flags = 0;
op->reserved = NULL;
}

void FinishOp(bool* status, int max_message_size) {
Expand Down
15 changes: 15 additions & 0 deletions include/grpc++/impl/rpc_service_method.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,21 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service_;
};

// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
Status status(StatusCode::UNIMPLEMENTED, "");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
};

// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
Expand Down
2 changes: 2 additions & 0 deletions include/grpc++/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
grpc::condition_variable callback_cv_;

std::list<SyncRequest>* sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;

// Pointer to the c grpc server.
grpc_server* const server_;
Expand Down
2 changes: 2 additions & 0 deletions include/grpc++/server_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;

class Call;
class CallOpBuffer;
Expand Down Expand Up @@ -159,6 +160,7 @@ class ServerContext {
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::ClientContext;

// Prevent copying.
Expand Down
4 changes: 4 additions & 0 deletions include/grpc/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ typedef enum {
} grpc_byte_buffer_type;

struct grpc_byte_buffer {
void *reserved;
grpc_byte_buffer_type type;
union {
struct {
void *reserved[8];
} reserved;
struct {
grpc_compression_algorithm compression;
gpr_slice_buffer slice_buffer;
Expand Down
38 changes: 25 additions & 13 deletions include/grpc/grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ typedef struct grpc_metadata {
const char *key;
const char *value;
size_t value_length;
gpr_uint32 flags;

/** The following fields are reserved for grpc internal use.
There is no need to initialize them, and they will be set to garbage
during
calls to grpc. */
struct {
void *obfuscated[3];
void *obfuscated[4];
} internal_data;
} grpc_metadata;

Expand Down Expand Up @@ -251,6 +252,7 @@ typedef struct {
char *host;
size_t host_capacity;
gpr_timespec deadline;
void *reserved;
} grpc_call_details;

void grpc_call_details_init(grpc_call_details *details);
Expand Down Expand Up @@ -306,7 +308,13 @@ typedef struct grpc_op {
grpc_op_type op;
/** Write flags bitset for grpc_begin_messages */
gpr_uint32 flags;
/** Reserved for future usage */
void *reserved;
union {
/** Reserved for future usage */
struct {
void *reserved[8];
} reserved;
struct {
size_t count;
grpc_metadata *metadata;
Expand Down Expand Up @@ -408,7 +416,7 @@ void grpc_shutdown(void);
const char *grpc_version_string(void);

/** Create a completion queue */
grpc_completion_queue *grpc_completion_queue_create(void);
grpc_completion_queue *grpc_completion_queue_create(void *reserved);

/** Blocks until an event is available, the completion queue is being shut down,
or deadline is reached.
Expand All @@ -419,7 +427,7 @@ grpc_completion_queue *grpc_completion_queue_create(void);
Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);

/** Blocks until an event with tag 'tag' is available, the completion queue is
being shutdown or deadline is reached.
Expand All @@ -433,7 +441,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
concurrently executing plucks at any time. */
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);

/** Maximum number of outstanding grpc_completion_queue_pluck executions per
completion queue */
Expand Down Expand Up @@ -476,17 +484,17 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);

/** Pre-register a method/host pair on a channel. */
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host);
const char *host, void *reserved);

/** Create a call given a handle returned from grpc_channel_register_call */
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);

/** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
Expand All @@ -500,7 +508,7 @@ grpc_call *grpc_channel_create_registered_call(
containing just send operations independently from batches containing just
receive operations. */
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag);
size_t nops, void *tag, void *reserved);

/** Returns a newly allocated string representing the endpoint to which this
call is communicating with. The string is in the uri format accepted by
Expand Down Expand Up @@ -532,7 +540,8 @@ char *grpc_channel_get_target(grpc_channel *channel);
more on this. The data in 'args' need only live through the invocation of
this function. */
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args);
const grpc_channel_args *args,
void *reserved);

/** Create a lame client: this client fails every operation attempted on it. */
grpc_channel *grpc_lame_client_channel_create(const char *target);
Expand All @@ -551,7 +560,7 @@ void grpc_channel_destroy(grpc_channel *channel);
THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status
are thread-safe, and can be called at any point before grpc_call_destroy
is called.*/
grpc_call_error grpc_call_cancel(grpc_call *call);
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved);

/** Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
Expand All @@ -561,7 +570,8 @@ grpc_call_error grpc_call_cancel(grpc_call *call);
remote endpoint. */
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description);
const char *description,
void *reserved);

/** Destroy a call.
THREAD SAFETY: grpc_call_destroy is thread-compatible */
Expand Down Expand Up @@ -600,14 +610,16 @@ grpc_call_error grpc_server_request_registered_call(
be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. The data in 'args' need only live
through the invocation of this function. */
grpc_server *grpc_server_create(const grpc_channel_args *args);
grpc_server *grpc_server_create(const grpc_channel_args *args,
void *reserved);

/** Register a completion queue with the server. Must be done for any
notification completion queue that is passed to grpc_server_request_*_call
and to grpc_server_shutdown_and_notify. Must be performed prior to
grpc_server_start. */
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq);
grpc_completion_queue *cq,
void *reserved);

/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
Expand Down
17 changes: 11 additions & 6 deletions src/core/surface/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call);
grpc_call_cancel(child_call, NULL);
GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
}
child_call = next_child_call;
Expand Down Expand Up @@ -1265,18 +1265,22 @@ void grpc_call_destroy(grpc_call *c) {
c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
if (cancel) grpc_call_cancel(c);
if (cancel) grpc_call_cancel(c, NULL);
GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
}

grpc_call_error grpc_call_cancel(grpc_call *call) {
return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
GPR_ASSERT(!reserved);
return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
NULL);
}

grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
const char *description,
void *reserved) {
grpc_call_error r;
(void) reserved;
lock(c);
r = cancel_with_status(c, status, description);
unlock(c);
Expand Down Expand Up @@ -1513,13 +1517,14 @@ static int are_write_flags_valid(gpr_uint32 flags) {
}

grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
size_t nops, void *tag, void *reserved) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
size_t in;
size_t out;
const grpc_op *op;
grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GPR_ASSERT(!reserved);

GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);

Expand Down
9 changes: 6 additions & 3 deletions src/core/surface/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
gpr_timespec deadline, void *reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq,
grpc_mdelem_from_metadata_strings(
Expand All @@ -182,8 +183,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
}

void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
const char *host, void *reserved) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
GPR_ASSERT(!reserved);
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0));
Expand All @@ -200,8 +202,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline) {
gpr_timespec deadline, void *reserved) {
registered_call *rc = registered_call_handle;
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue,
GRPC_MDELEM_REF(rc->path),
Expand Down
4 changes: 3 additions & 1 deletion src/core/surface/channel_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,16 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- connect to it (trying alternatives as presented)
- perform handshakes */
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args) {
const grpc_channel_args *args,
void *reserved) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
GPR_ASSERT(!reserved);
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
Expand Down
Loading

0 comments on commit 5c45b94

Please sign in to comment.