Skip to content

Commit

Permalink
Merge pull request grpc#16434 from ericgribkoff/fork_support_v2_mac_poll
Browse files Browse the repository at this point in the history
Support tracking and closing fds post-fork in ev_poll_posix
  • Loading branch information
ericgribkoff authored Aug 24, 2018
2 parents 35479b8 + edac3c6 commit b1f0d68
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 10 deletions.
114 changes: 113 additions & 1 deletion src/core/lib/iomgr/ev_poll_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;

typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
struct grpc_fork_fd_list {
/* Only one of fd or cached_wakeup_fd will be set. The unused field will be
set to nullptr. */
grpc_fd* fd;
grpc_cached_wakeup_fd* cached_wakeup_fd;

grpc_fork_fd_list* next;
grpc_fork_fd_list* prev;
};

struct grpc_fd {
int fd;
/* refst format:
Expand Down Expand Up @@ -108,8 +121,18 @@ struct grpc_fd {
grpc_closure* on_done_closure;

grpc_iomgr_object iomgr_object;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
grpc_fork_fd_list* fork_fd_list;
};

/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
static bool track_fds_for_fork = false;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
static grpc_fork_fd_list* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
Expand Down Expand Up @@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
Expand Down Expand Up @@ -281,9 +307,61 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;

/*******************************************************************************
* fd_posix.c
* functions to track opened fds. No-ops unless track_fds_for_fork is true.
*/

static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
if (track_fds_for_fork) {
gpr_mu_lock(&fork_fd_list_mu);
if (fork_fd_list_head == node) {
fork_fd_list_head = node->next;
}
if (node->prev != nullptr) {
node->prev->next = node->next;
}
if (node->next != nullptr) {
node->next->prev = node->prev;
}
gpr_free(node);
gpr_mu_unlock(&fork_fd_list_mu);
}
}

static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
gpr_mu_lock(&fork_fd_list_mu);
node->next = fork_fd_list_head;
node->prev = nullptr;
if (fork_fd_list_head != nullptr) {
fork_fd_list_head->prev = node;
}
fork_fd_list_head = node;
gpr_mu_unlock(&fork_fd_list_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
if (track_fds_for_fork) {
fd->fork_fd_list =
static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
fd->fork_fd_list->fd = fd;
fd->fork_fd_list->cached_wakeup_fd = nullptr;
fork_fd_list_add_node(fd->fork_fd_list);
}
}

static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
if (track_fds_for_fork) {
fd->fork_fd_list =
static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
fd->fork_fd_list->cached_wakeup_fd = fd;
fd->fork_fd_list->fd = nullptr;
fork_fd_list_add_node(fd->fork_fd_list);
}
}

/*******************************************************************************
* fd_posix.c
*/

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
Expand Down Expand Up @@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
Expand Down Expand Up @@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
fork_fd_list_add_grpc_fd(r);
return r;
}

Expand Down Expand Up @@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
Expand Down Expand Up @@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
Expand Down Expand Up @@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
if (track_fds_for_fork) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
}
}

static const grpc_event_engine_vtable vtable = {
Expand Down Expand Up @@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};

/* Called by the child process's post-fork handler to close open fds, including
* worker wakeup fds. This allows gRPC to shutdown in the child process without
* interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
if (fork_fd_list_head->fd != nullptr) {
close(fork_fd_list_head->fd->fd);
fork_fd_list_head->fd->fd = -1;
} else {
close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
}
fork_fd_list_head = fork_fd_list_head->next;
}
gpr_mu_unlock(&fork_fd_list_mu);
}

const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
Expand All @@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
if (grpc_core::Fork::Enabled()) {
track_fds_for_fork = true;
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
}
return &vtable;
}

Expand Down
6 changes: 6 additions & 0 deletions src/core/lib/iomgr/fork_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ void grpc_prefork() {
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
gpr_log(GPR_ERROR,
"Fork support is only compatible with the epoll1 and poll polling "
"strategies");
}
if (!grpc_core::Fork::BlockExecCtx()) {
gpr_log(GPR_INFO,
"Other threads are currently calling into gRPC, skipping fork() "
Expand Down
9 changes: 0 additions & 9 deletions src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ _GRPC_ENABLE_FORK_SUPPORT = (
os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
.lower() in _TRUE_VALUES)

_GRPC_POLL_STRATEGY = os.environ.get('GRPC_POLL_STRATEGY')

cdef void __prefork() nogil:
with gil:
with _fork_state.fork_in_progress_condition:
Expand Down Expand Up @@ -82,13 +80,6 @@ cdef void __postfork_child() nogil:
def fork_handlers_and_grpc_init():
grpc_init()
if _GRPC_ENABLE_FORK_SUPPORT:
# TODO(ericgribkoff) epoll1 is default for grpcio distribution. Decide whether to expose
# grpc_get_poll_strategy_name() from ev_posix.cc to get actual polling choice.
if _GRPC_POLL_STRATEGY is not None and _GRPC_POLL_STRATEGY != "epoll1":
_LOGGER.error(
'gRPC Python fork support is only compatible with the epoll1 '
'polling engine')
return
with _fork_state.fork_handler_registered_lock:
if not _fork_state.fork_handler_registered:
pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
Expand Down

0 comments on commit b1f0d68

Please sign in to comment.