Skip to content

Commit

Permalink
Merge pull request grpc#1758 from dgquintas/iomgr_managed_cbs
Browse files Browse the repository at this point in the history
Revamped iomgr's callback mechanism
  • Loading branch information
ctiller committed Jun 2, 2015
2 parents 84e520f + 69ba871 commit 1b932e7
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 103 deletions.
11 changes: 9 additions & 2 deletions src/core/channel/child_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ typedef struct {
gpr_uint8 sending_farewell;
/* have we sent farewell (goaway + disconnect) */
gpr_uint8 sent_farewell;

grpc_iomgr_closure finally_destroy_channel_closure;
grpc_iomgr_closure send_farewells_closure;
} lb_channel_data;

typedef struct { grpc_child_channel *channel; } lb_call_data;
Expand Down Expand Up @@ -213,12 +216,16 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
!chand->sending_farewell && !chand->calling_back) {
grpc_iomgr_add_callback(finally_destroy_channel, channel);
chand->finally_destroy_channel_closure.cb = finally_destroy_channel;
chand->finally_destroy_channel_closure.cb_arg = channel;
grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
!chand->sent_farewell) {
chand->sending_farewell = 1;
grpc_iomgr_add_callback(send_farewells, channel);
chand->send_farewells_closure.cb = send_farewells;
chand->send_farewells_closure.cb_arg = channel;
grpc_iomgr_add_callback(&chand->send_farewells_closure);
}
}

Expand Down
53 changes: 28 additions & 25 deletions src/core/iomgr/fd_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}

gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
Expand All @@ -116,8 +117,7 @@ static void ref_by(grpc_fd *fd, int n) {
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
close(fd->fd);
grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
grpc_iomgr_add_callback(&fd->on_done_closure);
freelist_fd(fd);
grpc_iomgr_unref();
} else {
Expand Down Expand Up @@ -180,8 +180,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}

void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
fd->on_done = on_done ? on_done : do_nothing;
fd->on_done_user_data = user_data;
grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing,
user_data);
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
Expand All @@ -195,21 +195,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }

static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
static void process_callback(grpc_iomgr_closure *closure, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
cb(arg, success);
closure->cb(closure->cb_arg, success);
} else {
grpc_iomgr_add_delayed_callback(cb, arg, success);
grpc_iomgr_add_delayed_callback(closure, success);
}
}

static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
int allow_synchronous_callback) {
static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
int success, int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
allow_synchronous_callback);
process_callback(callbacks + i, success, allow_synchronous_callback);
}
}

Expand All @@ -234,10 +233,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
assert(gpr_atm_no_barrier_load(st) == READY);
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
make_callback(closure->cb, closure->cb_arg,
!gpr_atm_acq_load(&fd->shutdown),
process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
return;
default: /* WAITING */
Expand All @@ -251,7 +249,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}

static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);

Expand All @@ -269,9 +267,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
Fall through to the WAITING code below */
state = gpr_atm_acq_load(st);
default: /* waiting */
assert(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
Expand All @@ -282,25 +280,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
grpc_iomgr_closure cb;
grpc_iomgr_closure* closure;
size_t ncb = 0;

gpr_mu_lock(&fd->set_state_mu);
set_ready_locked(st, &cb, &ncb);
set_ready_locked(st, &closure, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
make_callbacks(&cb, ncb, success, allow_synchronous_callback);
GPR_ASSERT(ncb <= 1);
if (ncb > 0) {
process_callbacks(closure, ncb, success, allow_synchronous_callback);
}
}

void grpc_fd_shutdown(grpc_fd *fd) {
grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
set_ready_locked(&fd->readst, cb, &ncb);
set_ready_locked(&fd->writest, cb, &ncb);
set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb);
set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb);
gpr_mu_unlock(&fd->set_state_mu);
make_callbacks(cb, ncb, 0, 0);
GPR_ASSERT(ncb <= 2);
process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */,
0 /* GPR_FALSE */);
}

void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
Expand Down
10 changes: 3 additions & 7 deletions src/core/iomgr/fd_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

typedef struct {
grpc_iomgr_cb_func cb;
void *cb_arg;
} grpc_iomgr_closure;

typedef struct grpc_fd grpc_fd;

