Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Python epoll1 Fork Support #32196

Merged
merged 23 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Roughly working.
  • Loading branch information
gnossen committed Jan 18, 2023
commit f3471560e8ae511cb9c2b2c6d5337a0fe152cdf9
7 changes: 7 additions & 0 deletions bazel/cython_library.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ def pyx_library(name, deps = [], py_deps = [], srcs = [], **kwargs):
srcs = [stem + ".cpp"],
deps = deps + ["@local_config_python//:python_headers"],
linkshared = 1,
linkopts = [
"-lpthread",
],
defines = [
"GRPC_ENABLE_FORK_SUPPORT=1",
"GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1",
],
)
shared_objects.append(shared_object_name)

Expand Down
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"forkable",
"event_engine_poller",
"event_engine_time_util",
"iomgr_port",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
}

static void destroy_transport(grpc_transport* gt) {
// TODO: Dump stacktrace here.
grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
absl::OkStatus());
Expand Down
9 changes: 5 additions & 4 deletions src/core/lib/event_engine/forkable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ void RegisterForkHandlers() {
void PrepareFork() {
std::cerr << "AAAAAAAAAAAAAAAAAAa Entering PrepareFork" << std::endl << std::flush;
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
std::cerr << "AAAAAAAAAAAAAAAAAAa Calling forkable->PrepareFork " << forkable << std::endl << std::flush;
forkable->PrepareFork();
std::cerr << "AAAAAAAAAAAAAAAAAAa Called forkable->PrepareFork " << forkable << std::endl << std::flush;
// for (auto* forkable : *g_forkables) {
for (auto forkable_iter = g_forkables->rbegin(); forkable_iter != g_forkables->rend(); ++forkable_iter) {
std::cerr << "AAAAAAAAAAAAAAAAAAa Calling forkable->PrepareFork " << *forkable_iter << std::endl << std::flush;
(*forkable_iter)->PrepareFork();
std::cerr << "AAAAAAAAAAAAAAAAAAa Called forkable->PrepareFork " << *forkable_iter << std::endl << std::flush;
}
std::cerr << "AAAAAAAAAAAAAAAAAAa Exiting PrepareFork" << std::endl << std::flush;
}
Expand Down
25 changes: 17 additions & 8 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) {
Epoll1Poller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
Expand Down Expand Up @@ -344,7 +344,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
}

Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
: scheduler_(scheduler), was_kicked_(false) {
: scheduler_(scheduler), was_kicked_(false), closed_(false) {
g_epoll_set_.epfd = EpollCreateAndCloexec();
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
Expand All @@ -366,8 +366,9 @@ void Epoll1Poller::Shutdown() {
delete this;
}

Epoll1Poller::~Epoll1Poller() {
std::cerr << "AAAAAAAAAAAAAAAAAAAAA Destroying Epoll1Poller " << this << std::endl << std::flush;
void Epoll1Poller::Close() {
if (closed_) return;

if (g_epoll_set_.epfd >= 0) {
close(g_epoll_set_.epfd);
gnossen marked this conversation as resolved.
Show resolved Hide resolved
g_epoll_set_.epfd = -1;
Expand All @@ -381,6 +382,12 @@ Epoll1Poller::~Epoll1Poller() {
delete handle;
}
}
closed_ = true;
gnossen marked this conversation as resolved.
Show resolved Hide resolved
}

Epoll1Poller::~Epoll1Poller() {
std::cerr << "AAAAAAAAAAAAAAAAAAAAA Destroying Epoll1Poller " << this << std::endl << std::flush;
Close();
}

EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
Expand Down Expand Up @@ -548,12 +555,14 @@ Poller::WorkResult Epoll1Poller::Work(
}

void Epoll1Poller::Kick() {
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Entering Epoll1Poller::Kick");
grpc_core::MutexLock lock(&mu_);
if (was_kicked_) {
if (was_kicked_ || closed_) {
return;
}
was_kicked_ = true;
GPR_ASSERT(wakeup_fd_->Wakeup().ok());
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Exiting Epoll1Poller::Kick");
}

Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
Expand All @@ -567,9 +576,9 @@ Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
void Epoll1Poller::PrepareFork() {
// Set forking flag.
// Kick the event loop.
std::cerr << "AAAAAAAAAAAAAAAAAAAAAAA Kicking event loop" << std::endl << std::flush;
gpr_log(GPR_INFO, "AAAAAAAAAAAAAAAAAAAAAAA Kicking event loop");
Kick();
std::cerr << "AAAAAAAAAAAAAAAAAAAAAAA Kicked event loop" << std::endl << std::flush;
gpr_log(GPR_INFO, "AAAAAAAAAAAAAAAAAAAAAAA Kicked event loop");
}

void Epoll1Poller::PostforkParent() {
Expand Down Expand Up @@ -625,7 +634,7 @@ Poller::WorkResult Epoll1Poller::Work(
void Epoll1Poller::Kick() { GPR_ASSERT(false && "unimplemented"); }



std::cerr << "BBBBBBBBBBBBBBBBBBBBB Entering Epoll1Poller::Kick" << std::endl << std::flush;
// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
// nullptr.
Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; }
Expand Down
4 changes: 4 additions & 0 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Epoll1Poller : public PosixEventPoller, public grpc_event_engine::experime
void PostforkParent() override;
void PostforkChild() override;

void Close();

private:
// This initial vector size may need to be tuned
using Events = absl::InlinedVector<Epoll1EventHandle*, 5>;
Expand All @@ -85,6 +87,7 @@ class Epoll1Poller : public PosixEventPoller, public grpc_event_engine::experime
// on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle,
Events& pending_events);

// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number
Expand Down Expand Up @@ -123,6 +126,7 @@ class Epoll1Poller : public PosixEventPoller, public grpc_event_engine::experime
bool was_kicked_ ABSL_GUARDED_BY(mu_);
std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_;
};

// Return an instance of a epoll1 based poller tied to the specified event
Expand Down
8 changes: 7 additions & 1 deletion src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <memory>
#include <utility>

#include <iostream>

#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
Expand Down Expand Up @@ -584,7 +586,11 @@ void PollPoller::KickExternal(bool ext) {
GPR_ASSERT(wakeup_fd_->Wakeup().ok());
}

void PollPoller::Kick() { KickExternal(true); }
void PollPoller::Kick() {
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Entering PollPoller::Kick");
KickExternal(true);
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Exiting PollPoller::Kick");
}

void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) {
handle->PollerHandlesListPos().next = poll_handles_list_head_;
Expand Down
18 changes: 15 additions & 3 deletions src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <type_traits>
#include <utility>

#include <iostream>

#include "absl/cleanup/cleanup.h"
#include "absl/functional/any_invocable.h"
#include "absl/meta/type_traits.h"
Expand Down Expand Up @@ -285,7 +287,8 @@ void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<ThreadPool> executor)
: poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
executor_(std::move(executor)) {}
executor_(std::move(executor)),
trigger_shutdown_called_(false) {}

PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller),
Expand All @@ -308,10 +311,15 @@ void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
}

