Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Merge pull request #21 from rbranson/master
Browse files Browse the repository at this point in the history
Improved error handling & support for pipelining
  • Loading branch information
Jonas Tärnström committed Dec 17, 2012
2 parents ad77f6d + 28ba028 commit 931f768
Show file tree
Hide file tree
Showing 7 changed files with 463 additions and 58 deletions.
155 changes: 149 additions & 6 deletions lib/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <stdio.h>
#include <assert.h>
#include <sstream>
#include <string.h>

//#define PRINTMARK() fprintf(stderr, "%s: MARK(%d)\n", __FILE__, __LINE__)
#define PRINTMARK()
Expand All @@ -43,6 +44,7 @@ Client::Client (SOCKETDESC *sockdesc)
{
m_sock = sockdesc;
m_error = "Unspecified error";
m_pipeline = 0;
}

Client::~Client (void)
Expand All @@ -53,7 +55,6 @@ Client::~Client (void)

void Client::setError(const char *message)
{
assert (m_error == NULL);
m_error = message;
}

Expand All @@ -78,8 +79,18 @@ bool Client::connect(const char *address, int port)

bool Client::readLine(void)
{
while (!m_reader.haveLine())
while (true)
{
if (m_reader.haveLine())
{
if (extractErrorFromReader())
{
return false;
}

return true;
}

size_t bytesToRead = m_reader.getEndPtr () - m_reader.getWritePtr();

if (bytesToRead > 65536)
Expand Down Expand Up @@ -187,6 +198,12 @@ bool Client::cas(const char *key, size_t cbKey, UINT64 casUnique, void *data, si
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -235,6 +252,12 @@ bool Client::command(const char *cmd, size_t cbCmd, const char *key, size_t cbKe
m_writer.writeChars(data, cbData);
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
PRINTMARK();
Expand All @@ -246,7 +269,6 @@ bool Client::command(const char *cmd, size_t cbCmd, const char *key, size_t cbKe
return true;
}


if (!readLine())
{
PRINTMARK();
Expand Down Expand Up @@ -298,6 +320,12 @@ bool Client::del(const char *key, size_t cbKey, time_t *expiration, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -330,6 +358,12 @@ bool Client::incr(const char *key, size_t cbKey, UINT64 increment, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -361,6 +395,12 @@ bool Client::decr(const char *key, size_t cbKey, UINT64 decrement, bool async)
}
m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand Down Expand Up @@ -407,6 +447,58 @@ bool Client::getFlush(void)
return true;
}

bool Client::pipelineBegin(void)
{
if (m_pipeline)
{
return false;
}

m_pipeline = 1;
m_writer.reset();
return true;
}

void Client::pipelineReset(void)
{
m_pipeline = 0;
m_writer.reset();
}

bool Client::pipelineAbort(void)
{
pipelineReset();
return true;
}

bool Client::pipelineFlush(void)
{
if (!m_pipeline || !sendWriteBuffer())
{
pipelineReset();
return false;
}

return true;
}

bool Client::getNextPipelineResult(char **pData, size_t *cbSize)
{
if (m_pipeline <= 1 || !readLine())
{
pipelineReset();
return false;
}

m_pipeline -= 1;
return getResult(pData, cbSize);
}

bool Client::isPipelined(void)
{
return m_pipeline > 0;
}

bool Client::version(char **pVersion, size_t *cbVersion)
{
m_writer.writeChars("version\r\n", 9);
Expand Down Expand Up @@ -503,6 +595,12 @@ bool Client::flushAll(time_t *expiration, bool async)

m_writer.writeChars("\r\n", 2);

if (m_pipeline)
{
m_pipeline += 1;
return true;
}

if (!sendWriteBuffer())
{
return false;
Expand All @@ -521,10 +619,47 @@ bool Client::flushAll(time_t *expiration, bool async)
return true;
}

bool Client::extractErrorFromReader(void)
{
static const char *RESPONSE_ERROR = "ERROR";
static const char *RESPONSE_CLIENT_ERROR = "CLIENT_ERROR";
static const char *RESPONSE_SERVER_ERROR = "SERVER_ERROR";
static const size_t RESPONSE_ERROR_SIZE = strlen(RESPONSE_ERROR);
static const size_t RESPONSE_CLIENT_ERROR_SIZE = strlen(RESPONSE_CLIENT_ERROR);
static const size_t RESPONSE_SERVER_ERROR_SIZE = strlen(RESPONSE_SERVER_ERROR);

if (m_reader.beginsWithString(RESPONSE_ERROR, RESPONSE_ERROR_SIZE) ||
m_reader.beginsWithString(RESPONSE_CLIENT_ERROR, RESPONSE_CLIENT_ERROR_SIZE) ||
m_reader.beginsWithString(RESPONSE_SERVER_ERROR, RESPONSE_SERVER_ERROR_SIZE))
{
size_t cbError = 0;
char *errorString = (char *)m_reader.readUntil(&cbError, '\r');

if (cbError > 1)
{
errorString[cbError] = '\0';
}
else
{
errorString = "malformed error received";
}

setError(errorString);
m_reader.skip();
return true;
}

return false;
}


bool Client::getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData, int *_flags, UINT64 *_cas, bool *bError)
{
static const char *END_OF_RESPONSE = "END\r\n";
static const char *BEGIN_OF_VALUE = "VALUE ";
static const size_t END_OF_RESPONSE_SIZE = strlen(END_OF_RESPONSE);
static const size_t BEGIN_OF_VALUE_SIZE = strlen(BEGIN_OF_VALUE);

*bError = false;

if (!readLine())
Expand All @@ -533,13 +668,22 @@ bool Client::getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData,
return false;
}

if (m_reader.readBytes(6) == NULL)
if (m_reader.beginsWithString(END_OF_RESPONSE, END_OF_RESPONSE_SIZE))
{
// "END\r\n" was recieved
m_reader.skip();
return false;
}

const char *valuePrefix = (const char *)m_reader.readBytes(BEGIN_OF_VALUE_SIZE);

if (valuePrefix == NULL || memcmp(valuePrefix, BEGIN_OF_VALUE, BEGIN_OF_VALUE_SIZE) != 0)
{
*bError = true;
setError("malformed response: expected VALUE");
m_reader.skip();
return false;
}

*key = (char *) m_reader.readUntil(cbKey, ' ');

if (*key == NULL)
Expand All @@ -550,7 +694,6 @@ bool Client::getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData,

*(*key + *cbKey) = '\0';


if (m_reader.readBytes(1) == NULL)
{
*bError = true;
Expand Down
12 changes: 10 additions & 2 deletions lib/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class Client
bool getFlush(void);
bool getReadNext(char **key, size_t *cbKey, char **data, size_t *cbData, int *flags, UINT64 *cas, bool *bError);

bool pipelineBegin(void);
bool pipelineFlush(void);
bool pipelineAbort(void);
bool getNextPipelineResult(char **pData, size_t *cbSize);
bool isPipelined(void);

bool set(const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool del(const char *key, size_t cbKey, time_t *expiration, bool async);
Expand All @@ -89,18 +94,21 @@ class Client
bool flushAll(time_t *expiration, bool async);
bool getResult(char **pData, size_t *cbSize);
const char *getError(void);
bool sendWriteBuffer(void);

private:
bool command(const char *cmd, size_t cbCmd, const char *key, size_t cbKey, void *data, size_t cbData, time_t expiration, int flags, bool async, size_t maxSize);
bool sendWriteBuffer(void);
bool readLine(void);


void pipelineReset(void);
void setError(const char *message);
bool extractErrorFromReader(void);

private:
SOCKETDESC *m_sock;
PacketWriter m_writer;
PacketReader m_reader;

const char *m_error;
int m_pipeline;
};
10 changes: 10 additions & 0 deletions lib/PacketReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "socketdefs.h"

#include <stdio.h>
#include <string.h>

#define BYTEORDER_UINT16(_x) (_x)
#define BYTEORDER_UINT32(_x) (_x)
Expand Down Expand Up @@ -109,6 +110,15 @@ char *PacketReader::getEndPtr()

extern void PrintBuffer(FILE *file, void *_offset, size_t len, int perRow);

bool PacketReader::beginsWithString(const char *str, size_t cbsize)
{
if (m_readCursor + cbsize > m_packetEnd)
{
return false;
}

return memcmp(m_readCursor, str, cbsize) == 0;
}

bool PacketReader::readNumeric (UINT64 *value)
{
Expand Down
3 changes: 2 additions & 1 deletion lib/PacketReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class PacketReader

size_t getBytesLeft();
void rewind(size_t num);
bool beginsWithString(const char *str, size_t cbsize);
};

#endif
#endif
Loading

0 comments on commit 931f768

Please sign in to comment.