Skip to content

Commit

Permalink
fix multi DEL, a bug on parse key, and remove assert on conn_sendv
Browse files Browse the repository at this point in the history
  • Loading branch information
idning committed Jul 4, 2014
1 parent 9739f3a commit 3064dc2
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 53 deletions.
54 changes: 54 additions & 0 deletions scripts/test_mget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python
#coding: utf-8
#file : test_mget.py
#author : ning
#date : 2014-04-01 13:15:48

'''
usage:
export REDIS_HOST=127.0.0.1
export REDIS_PORT=4000
nosetests
'''

import os
import redis

host = os.environ['REDIS_HOST']
port = int(os.environ['REDIS_PORT'])


def test_mget(cnt=1000):
print 'test_many', cnt
r = redis.StrictRedis(host, port)

#insert
pipe = r.pipeline(transaction=False)
for i in range(cnt):
pipe.set('kkk-%s'%i, 'vvv-%s'%i)
pipe.execute()

keys = ['kkk-%s' % i for i in range(cnt)]

#mget to check
vals = r.mget(keys)
#print vals
for i in range(cnt):
assert('vvv-%s'%i == vals[i])

#del
assert (cnt == r.delete(*keys) )

#mget again
vals = r.mget(keys)
for i in range(cnt):
assert(None == vals[i])

def test_many_mget():
for i in range(1, 10000, 17):
test_mget(i)
pass


114 changes: 66 additions & 48 deletions src/nc_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){
* r->key_end
* */
static rstatus_t
msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){
msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){
struct mbuf *buf;
uint8_t * p;
uint32_t len = 0;
Expand All @@ -490,11 +490,12 @@ msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){
for (; p < buf->last && isdigit(*p); p++) {
len = len * 10 + (uint32_t)(*p - '0');
}

keylen = len;
len += (uint32_t)CRLF_LEN * 2;
len += (uint32_t)(p - buf->pos);

if(mbuf_length(buf) < len){ //key no in this buf, remove it.
if(mbuf_length(buf) < len - CRLF_LEN){ //key no in this buf, remove it.
len -= mbuf_length(buf);
mbuf_remove(&r->mhdr, buf);
mbuf_put(buf);
Expand All @@ -503,13 +504,24 @@ msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){
}

r->key_end = buf->pos + len - CRLF_LEN;
r->key_start = r->key_end - keylen; //TODO, check
r->key_start = r->key_end - keylen;
buf->pos += len - CRLF_LEN;

len = CRLF_LEN;
while(mbuf_length(buf) < len){ //eat CRLF
len -= mbuf_length(buf);
mbuf_remove(&r->mhdr, buf);
mbuf_put(buf);

buf = STAILQ_FIRST(&r->mhdr);
}
buf->pos += len;

return NC_OK;
}

static rstatus_t
msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){
msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){
struct server_pool *pool;
struct mbuf *mbuf, *sub_msg_mbuf;
struct msg ** sub_msgs;
Expand Down Expand Up @@ -542,7 +554,7 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){
msg->frag_owner = msg;

for(i = 1; i < msg->narg; i++){ //for each key
msg_fragment_mget_update_keypos(msg, &mbuf);
msg_fragment_argx_update_keypos(msg, &mbuf);

uint8_t * key = msg->key_start;
uint32_t keylen = (uint32_t)(msg->key_end - msg->key_start);
Expand Down Expand Up @@ -587,7 +599,7 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){
msg->key_end = mbuf->start + 12;
msg->mlen = 14;

conn->recv_done(ctx, conn, msg, NULL);
conn->recv_done(ctx, conn, msg, nmsg);

for(i = 0; i < pool->ncontinuum; i++){ //prepend mget header, and forward it
struct msg* sub_msg = sub_msgs[i];
Expand All @@ -601,16 +613,21 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){
nc_free(sub_msgs);
return NC_ENOMEM;
}
sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n",
sub_msg->narg + 1);
if (msg->type == MSG_REQ_REDIS_MGET){
sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n",
sub_msg->narg + 1);
}else if (msg->type == MSG_REQ_REDIS_DEL){
sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$3\r\ndel\r\n",
sub_msg->narg + 1);
}
sub_msg->mlen += mbuf_length(sub_msg_mbuf);

STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next);
sub_msg->type = MSG_REQ_REDIS_MGET;
sub_msg->type = msg->type;
sub_msg->frag_id = msg->frag_id;
sub_msg->frag_owner = msg->frag_owner;
STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next);

conn->recv_done(ctx, conn, sub_msg, NULL);

conn->recv_done(ctx, conn, sub_msg, nmsg);
msg->nfrag ++;
}

Expand All @@ -624,45 +641,45 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
struct msg *nmsg; //next msg
struct mbuf *mbuf, *nbuf;