void PosixEnginePollerManager::TriggerShutdown() {
GPR_DEBUG_ASSERT(trigger_shutdown_called_ == false);
gnossen marked this conversation as resolved.
Show resolved Hide resolved
trigger_shutdown_called_ = true;
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Entering PosixEnginePollerManager::TriggerShutdown");
// If the poller is external, dont try to shut it down. Otherwise
// set poller state to PollerState::kShuttingDown.
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Resetting poller_state_)");
if (poller_state_.exchange(PollerState::kShuttingDown) ==
PollerState::kExternal) {
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Unsetting poller_)");
poller_ = nullptr;
return;
}
Expand All @@ -333,13 +341,12 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)

PosixEventEngine::PosixEventEngine()
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(nullptr),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller otherwise, the process
// will deadlock when forking.
executor_ = std::make_shared<ThreadPool>();
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager);
Expand Down Expand Up @@ -403,6 +410,7 @@ struct PosixEventEngine::ClosureData final : public EventEngine::Closure {

PosixEventEngine::~PosixEventEngine() {
{
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Entering ~PosixEventEngine");
grpc_core::MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
Expand All @@ -415,10 +423,14 @@ PosixEventEngine::~PosixEventEngine() {
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
}
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Calling timer_manager_.Shutdown()");
timer_manager_.Shutdown();
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Called timer_manager_.Shutdown()");
#ifdef GRPC_POSIX_SOCKET_TCP
if (poller_manager_ != nullptr) {
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Calling poller_manager_->TriggerShutdown()");
poller_manager_->TriggerShutdown();
gpr_log(GPR_INFO, "BBBBBBBBBBBBBBBBBBBBB Called poller_manager_->TriggerShutdown()");
}
#endif // GRPC_POSIX_SOCKET_TCP
executor_->Quiesce();
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class PosixEnginePollerManager
grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr;
std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_;
bool trigger_shutdown_called_;
};
#endif // GRPC_POSIX_SOCKET_TCP

Expand Down
4 changes: 2 additions & 2 deletions src/core/lib/gpr/log_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ void gpr_default_log(gpr_log_func_args* args) {
}

std::string prefix = absl::StrFormat(
"%s%s.%09" PRId32 " %7ld %s:%d]", gpr_log_severity_string(args->severity),
time_buffer, now.tv_nsec, tid, display_file, args->line);
"%s%s.%09" PRId32 " pid%d %7ld %s:%d]", gpr_log_severity_string(args->severity),
time_buffer, now.tv_nsec, getpid(), tid, display_file, args->line);

absl::optional<std::string> stack_trace =
gpr_should_log_stacktrace(args->severity)
Expand Down
18 changes: 14 additions & 4 deletions src/core/lib/gprpp/fork.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "src/core/lib/gprpp/global_config_env.h"
#include "src/core/lib/gprpp/no_destruct.h"

#include <iostream>

//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
// AROUND VERY SPECIFIC USE CASES.
Expand Down Expand Up @@ -190,10 +192,18 @@ void Fork::DoDecExecCtxCount() {

void Fork::SetResetChildPollingEngineFunc(
Fork::child_postfork_func reset_child_polling_engine) {
reset_child_polling_engine_ = reset_child_polling_engine;
std::cerr << "AAAAAAAAAAAAAAAAAAAAAAA SetResetChildPollingEngineFunc: " << (void*)reset_child_polling_engine << std::endl << std::flush;
if (reset_child_polling_engine_ == nullptr) {
reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
}
if (reset_child_polling_engine == nullptr) {
reset_child_polling_engine_->clear();
} else {
reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
}
}
Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
return reset_child_polling_engine_;
std::vector<Fork::child_postfork_func> Fork::GetResetChildPollingEngineFunc() {
return *reset_child_polling_engine_;
}

bool Fork::BlockExecCtx() {
Expand Down Expand Up @@ -228,5 +238,5 @@ void Fork::AwaitThreads() {

std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ = nullptr;
} // namespace grpc_core
5 changes: 3 additions & 2 deletions src/core/lib/gprpp/fork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>

#include <atomic>
#include <vector>

//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
Expand Down Expand Up @@ -58,7 +59,7 @@ class Fork {
// reset the polling engine's internal state.
static void SetResetChildPollingEngineFunc(
child_postfork_func reset_child_polling_engine);
static child_postfork_func GetResetChildPollingEngineFunc();
static std::vector<child_postfork_func> GetResetChildPollingEngineFunc();
gnossen marked this conversation as resolved.
Show resolved Hide resolved

// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
Expand Down Expand Up @@ -87,7 +88,7 @@ class Fork {

static std::atomic<bool> support_enabled_;
static bool override_enabled_;
static child_postfork_func reset_child_polling_engine_;
static std::vector<child_postfork_func>* reset_child_polling_engine_;
};

} // namespace grpc_core
Expand Down
5 changes: 5 additions & 0 deletions src/core/lib/iomgr/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,

// Might be called multiple times
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
gpr_log(GPR_ERROR, "Shutting down fd %d", fd->fd);
fd_shutdown_internal(fd, why, false);
}

Expand Down Expand Up @@ -1279,6 +1280,7 @@ const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
// the global epoll fd. This allows gRPC to shutdown in the child process
// without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
std::cerr << "AAAAAAAAAAAAAAAAAAAA Calling reset_event_manager_on_fork" << std::endl << std::flush;
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
close(fork_fd_list_head->fd);
Expand All @@ -1288,6 +1290,7 @@ static void reset_event_manager_on_fork() {
gpr_mu_unlock(&fork_fd_list_mu);
shutdown_engine();
init_epoll1_linux();
std::cerr << "AAAAAAAAAAAAAAAAAAAA Called reset_event_manager_on_fork" << std::endl << std::flush;
}

// It is possible that GLIBC has epoll but the underlying kernel doesn't.
Expand All @@ -1311,7 +1314,9 @@ static bool init_epoll1_linux() {
return false;
}

std::cerr << "AAAAAAAAAAAAAAAAAAAA In init_epoll1_linux" << std::endl << std::flush;
if (grpc_core::Fork::Enabled()) {
std::cerr << "AAAAAAAAAAAAAAAAAAAA Registering fork handler: " << (void*)&reset_event_manager_on_fork << std::endl << std::flush;
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/ev_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
}

void grpc_fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
GRPC_POLLING_API_TRACE("fd_shutdown(%d): %s", grpc_fd_wrapped_fd(fd), why.message());
GRPC_FD_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
g_event_engine->fd_shutdown(fd, why);
}
Expand Down
10 changes: 6 additions & 4 deletions src/core/lib/iomgr/fork_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include <string.h>

#include <iostream>

#include <grpc/fork.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
Expand Down Expand Up @@ -99,10 +101,10 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
grpc_core::Fork::child_postfork_func reset_polling_engine =
grpc_core::Fork::GetResetChildPollingEngineFunc();
if (reset_polling_engine != nullptr) {
reset_polling_engine();
for (auto reset_polling_engine : grpc_core::Fork::GetResetChildPollingEngineFunc()) {
if (reset_polling_engine != nullptr) {
reset_polling_engine();
}
}
grpc_timer_manager_set_threading(true);
grpc_core::Executor::SetThreadingAll(true);
Expand Down
Loading