Skip to content

Commit

Permalink
Merge pull request grpc#3726 from dgquintas/iomgr_executor
Browse files Browse the repository at this point in the history
Introducing grpc_executor, for all your threading needs
  • Loading branch information
ctiller committed Oct 19, 2015
2 parents 8093b57 + 661ad7f commit e2a1bf4
Show file tree
Hide file tree
Showing 19 changed files with 286 additions and 29 deletions.
6 changes: 6 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ cc_library(
"src/core/iomgr/endpoint.h",
"src/core/iomgr/endpoint_pair.h",
"src/core/iomgr/exec_ctx.h",
"src/core/iomgr/executor.h",
"src/core/iomgr/fd_posix.h",
"src/core/iomgr/iocp_windows.h",
"src/core/iomgr/iomgr.h",
Expand Down Expand Up @@ -321,6 +322,7 @@ cc_library(
"src/core/iomgr/endpoint_pair_posix.c",
"src/core/iomgr/endpoint_pair_windows.c",
"src/core/iomgr/exec_ctx.c",
"src/core/iomgr/executor.c",
"src/core/iomgr/fd_posix.c",
"src/core/iomgr/iocp_windows.c",
"src/core/iomgr/iomgr.c",
Expand Down Expand Up @@ -469,6 +471,7 @@ cc_library(
"src/core/iomgr/endpoint.h",
"src/core/iomgr/endpoint_pair.h",
"src/core/iomgr/exec_ctx.h",
"src/core/iomgr/executor.h",
"src/core/iomgr/fd_posix.h",
"src/core/iomgr/iocp_windows.h",
"src/core/iomgr/iomgr.h",
Expand Down Expand Up @@ -585,6 +588,7 @@ cc_library(
"src/core/iomgr/endpoint_pair_posix.c",
"src/core/iomgr/endpoint_pair_windows.c",
"src/core/iomgr/exec_ctx.c",
"src/core/iomgr/executor.c",
"src/core/iomgr/fd_posix.c",
"src/core/iomgr/iocp_windows.c",
"src/core/iomgr/iomgr.c",
Expand Down Expand Up @@ -1111,6 +1115,7 @@ objc_library(
"src/core/iomgr/endpoint_pair_posix.c",
"src/core/iomgr/endpoint_pair_windows.c",
"src/core/iomgr/exec_ctx.c",
"src/core/iomgr/executor.c",
"src/core/iomgr/fd_posix.c",
"src/core/iomgr/iocp_windows.c",
"src/core/iomgr/iomgr.c",
Expand Down Expand Up @@ -1256,6 +1261,7 @@ objc_library(
"src/core/iomgr/endpoint.h",
"src/core/iomgr/endpoint_pair.h",
"src/core/iomgr/exec_ctx.h",
"src/core/iomgr/executor.h",
"src/core/iomgr/fd_posix.h",
"src/core/iomgr/iocp_windows.h",
"src/core/iomgr/iomgr.h",
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4082,6 +4082,7 @@ LIBGRPC_SRC = \
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/endpoint_pair_windows.c \
src/core/iomgr/exec_ctx.c \
src/core/iomgr/executor.c \
src/core/iomgr/fd_posix.c \
src/core/iomgr/iocp_windows.c \
src/core/iomgr/iomgr.c \
Expand Down Expand Up @@ -4362,6 +4363,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/iomgr/endpoint_pair_posix.c \
src/core/iomgr/endpoint_pair_windows.c \
src/core/iomgr/exec_ctx.c \
src/core/iomgr/executor.c \
src/core/iomgr/fd_posix.c \
src/core/iomgr/iocp_windows.c \
src/core/iomgr/iomgr.c \
Expand Down
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
'src/core/iomgr/endpoint_pair_posix.c',
'src/core/iomgr/endpoint_pair_windows.c',
'src/core/iomgr/exec_ctx.c',
'src/core/iomgr/executor.c',
'src/core/iomgr/fd_posix.c',
'src/core/iomgr/iocp_windows.c',
'src/core/iomgr/iomgr.c',
Expand Down
2 changes: 2 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ filegroups:
- src/core/iomgr/endpoint.h
- src/core/iomgr/endpoint_pair.h
- src/core/iomgr/exec_ctx.h
- src/core/iomgr/executor.h
- src/core/iomgr/fd_posix.h
- src/core/iomgr/iocp_windows.h
- src/core/iomgr/iomgr.h
Expand Down Expand Up @@ -254,6 +255,7 @@ filegroups:
- src/core/iomgr/endpoint_pair_posix.c
- src/core/iomgr/endpoint_pair_windows.c
- src/core/iomgr/exec_ctx.c
- src/core/iomgr/executor.c
- src/core/iomgr/fd_posix.c
- src/core/iomgr/iocp_windows.c
- src/core/iomgr/iomgr.c
Expand Down
3 changes: 3 additions & 0 deletions gRPC.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/endpoint.h',
'src/core/iomgr/endpoint_pair.h',
'src/core/iomgr/exec_ctx.h',
'src/core/iomgr/executor.h',
'src/core/iomgr/fd_posix.h',
'src/core/iomgr/iocp_windows.h',
'src/core/iomgr/iomgr.h',
Expand Down Expand Up @@ -332,6 +333,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/endpoint_pair_posix.c',
'src/core/iomgr/endpoint_pair_windows.c',
'src/core/iomgr/exec_ctx.c',
'src/core/iomgr/executor.c',
'src/core/iomgr/fd_posix.c',
'src/core/iomgr/iocp_windows.c',
'src/core/iomgr/iomgr.c',
Expand Down Expand Up @@ -479,6 +481,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/endpoint.h',
'src/core/iomgr/endpoint_pair.h',
'src/core/iomgr/exec_ctx.h',
'src/core/iomgr/executor.h',
'src/core/iomgr/fd_posix.h',
'src/core/iomgr/iocp_windows.h',
'src/core/iomgr/iomgr.h',
Expand Down
10 changes: 10 additions & 0 deletions src/core/iomgr/closure.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
src->head = src->tail = NULL;
}

grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) {
grpc_closure *head;
if (list->head == NULL) {
return NULL;
}
head = list->head;
list->head = list->head->next;
return head;
}

typedef struct {
grpc_iomgr_cb_func cb;
void *cb_arg;
Expand Down
9 changes: 9 additions & 0 deletions src/core/iomgr/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,18 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }

/** add \a closure to the end of \a list and set \a closure's success to \a
* success */
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
int success);

/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);

/** pop (return and remove) the head closure from \a list. */
grpc_closure *grpc_closure_list_pop(grpc_closure_list *list);

/** return whether \a list is empty. */
int grpc_closure_list_empty(grpc_closure_list list);

#endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */
148 changes: 148 additions & 0 deletions src/core/iomgr/executor.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#include "src/core/iomgr/executor.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/exec_ctx.h"

typedef struct grpc_executor_data {
int busy; /**< is the thread currently running? */
int shutting_down; /**< has \a grpc_shutdown() been invoked? */
int pending_join; /**< has the thread finished but not been joined? */
grpc_closure_list closures; /**< collection of pending work */
gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
pending_join are true */
gpr_thd_options options;
gpr_mu mu;
} grpc_executor;

static grpc_executor g_executor;

void grpc_executor_init() {
memset(&g_executor, 0, sizeof(grpc_executor));
gpr_mu_init(&g_executor.mu);
g_executor.options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&g_executor.options);
}

/* thread body */
static void closure_exec_thread_func(void *ignored) {
grpc_closure *closure;

grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (1) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down != 0) {
gpr_mu_unlock(&g_executor.mu);
break;
}
closure = grpc_closure_list_pop(&g_executor.closures);
if (closure == NULL) {
/* no more work, time to die */
GPR_ASSERT(g_executor.busy == 1);
g_executor.busy = 0;
gpr_mu_unlock(&g_executor.mu);
break;
}
gpr_mu_unlock(&g_executor.mu);
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}