if(msg->type == MSG_REQ_REDIS_MGET){
rstatus_t status = msg_fragment_mget(ctx, conn, msg);
if (status != NC_OK) {
return status;
}
return NC_OK;//TODO.
}

mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
if (msg->pos == mbuf->last) {
/* no more data to parse */
conn->recv_done(ctx, conn, msg, NULL);
return NC_OK;
}
nmsg = NULL;
}else{

/*
* Input mbuf has un-parsed data. Split mbuf of the current message msg
* into (mbuf, nbuf), where mbuf is the portion of the message that has
* been parsed and nbuf is the portion of the message that is un-parsed.
* Parse nbuf as a new message nmsg in the next iteration.
*/
nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
if (nbuf == NULL) {
return NC_ENOMEM;
}

/*
* Input mbuf has un-parsed data. Split mbuf of the current message msg
* into (mbuf, nbuf), where mbuf is the portion of the message that has
* been parsed and nbuf is the portion of the message that is un-parsed.
* Parse nbuf as a new message nmsg in the next iteration.
*/
nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
if (nbuf == NULL) {
return NC_ENOMEM;
}
nmsg = msg_get(msg->owner, msg->request, conn->redis);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
}
mbuf_insert(&nmsg->mhdr, nbuf);
nmsg->pos = nbuf->pos;

nmsg = msg_get(msg->owner, msg->request, conn->redis);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
/* update length of current (msg) and new message (nmsg) */
nmsg->mlen = mbuf_length(nbuf);
msg->mlen -= nmsg->mlen;
}
mbuf_insert(&nmsg->mhdr, nbuf);
nmsg->pos = nbuf->pos;

/* update length of current (msg) and new message (nmsg) */
nmsg->mlen = mbuf_length(nbuf);
msg->mlen -= nmsg->mlen;

conn->recv_done(ctx, conn, msg, nmsg);
if(redis_argx(msg)){
rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg);
if (status != NC_OK) {
return status;
}
return NC_OK;//TODO.
}else{
conn->recv_done(ctx, conn, msg, nmsg);
}

return NC_OK;
}
Expand Down Expand Up @@ -956,11 +973,12 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
}
}

ASSERT(!TAILQ_EMPTY(&send_msgq) && nsend != 0);

conn->smsg = NULL;

n = conn_sendv(conn, &sendv, nsend);
if(!TAILQ_EMPTY(&send_msgq) && nsend != 0){
n = conn_sendv(conn, &sendv, nsend);
}else{
n = 0;
}

nsent = n > 0 ? (size_t)n : 0;

Expand Down
1 change: 1 addition & 0 deletions src/proto/nc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ rstatus_t memcache_post_splitcopy(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);

bool redis_argx(struct msg *r);
void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
void redis_pre_splitcopy(struct mbuf *mbuf, void *arg);
Expand Down
30 changes: 25 additions & 5 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ redis_argn(struct msg *r)
* Return true, if the redis command is a vector command accepting one or
* more keys, otherwise return false
*/
static bool
bool
redis_argx(struct msg *r)
{
switch (r->type) {
Expand Down Expand Up @@ -2129,6 +2129,23 @@ redis_copy_bulk(struct msg *dst, struct msg * src){
return bytes;
}

void
redis_post_coalesce_del(struct msg *request) {
struct msg *response = request->peer;
struct mbuf * mbuf;
uint32_t len;

mbuf = STAILQ_FIRST(&response->mhdr);
mbuf->last = mbuf->pos = mbuf->start; //discard PONG

len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", request->integer);
mbuf->last += len;
response->mlen += (uint32_t)len;

nc_free(request->frag_seq);

}

void
redis_post_coalesce_mget(struct msg *request) {
struct msg *response = request->peer;
Expand All @@ -2138,7 +2155,7 @@ redis_post_coalesce_mget(struct msg *request) {
int i;

mbuf = STAILQ_FIRST(&response->mhdr);
mbuf->last = mbuf->pos = mbuf->start;
mbuf->last = mbuf->pos = mbuf->start; //discard PONG

len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1);
mbuf->last += len;
Expand Down Expand Up @@ -2200,14 +2217,17 @@ redis_post_coalesce(struct msg *r)
mbuf = STAILQ_FIRST(&pr->mhdr);
ASSERT(mbuf_empty(mbuf));

n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->narg - 1);
n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->nfrag);
mbuf->last += n;
pr->mlen += (uint32_t)n;
pr->mlen = (uint32_t)n;
break;

case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG
redis_post_coalesce_mget(r);
if (r->type == MSG_REQ_REDIS_MGET){
redis_post_coalesce_mget(r);
}else if (r->type == MSG_REQ_REDIS_DEL){
redis_post_coalesce_del(r);
}
break;

default:
Expand Down

0 comments on commit 3064dc2

Please sign in to comment.