Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Pipelined write support
Browse files Browse the repository at this point in the history
  • Loading branch information
rbranson committed Nov 29, 2012
1 parent ad77f6d commit 51ba637
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 21 deletions.
84 changes: 83 additions & 1 deletion lib/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Client::Client (SOCKETDESC *sockdesc)
{
m_sock = sockdesc;
m_error = "Unspecified error";
m_pipeline = 0;
}

Client::~Client (void)
Expand All @@ -53,7 +54,6 @@ Client::~Client (void)

void Client::setError(const char *message)
{
assert (m_error == NULL);
m_error = message;
}

Expand Down Expand Up @@ -187,6 +187,12 @@ bool Client::cas(const char *key, size_t cbKey, UINT64 casUnique, void *data, si
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -235,6 +241,12 @@ bool Client::command(const char *cmd, size_t cbCmd, const char *key, size_t cbKe
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
PRINTMARK();
Expand Down Expand Up @@ -298,6 +310,12 @@ bool Client::del(const char *key, size_t cbKey, time_t *expiration, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -330,6 +348,12 @@ bool Client::incr(const char *key, size_t cbKey, UINT64 increment, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -361,6 +385,12 @@ bool Client::decr(const char *key, size_t cbKey, UINT64 decrement, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -407,6 +437,58 @@ bool Client::getFlush(void)
return true;
}

bool Client::pipelineBegin(void)
{
if (m_pipeline)
{
return false;
}

m_pipeline = 1;
m_writer.reset();
return true;
}

void Client::pipelineReset(void)
{
m_pipeline = 0;
m_writer.reset();
}

bool Client::pipelineAbort(void)
{
pipelineReset();
return true;
}

bool Client::pipelineFlush(void)
{
if (!m_pipeline || !sendWriteBuffer())
{
pipelineReset();
return false;
}

return true;
}

bool Client::getNextPipelineResult(char **pData, size_t *cbSize)
{
if (m_pipeline <= 1 || !readLine())
{
pipelineReset();
return false;
}

m_pipeline -= 1;
return getResult(pData, cbSize);
}

bool Client::isPipelined(void)
{
return m_pipeline > 0;
}

bool Client::version(char **pVersion, size_t *cbVersion)
{
m_writer.writeChars("version\r\n", 9);
Expand Down
11 changes: 9 additions & 2 deletions lib/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class Client
bool getFlush(void);
bool getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData, int *flags, UINT64 *cas, bool *bError);

bool pipelineBegin(void);
bool pipelineFlush(void);
bool pipelineAbort(void);
bool getNextPipelineResult(char **pData, size_t *cbSize);
bool isPipelined(void);

bool set(const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool del(const char *key, size_t cbKey, time_t *expiration, bool async);
Expand All @@ -89,12 +94,13 @@ class Client
bool flushAll(time_t *expiration, bool async);
bool getResult(char **pData, size_t *cbSize);
const char *getError(void);
bool sendWriteBuffer(void);

private:
bool command(const char *cmd, size_t cbCmd, const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool sendWriteBuffer(void);
bool readLine(void);


void pipelineReset(void);
void setError(const char *message);

private:
Expand All @@ -103,4 +109,5 @@ class Client
PacketReader m_reader;

const char *m_error;
int m_pipeline;
};
72 changes: 54 additions & 18 deletions python/umemcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ int API_connect(SOCKETDESC *desc, const char *address, int port)

PRINTMARK();

//PyTuple_SET_ITEM doesn't increment ref counter
//PyTuple_SET_ITEM doesn't increment ref counter
//Py_DECREF(PyTuple_GET_ITEM(args, 1));
Py_DECREF(args);
Py_DECREF(method);
Expand Down Expand Up @@ -354,7 +354,7 @@ PyObject *Client_command(PyClient *self, PFN_COMMAND cmd, PyObject *args)

bool bAsync = async ? true : false;

if (!(self->client->*cmd)(pKey, cbKey, pData, cbData, expire, flags, async ? true : false, self->maxSize))
if (!(self->client->*cmd)(pKey, cbKey, pData, cbData, expire, flags, bAsync, self->maxSize))
{
if (!PyErr_Occurred())
{
Expand All @@ -364,7 +364,7 @@ PyObject *Client_command(PyClient *self, PFN_COMMAND cmd, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -694,14 +694,51 @@ PyObject *Client_gets_multi(PyClient *self, PyObject *okeys)
return odict;
}

PyObject *Client_begin_pipeline(PyClient *self, PyObject *args)
{
if (!self->client->pipelineBegin())
{
return PyErr_Format(PyExc_RuntimeError, "pipeline already started");
}

Py_RETURN_NONE;
}

PyObject *Client_abort_pipeline(PyClient *self, PyObject *args)
{
self->client->pipelineAbort();
Py_RETURN_NONE;
}

PyObject *Client_finish_pipeline(PyClient *self, PyObject *args)
{
char *pResult;
size_t cbResult;

if (!self->client->pipelineFlush())
{
return PyErr_Format(PyExc_RuntimeError, "error flushing pipeline");
}

PyObject *oresults = PyList_New(0);

while (self->client->getNextPipelineResult(&pResult, &cbResult))
{
PyObject *oresult = PyString_FromStringAndSize(pResult, cbResult);
PyList_Append(oresults, oresult);
Py_DECREF(oresult);
}

return oresults;
}

PyObject *Client_delete(PyClient *self, PyObject *args)
{
char *pResult;
size_t cbResult;
char *pKey;
size_t cbKey;
int expire = -1;
int flags = 0;
int async = 0;

if (!PyArg_ParseTuple (args, "s#|ib", &pKey, &cbKey, &expire, &async))
Expand All @@ -721,7 +758,7 @@ PyObject *Client_delete(PyClient *self, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -768,7 +805,7 @@ PyObject *Client_cas(PyClient *self, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -810,7 +847,7 @@ PyObject *Client_incr(PyClient *self, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -858,7 +895,7 @@ PyObject *Client_decr(PyClient *self, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -942,7 +979,7 @@ PyObject *Client_flush_all(PyClient *self, PyObject *args)
return NULL;
}

if (!async)
if (!self->client->isPipelined() && !async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -1009,18 +1046,17 @@ static PyMethodDef Client_methods[] = {
{"version", (PyCFunction) Client_version, METH_NOARGS, "def version(self)"},
{"stats", (PyCFunction) Client_stats, METH_NOARGS, "def stats(self)"},
{"flush_all", (PyCFunction) Client_flush_all, METH_VARARGS, "def flush_all(self, expiration = 0, async = False)"},
{"begin_pipeline", (PyCFunction) Client_begin_pipeline, METH_NOARGS, "def begin_pipeline(self)"},
{"abort_pipeline", (PyCFunction) Client_abort_pipeline, METH_NOARGS, "def abort_pipeline(self)"},
{"finish_pipeline", (PyCFunction) Client_finish_pipeline, METH_NOARGS, "def finish_pipeline(self)"},
{NULL}
};

static PyMemberDef Client_members[] = {
{"max_item_size", T_INT, offsetof(PyClient, maxSize), READONLY,
"Max item size"},
{"sock", T_OBJECT_EX, offsetof(PyClient, sock), READONLY,
"Socket instance"},
{"host", T_OBJECT_EX, offsetof(PyClient, host), READONLY,
"Host"},
{"port", T_INT, offsetof(PyClient, port), READONLY,
"Port"},
{"max_item_size", T_INT, offsetof(PyClient, maxSize), READONLY, "Max item size"},
{"sock", T_OBJECT_EX, offsetof(PyClient, sock), READONLY, "Socket instance"},
{"host", T_OBJECT_EX, offsetof(PyClient, host), READONLY, "Host"},
{"port", T_INT, offsetof(PyClient, port), READONLY, "Port"},
{NULL} /* Sentinel */
};

Expand All @@ -1047,7 +1083,7 @@ static PyTypeObject ClientType = {
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"Memcache client.\n\n"
"Memcache client.\n\n"
"Options:\n"
"- address: memcache server address.\n"
"- max_item_size: maximum size for an item in memcached.\n"
Expand Down
Loading

0 comments on commit 51ba637

Please sign in to comment.