Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Reverted pipeline functionality. Change error to be stored as std::st…
Browse files Browse the repository at this point in the history
…ring. Bumped version
  • Loading branch information
jtarnstrom committed Feb 15, 2013
1 parent ab30f37 commit c2b02fc
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 337 deletions.
92 changes: 1 addition & 91 deletions lib/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ Client::Client (SOCKETDESC *sockdesc)
, m_reader (1024 * 1200)
{
m_sock = sockdesc;
m_error = "Unspecified error";
m_pipeline = 0;
}

Client::~Client (void)
Expand All @@ -60,7 +58,7 @@ void Client::setError(const char *message)

const char *Client::getError(void)
{
return m_error;
return m_error.c_str();
}

bool Client::connect(const char *address, int port)
Expand Down Expand Up @@ -198,12 +196,6 @@ bool Client::cas(const char *key, size_t cbKey, UINT64 casUnique, void *data, si
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -252,12 +244,6 @@ bool Client::command(const char *cmd, size_t cbCmd, const char *key, size_t cbKe
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
PRINTMARK();
Expand Down Expand Up @@ -320,12 +306,6 @@ bool Client::del(const char *key, size_t cbKey, time_t *expiration, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -358,12 +338,6 @@ bool Client::incr(const char *key, size_t cbKey, UINT64 increment, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -395,12 +369,6 @@ bool Client::decr(const char *key, size_t cbKey, UINT64 decrement, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -447,58 +415,6 @@ bool Client::getFlush(void)
return true;
}

bool Client::pipelineBegin(void)
{
if (m_pipeline)
{
return false;
}

m_pipeline = 1;
m_writer.reset();
return true;
}

void Client::pipelineReset(void)
{
m_pipeline = 0;
m_writer.reset();
}

bool Client::pipelineAbort(void)
{
pipelineReset();
return true;
}

bool Client::pipelineFlush(void)
{
if (!m_pipeline || !sendWriteBuffer())
{
pipelineReset();
return false;
}

return true;
}

bool Client::getNextPipelineResult(char **pData, size_t *cbSize)
{
if (m_pipeline <= 1 || !readLine())
{
pipelineReset();
return false;
}

m_pipeline -= 1;
return getResult(pData, cbSize);
}

bool Client::isPipelined(void)
{
return m_pipeline > 0;
}

bool Client::version(char **pVersion, size_t *cbVersion)
{
m_writer.writeChars("version\r\n", 9);
Expand Down Expand Up @@ -595,12 +511,6 @@ bool Client::flushAll(time_t *expiration, bool async)

m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down
11 changes: 2 additions & 9 deletions lib/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "mcdefs.h"
#include "PacketWriter.h"
#include "PacketReader.h"
#include <string>

typedef struct SOCKETDESC
{
Expand Down Expand Up @@ -71,12 +72,6 @@ class Client
bool getFlush(void);
bool getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData, int *flags, UINT64 *cas, bool *bError);

bool pipelineBegin(void);
bool pipelineFlush(void);
bool pipelineAbort(void);
bool getNextPipelineResult(char **pData, size_t *cbSize);
bool isPipelined(void);

bool set(const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool del(const char *key, size_t cbKey, time_t *expiration, bool async);
bool add(const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
Expand All @@ -100,7 +95,6 @@ class Client
bool command(const char *cmd, size_t cbCmd, const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool readLine(void);

void pipelineReset(void);
void setError(const char *message);
bool extractErrorFromReader(void);

Expand All @@ -109,6 +103,5 @@ class Client
PacketWriter m_writer;
PacketReader m_reader;

const char *m_error;
int m_pipeline;
std::string m_error;
};
83 changes: 6 additions & 77 deletions python/umemcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ PyObject *Client_command(PyClient *self, PFN_COMMAND cmd, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -422,11 +422,6 @@ PyObject *Client_get(PyClient *self, PyObject *args)
return NULL;
}

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

self->client->getBegin();

self->client->getKeyWrite(pKey, cbKey);
Expand Down Expand Up @@ -488,11 +483,6 @@ PyObject *Client_gets(PyClient *self, PyObject *args)
return NULL;
}

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

self->client->getsBegin();

self->client->getKeyWrite(pKey, cbKey);
Expand Down Expand Up @@ -553,11 +543,6 @@ PyObject *Client_get_multi(PyClient *self, PyObject *okeys)
UINT64 cas;
int flags;

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

self->client->getBegin();

PyObject *iterator = PyObject_GetIter(okeys);
Expand Down Expand Up @@ -639,11 +624,6 @@ PyObject *Client_gets_multi(PyClient *self, PyObject *okeys)
UINT64 cas;
int flags;

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

self->client->getsBegin();

PyObject *iterator = PyObject_GetIter(okeys);
Expand Down Expand Up @@ -716,44 +696,6 @@ PyObject *Client_gets_multi(PyClient *self, PyObject *okeys)
return odict;
}

PyObject *Client_begin_pipeline(PyClient *self, PyObject *args)
{
if (!self->client->pipelineBegin())
{
return PyErr_Format(umemcache_MemcachedError, "pipeline already started");
}

Py_RETURN_NONE;
}

PyObject *Client_abort_pipeline(PyClient *self, PyObject *args)
{
self->client->pipelineAbort();
Py_RETURN_NONE;
}

PyObject *Client_finish_pipeline(PyClient *self, PyObject *args)
{
char *pResult;
size_t cbResult;

if (!self->client->pipelineFlush())
{
return PyErr_Format(umemcache_MemcachedError, "error flushing pipeline");
}

PyObject *oresults = PyList_New(0);

while (self->client->getNextPipelineResult(&pResult, &cbResult))
{
PyObject *oresult = PyString_FromStringAndSize(pResult, cbResult);
PyList_Append(oresults, oresult);
Py_DECREF(oresult);
}

return oresults;
}

PyObject *Client_delete(PyClient *self, PyObject *args)
{
char *pResult;
Expand All @@ -780,7 +722,7 @@ PyObject *Client_delete(PyClient *self, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -827,7 +769,7 @@ PyObject *Client_cas(PyClient *self, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -869,7 +811,7 @@ PyObject *Client_incr(PyClient *self, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -917,7 +859,7 @@ PyObject *Client_decr(PyClient *self, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand All @@ -944,11 +886,6 @@ PyObject *Client_version(PyClient *self, PyObject *args)
char *pVersion;
size_t cbVersion;

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

if (!self->client->version(&pVersion, &cbVersion))
{
return PyErr_Format(umemcache_MemcachedError, "Could not retrieve version");
Expand All @@ -964,11 +901,6 @@ PyObject *Client_stats(PyClient *self, PyObject *args)
size_t cbName;
size_t cbValue;

if (self->client->isPipelined())
{
return PyErr_Format(umemcache_MemcachedError, "Operation cannot be performed inside a pipeline");
}

if (!self->client->stats(NULL, 0))
{
return PyErr_Format(umemcache_MemcachedError, "Stats command failed");
Expand Down Expand Up @@ -1011,7 +943,7 @@ PyObject *Client_flush_all(PyClient *self, PyObject *args)
return NULL;
}

if (!self->client->isPipelined() && !async)
if (!async)
{
if (self->client->getResult(&pResult, &cbResult))
{
Expand Down Expand Up @@ -1112,9 +1044,6 @@ static PyMethodDef Client_methods[] = {
{"flush_all", (PyCFunction) Client_flush_all, METH_VARARGS, "def flush_all(self, expiration = 0, async = False)"},
{"set_timeout", (PyCFunction) Client_set_timeout, METH_VARARGS, "def set_timeout(self, value)"},
{"get_timeout", (PyCFunction) Client_get_timeout, METH_NOARGS, "def get_timeout(self)"},
{"begin_pipeline", (PyCFunction) Client_begin_pipeline, METH_NOARGS, "def begin_pipeline(self)"},
{"abort_pipeline", (PyCFunction) Client_abort_pipeline, METH_NOARGS, "def abort_pipeline(self)"},
{"finish_pipeline", (PyCFunction) Client_finish_pipeline, METH_NOARGS, "def finish_pipeline(self)"},
{NULL}
};

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@


setup (name = 'umemcache',
version = "1.6.1",
version = "1.6.3",
description = "Ultra fast memcache client written in highly optimized C++ with Python bindings",
long_description = README,
ext_modules = [module1],
Expand Down
Loading

4 comments on commit c2b02fc

@rbranson
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you experiencing issues with the pipeline functionality?

@jskorpan
Copy link

@jskorpan jskorpan commented on c2b02fc Apr 4, 2013 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rbranson
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide the specifics? I'd love to help with this. We run this code in production at Instagram, so it's definitely in our best interest to get it right. I'll admit I'm nowhere near approaching an expert C/C++ developer.

@jskorpan
Copy link

@jskorpan jskorpan commented on c2b02fc Apr 4, 2013 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.