/* Spawn the thread if new work has arrived a no thread is up */
static void maybe_spawn_locked() {
if (grpc_closure_list_empty(g_executor.closures) == 1) {
return;
}
if (g_executor.shutting_down == 1) {
return;
}

if (g_executor.busy != 0) {
/* Thread still working. New work will be picked up by already running
* thread. Not spawning anything. */
return;
} else if (g_executor.pending_join != 0) {
/* Pickup the remains of the previous incarnations of the thread. */
gpr_thd_join(g_executor.tid);
g_executor.pending_join = 0;
}

/* All previous instances of the thread should have been joined at this point.
* Spawn time! */
g_executor.busy = 1;
gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
&g_executor.options);
g_executor.pending_join = 1;
}

void grpc_executor_enqueue(grpc_closure *closure, int success) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_add(&g_executor.closures, closure, success);
maybe_spawn_locked();
}
gpr_mu_unlock(&g_executor.mu);
}

void grpc_executor_shutdown() {
int pending_join;
grpc_closure *closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

gpr_mu_lock(&g_executor.mu);
pending_join = g_executor.pending_join;
g_executor.shutting_down = 1;
gpr_mu_unlock(&g_executor.mu);
/* we can release the lock at this point despite the access to the closure
* list below because we aren't accepting new work */

/* Execute pending callbacks, some may be performing cleanups */
while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) {
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
}
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
gpr_thd_join(g_executor.tid);
}
gpr_mu_destroy(&g_executor.mu);
}
53 changes: 53 additions & 0 deletions src/core/iomgr/executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H
#define GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H

#include "src/core/iomgr/closure.h"

/** Initialize the global executor.
*
* This mechanism is meant to outsource work (grpc_closure instances) to a
* thread, for those cases where blocking isn't an option but there isn't a
* non-blocking solution available. */
void grpc_executor_init();

/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
* thread */
void grpc_executor_enqueue(grpc_closure *closure, int success);

/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown();

#endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */
Loading

0 comments on commit e2a1bf4

Please sign in to comment.