Skip to content

Commit

Permalink
Fix a TSAN-reported error
Browse files Browse the repository at this point in the history
We now pass down pointers to closures instead of passing (callback, arg)
pairs as two separate elements. This lets us store a single word atomically,
fixing a race condition.

All call sites have been updated to the new API. No new allocations are
incurred. grpc_fd_state is deleted to avoid any temptation to ever add
anything there again.
  • Loading branch information
ctiller committed Feb 18, 2015
1 parent 9be83ee commit 0fcd53c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 68 deletions.
78 changes: 34 additions & 44 deletions src/core/iomgr/fd_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

enum descriptor_state { NOT_READY, READY, WAITING };
enum descriptor_state { NOT_READY = 0, READY = 1 }; /* or a pointer to a closure to call */

/* We need to keep a freelist not because of any concerns of malloc performance
* but instead so that implementations with multiple threads in (for example)
Expand Down Expand Up @@ -88,8 +88,8 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->watcher_mu);
}
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst.state, NOT_READY);
gpr_atm_rel_store(&r->writest.state, NOT_READY);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
Expand Down Expand Up @@ -166,11 +166,6 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }

typedef struct {
grpc_iomgr_cb_func cb;
void *arg;
} callback;

static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
Expand All @@ -180,28 +175,26 @@ static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
}
}

static void make_callbacks(callback *callbacks, size_t n, int success,
static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
make_callback(callbacks[i].cb, callbacks[i].arg, success,
make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
allow_synchronous_callback);
}
}

static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
void *arg, int allow_synchronous_callback) {
switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
int allow_synchronous_callback) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
/* There is no race if the descriptor is already ready, so we skip
the interlocked op in that case. As long as the app doesn't
try to set the same upcall twice (which it shouldn't) then
oldval should never be anything other than READY or NOT_READY. We
don't
check for user error on the fast path. */
st->cb = cb;
st->cb_arg = arg;
if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr) closure)) {
/* swap was successful -- the closure will run after the next
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
Expand All @@ -212,12 +205,12 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
assert(gpr_atm_acq_load(&st->state) == READY);
gpr_atm_rel_store(&st->state, NOT_READY);
make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
assert(gpr_atm_acq_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
make_callback(closure->cb, closure->cb_arg, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
return;
case WAITING:
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
"User called a notify_on function with a previous callback still "
Expand All @@ -228,38 +221,37 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
abort();
}

static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
size_t *ncallbacks) {
callback *c;
gpr_intptr state = gpr_atm_acq_load(st);

switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
switch (state) {
case READY:
/* duplicate ready, ignore */
return;
case NOT_READY:
if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
/* swap was successful -- the closure will run after the next
notify_on call. */
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the WAITING code below */
case WAITING:
assert(gpr_atm_acq_load(&st->state) == WAITING);
c = &callbacks[(*ncallbacks)++];
c->cb = st->cb;
c->arg = st->cb_arg;
gpr_atm_rel_store(&st->state, NOT_READY);
return;
case READY:
/* duplicate ready, ignore */
state = gpr_atm_acq_load(st);
default: /* waiting */
assert(gpr_atm_acq_load(st) != READY && gpr_atm_acq_load(st) != NOT_READY);
callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure*)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
}

static void set_ready(grpc_fd *fd, grpc_fd_state *st,
static void set_ready(grpc_fd *fd, gpr_atm *st,
int allow_synchronous_callback) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
callback cb;
grpc_iomgr_closure cb;
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
set_ready_locked(st, &cb, &ncb);
Expand All @@ -269,7 +261,7 @@ static void set_ready(grpc_fd *fd, grpc_fd_state *st,
}

void grpc_fd_shutdown(grpc_fd *fd) {
callback cb[2];
grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
Expand All @@ -280,14 +272,12 @@ void grpc_fd_shutdown(grpc_fd *fd) {
make_callbacks(cb, ncb, 0, 0);
}

void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
void *read_cb_arg) {
notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
notify_on(fd, &fd->readst, closure, 0);
}

void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
void *write_cb_arg) {
notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
notify_on(fd, &fd->writest, closure, 0);
}

gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
Expand All @@ -303,8 +293,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);

return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
(gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
(gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
}

void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
Expand Down
14 changes: 5 additions & 9 deletions src/core/iomgr/fd_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
typedef struct {
grpc_iomgr_cb_func cb;
void *cb_arg;
int success;
gpr_atm state;
} grpc_fd_state;
} grpc_iomgr_closure;

typedef struct grpc_fd grpc_fd;

Expand All @@ -71,8 +69,8 @@ struct grpc_fd {
gpr_mu watcher_mu;
grpc_fd_watcher watcher_root;

grpc_fd_state readst;
grpc_fd_state writest;
gpr_atm readst;
gpr_atm writest;

grpc_iomgr_cb_func on_done;
void *on_done_user_data;
Expand Down Expand Up @@ -126,12 +124,10 @@ void grpc_fd_shutdown(grpc_fd *fd);
underlying platform. This means that users must drain fd in read_cb before
calling notify_on_read again. Users are also expected to handle spurious
events, i.e read_cb is called while nothing can be readable from fd */
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
void *read_cb_arg);
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure);

/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
void *write_cb_arg);
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure);

/* Notification from the poller to an fd that it has become readable or
writable.
Expand Down
7 changes: 5 additions & 2 deletions src/core/iomgr/tcp_client_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
grpc_iomgr_closure write_closure;
} async_connect;

static int prepare_socket(const struct sockaddr *addr, int fd) {
Expand Down Expand Up @@ -136,7 +137,7 @@ static void on_writable(void *acp, int success) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
grpc_fd_notify_on_write(ac->fd, on_writable, ac);
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
return;
} else {
switch (so_error) {
Expand Down Expand Up @@ -229,9 +230,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->fd = grpc_fd_create(fd);
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;

grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_fd_notify_on_write(ac->fd, on_writable, ac);
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
}

#endif
15 changes: 11 additions & 4 deletions src/core/iomgr/tcp_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ typedef struct {
void *write_user_data;

grpc_tcp_slice_state write_state;

grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
} grpc_tcp;

static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
Expand Down Expand Up @@ -370,7 +373,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
} else {
/* TODO(klempner): Log interesting errors */
Expand Down Expand Up @@ -405,7 +408,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->read_cb = cb;
tcp->read_user_data = user_data;
gpr_ref(&tcp->refcount);
grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}

#define MAX_WRITE_IOVEC 16
Expand Down Expand Up @@ -468,7 +471,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {

write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
Expand Down Expand Up @@ -513,7 +516,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
gpr_ref(&tcp->refcount);
tcp->write_cb = cb;
tcp->write_user_data = user_data;
grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}

return status;
Expand Down Expand Up @@ -541,6 +544,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
tcp->read_closure.cb = grpc_tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = grpc_tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
return &tcp->base;
}

Expand Down
7 changes: 5 additions & 2 deletions src/core/iomgr/tcp_server_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ typedef struct {
struct sockaddr_un un;
} addr;
int addr_len;
grpc_iomgr_closure read_closure;
} server_port;

static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
Expand Down Expand Up @@ -244,7 +245,7 @@ static void on_read(void *arg, int success) {
case EINTR:
continue;
case EAGAIN:
grpc_fd_notify_on_read(sp->emfd, on_read, sp);
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
Expand Down Expand Up @@ -393,7 +394,9 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
}
grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
s->ports[i].read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i];
grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
Expand Down
Loading

0 comments on commit 0fcd53c

Please sign in to comment.