Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autosize timer list shards #11888

Merged
merged 5 commits into from
Nov 10, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Autosize shards for timers
  • Loading branch information
ctiller committed Jul 20, 2017
commit 82f9886e3c89fedf70f91824a36f5404995da4b8
27 changes: 17 additions & 10 deletions src/core/lib/iomgr/timer_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "src/core/lib/iomgr/timer.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
Expand All @@ -35,8 +36,6 @@

#define INVALID_HEAP_INDEX 0xffffffffu

#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
Expand Down Expand Up @@ -70,14 +69,16 @@ typedef struct {
grpc_timer list;
} timer_shard;

static size_t g_num_shards;

/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
* is hashed to select the timer shard to add the timer to */
static timer_shard g_shards[NUM_SHARDS];
static timer_shard *g_shards;

/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
* the deadline of the next timer in each shard).
* Access to this is protected by g_shared_mutables.mu */
static timer_shard *g_shard_queue[NUM_SHARDS];
static timer_shard **g_shard_queue;

/* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking
Expand Down Expand Up @@ -120,6 +121,10 @@ static gpr_atm compute_min_deadline(timer_shard *shard) {
void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
uint32_t i;

g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
g_shards = gpr_zalloc(g_num_shards * sizeof(*g_shards));
g_shard_queue = gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));

g_shared_mutables.initialized = true;
gpr_mu_init(&g_shared_mutables.mu);
g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx);
Expand All @@ -128,7 +133,7 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
grpc_register_tracer(&grpc_timer_trace);
grpc_register_tracer(&grpc_timer_check_trace);

for (i = 0; i < NUM_SHARDS; i++) {
for (i = 0; i < g_num_shards; i++) {
timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
Expand All @@ -143,17 +148,19 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
}

void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
int i;
size_t i;
run_some_expired_timers(
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
for (i = 0; i < g_num_shards; i++) {
timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_shared_mutables.mu);
gpr_tls_destroy(&g_last_seen_min_timer);
gpr_free(g_shards);
gpr_free(g_shard_queue);
g_shared_mutables.initialized = false;
}

Expand Down Expand Up @@ -187,7 +194,7 @@ static void note_deadline_change(timer_shard *shard) {
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
}
while (shard->shard_queue_index < NUM_SHARDS - 1 &&
while (shard->shard_queue_index < g_num_shards - 1 &&
shard->min_deadline >
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index);
Expand All @@ -197,7 +204,7 @@ static void note_deadline_change(timer_shard *shard) {
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
grpc_millis deadline, grpc_closure *closure) {
int is_first_timer = 0;
timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
timer->closure = closure;
timer->deadline = deadline;

Expand Down Expand Up @@ -283,7 +290,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
return;
}

timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
Expand Down