Skip to content

Commit

Permalink
Expose new core functionality to Python
Browse files Browse the repository at this point in the history
  • Loading branch information
soltanmm committed Aug 12, 2015
1 parent b5ea2f8 commit be55ca7
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 20 deletions.
9 changes: 9 additions & 0 deletions src/python/grpcio/grpc/_adapter/_c/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq);
void pygrpc_Call_dealloc(Call *self);
PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_peer(Call *self);
extern PyTypeObject pygrpc_Call_type;


Expand All @@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new(
void pygrpc_Channel_dealloc(Channel *self);
Call *pygrpc_Channel_create_call(
Channel *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_target(Channel *self);
extern PyTypeObject pygrpc_Channel_type;


Expand Down Expand Up @@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call);
/* Construct a tag associated with a server shutdown. */
pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag);

/* Construct a tag associated with a channel state change. */
pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag);

/* Frees all resources owned by the tag and the tag itself. */
void pygrpc_discard_tag(pygrpc_tag *tag);

Expand Down
8 changes: 8 additions & 0 deletions src/python/grpcio/grpc/_adapter/_c/types/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
PyMethodDef pygrpc_Call_methods[] = {
{"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
{"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
{"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
Expand Down Expand Up @@ -161,3 +162,10 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
}
return PyInt_FromLong(errcode);
}

PyObject *pygrpc_Call_peer(Call *self) {
char *peer = grpc_call_get_peer(self->c_call);
PyObject *py_peer = PyString_FromString(peer);
gpr_free(peer);
return py_peer;
}
54 changes: 53 additions & 1 deletion src/python/grpcio/grpc/_adapter/_c/types/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>


PyMethodDef pygrpc_Channel_methods[] = {
{"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""},
{"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""},
{"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""},
{"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel.";
Expand Down Expand Up @@ -122,7 +126,7 @@ Call *pygrpc_Channel_create_call(
const char *host;
double deadline;
char *keywords[] = {"cq", "method", "host", "deadline", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords,
&pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) {
return NULL;
}
Expand All @@ -132,3 +136,51 @@ Call *pygrpc_Channel_create_call(
pygrpc_cast_double_to_gpr_timespec(deadline));
return call;
}

PyObject *pygrpc_Channel_check_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *py_try_to_connect;
int try_to_connect;
char *keywords[] = {"try_to_connect", NULL};
grpc_connectivity_state state;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords,
&py_try_to_connect)) {
return NULL;
}
if (!PyBool_Check(py_try_to_connect)) {
Py_XDECREF(py_try_to_connect);
return NULL;
}
try_to_connect = Py_True == py_try_to_connect;
Py_DECREF(py_try_to_connect);
state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect);
return PyInt_FromLong(state);
}

PyObject *pygrpc_Channel_watch_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *tag;
double deadline;
int last_observed_state;
CompletionQueue *completion_queue;
char *keywords[] = {"last_observed_state", "deadline",
"completion_queue", "tag"};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "idO!O:watch_connectivity_state", keywords,
&last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
&completion_queue, &tag)) {
return NULL;
}
grpc_channel_watch_connectivity_state(
self->c_chan, (grpc_connectivity_state)last_observed_state,
pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq,
pygrpc_produce_channel_state_change_tag(tag));
Py_RETURN_NONE;
}

PyObject *pygrpc_Channel_target(Channel *self) {
char *target = grpc_channel_get_target(self->c_chan);
PyObject *py_target = PyString_FromString(target);
gpr_free(target);
return py_target;
}
21 changes: 19 additions & 2 deletions src/python/grpcio/grpc/_adapter/_c/utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) {
return tag;
}

pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) {
pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
tag->user_tag = user_tag;
Py_XINCREF(tag->user_tag);
tag->call = NULL;
tag->ops = NULL;
tag->nops = 0;
grpc_call_details_init(&tag->request_call_details);
grpc_metadata_array_init(&tag->request_metadata);
tag->is_new_call = 0;
return tag;
}

void pygrpc_discard_tag(pygrpc_tag *tag) {
if (!tag) {
return;
Expand Down Expand Up @@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) {
}

int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int OP_TUPLE_SIZE = 5;
static const int OP_TUPLE_SIZE = 6;
static const int STATUS_TUPLE_SIZE = 2;
static const int TYPE_INDEX = 0;
static const int INITIAL_METADATA_INDEX = 1;
Expand All @@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int STATUS_INDEX = 4;
static const int STATUS_CODE_INDEX = 0;
static const int STATUS_DETAILS_INDEX = 1;
static const int WRITE_FLAGS_INDEX = 5;
int type;
Py_ssize_t message_size;
char *message;
Expand All @@ -170,7 +184,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
c_op.flags = 0;
c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX));
if (PyErr_Occurred()) {
return 0;
}
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!pygrpc_cast_pyseq_to_send_metadata(
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio/grpc/_adapter/_intermediary_low.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def invoke(self, completion_queue, metadata_tag, finish_tag):

def write(self, message, tag):
return self._internal.start_batch([
_types.OpArgs.send_message(message)
_types.OpArgs.send_message(message, 0)
], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))

def complete(self, tag):
Expand Down
14 changes: 14 additions & 0 deletions src/python/grpcio/grpc/_adapter/_low.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def cancel(self, code=None, details=None):
else:
return self.call.cancel(code, details)

def peer(self):
return self.call.peer()


class Channel(_types.Channel):

Expand All @@ -84,6 +87,17 @@ def __init__(self, target, args, creds=None):
def create_call(self, completion_queue, method, host, deadline=None):
return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline))

def check_connectivity_state(self, try_to_connect):
return self.channel.check_connectivity_state(try_to_connect)

def watch_connectivity_state(self, last_observed_state, deadline,
completion_queue, tag):
self.channel.watch_connectivity_state(
last_observed_state, deadline, completion_queue.completion_queue, tag)

def target(self):
return self.channel.target()


_NO_TAG = object()

Expand Down
Loading

0 comments on commit be55ca7

Please sign in to comment.