typedef struct grpc_fd_watcher {
Expand Down Expand Up @@ -96,9 +91,10 @@ struct grpc_fd {
gpr_atm readst;
gpr_atm writest;

grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;

grpc_iomgr_closure on_done_closure;
grpc_iomgr_closure *shutdown_closures[2];
};

/* Create a wrapped file descriptor.
Expand Down
68 changes: 32 additions & 36 deletions src/core/iomgr/iomgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/sync.h>

typedef struct delayed_callback {
grpc_iomgr_cb_func cb;
void *cb_arg;
int success;
struct delayed_callback *next;
} delayed_callback;

static gpr_mu g_mu;
static gpr_cv g_rcv;
static delayed_callback *g_cbs_head = NULL;
static delayed_callback *g_cbs_tail = NULL;
static grpc_iomgr_closure *g_cbs_head = NULL;
static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static int g_refs;
static gpr_event g_background_callback_executor_done;
Expand All @@ -66,12 +59,11 @@ static void background_callback_executor(void *ignored) {
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
delayed_callback *cb = g_cbs_head;
g_cbs_head = cb->next;
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
cb->cb(cb->cb_arg, cb->success);
gpr_free(cb);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
Expand Down Expand Up @@ -103,7 +95,7 @@ void grpc_iomgr_init(void) {
}

void grpc_iomgr_shutdown(void) {
delayed_callback *cb;
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));

Expand All @@ -114,13 +106,12 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
cb = g_cbs_head;
g_cbs_head = cb->next;
closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);

cb->cb(cb->cb_arg, 0);
gpr_free(cb);
closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
}
if (g_refs) {
Expand Down Expand Up @@ -167,42 +158,48 @@ void grpc_iomgr_unref(void) {
gpr_mu_unlock(&g_mu);
}

void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
int success) {
delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
dcb->cb = cb;
dcb->cb_arg = cb_arg;
dcb->success = success;

void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->next = NULL;
}

void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
closure->success = success;
gpr_mu_lock(&g_mu);
dcb->next = NULL;
closure->next = NULL;
if (!g_cbs_tail) {
g_cbs_head = g_cbs_tail = dcb;
g_cbs_head = g_cbs_tail = closure;
} else {
g_cbs_tail->next = dcb;
g_cbs_tail = dcb;
g_cbs_tail->next = closure;
g_cbs_tail = closure;
}
gpr_mu_unlock(&g_mu);
}

void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);

void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
}


int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
delayed_callback *cb;
grpc_iomgr_closure *closure;
for (;;) {
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
cb = g_cbs_head;
if (!cb) {
closure = g_cbs_head;
if (!closure) {
gpr_mu_unlock(&g_mu);
break;
}
g_cbs_head = cb->next;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
Expand All @@ -211,8 +208,7 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
retake_mu = drop_mu;
drop_mu = NULL;
}
cb->cb(cb->cb_arg, success && cb->success);
gpr_free(cb);
closure->cb(closure->cb_arg, success && closure->success);
n++;
}
if (retake_mu) {
Expand Down
37 changes: 33 additions & 4 deletions src/core/iomgr/iomgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,43 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H

/* gRPC Callback definition */
/** gRPC Callback definition.
*
* \param arg Arbitrary input.
* \param success An indication on the state of the iomgr. On false, cleanup
* actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);

/** A closure over a grpc_iomgr_cb_func. */
typedef struct grpc_iomgr_closure {
/** Bound callback. */
grpc_iomgr_cb_func cb;

/** Arguments to be passed to "cb". */
void *cb_arg;

/** Internal. A boolean indication to "cb" on the state of the iomgr.
* For instance, closures created during a shutdown would have this field set
* to false. */
int success;

/**< Internal. Do not touch */
struct grpc_iomgr_closure *next;
} grpc_iomgr_closure;

/** Initializes \a closure with \a cb and \a cb_arg. */
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);

/** Initializes the iomgr. */
void grpc_iomgr_init(void);

/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);

/* This function is called from within a callback or from anywhere else
and causes the invocation of a callback at some point in the future */
void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg);
/** Registers a closure to be invoked at some point in the future.
*
* Can be called from within a callback or from anywhere else */
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);

#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
4 changes: 1 addition & 3 deletions src/core/iomgr/iomgr_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H

#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>

int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);

void grpc_iomgr_ref(void);
void grpc_iomgr_unref(void);
Expand Down
Loading

0 comments on commit 1b932e7

Please sign in to comment.