Skip to content

Commit

Permalink
Merge pull request grpc#5257 from ctiller/cleaner-posix
Browse files Browse the repository at this point in the history
Separate timer checking from pollsets
  • Loading branch information
vjpai committed Feb 23, 2016
2 parents bfc7ada + 23a3298 commit 349bba9
Show file tree
Hide file tree
Showing 21 changed files with 86 additions and 131 deletions.
3 changes: 0 additions & 3 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ cc_library(
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/timer.h",
"src/core/iomgr/timer_heap.h",
"src/core/iomgr/timer_internal.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
Expand Down Expand Up @@ -538,7 +537,6 @@ cc_library(
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/timer.h",
"src/core/iomgr/timer_heap.h",
"src/core/iomgr/timer_internal.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
Expand Down Expand Up @@ -1483,7 +1481,6 @@ objc_library(
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/timer.h",
"src/core/iomgr/timer_heap.h",
"src/core/iomgr/timer_internal.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
Expand Down
1 change: 0 additions & 1 deletion build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ filegroups:
- src/core/iomgr/time_averaged_stats.h
- src/core/iomgr/timer.h
- src/core/iomgr/timer_heap.h
- src/core/iomgr/timer_internal.h
- src/core/iomgr/udp_server.h
- src/core/iomgr/wakeup_fd_pipe.h
- src/core/iomgr/wakeup_fd_posix.h
Expand Down
2 changes: 0 additions & 2 deletions gRPC.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ Pod::Spec.new do |s|
'src/core/iomgr/time_averaged_stats.h',
'src/core/iomgr/timer.h',
'src/core/iomgr/timer_heap.h',
'src/core/iomgr/timer_internal.h',
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
Expand Down Expand Up @@ -532,7 +531,6 @@ Pod::Spec.new do |s|
'src/core/iomgr/time_averaged_stats.h',
'src/core/iomgr/timer.h',
'src/core/iomgr/timer_heap.h',
'src/core/iomgr/timer_internal.h',
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
Expand Down
1 change: 0 additions & 1 deletion grpc.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/iomgr/time_averaged_stats.h )
s.files += %w( src/core/iomgr/timer.h )
s.files += %w( src/core/iomgr/timer_heap.h )
s.files += %w( src/core/iomgr/timer_internal.h )
s.files += %w( src/core/iomgr/udp_server.h )
s.files += %w( src/core/iomgr/wakeup_fd_pipe.h )
s.files += %w( src/core/iomgr/wakeup_fd_posix.h )
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/timer.h",
"src/core/iomgr/timer_heap.h",
"src/core/iomgr/timer_internal.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
Expand Down
2 changes: 1 addition & 1 deletion src/core/iomgr/iocp_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/thd.h>

#include "src/core/iomgr/timer_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_windows.h"
Expand Down
4 changes: 2 additions & 2 deletions src/core/iomgr/iomgr.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -43,7 +43,7 @@
#include <grpc/support/thd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h"

static gpr_mu g_mu;
Expand Down
11 changes: 0 additions & 11 deletions src/core/iomgr/pollset_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#include <string.h>
#include <unistd.h>

#include "src/core/iomgr/timer_internal.h"
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
Expand Down Expand Up @@ -274,16 +273,6 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
goto done;
}
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
TODO(ctiller): can this work be localized? */
if (grpc_timer_check(exec_ctx, now, &deadline)) {
GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0);
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
/* If we're shutting down then we don't execute any extended work */
if (pollset->shutting_down) {
GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0);
Expand Down
4 changes: 0 additions & 4 deletions src/core/iomgr/pollset_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <grpc/support/log.h>
#include <grpc/support/thd.h>

#include "src/core/iomgr/timer_internal.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/pollset.h"
Expand Down Expand Up @@ -136,9 +135,6 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker->kicked = 0;
worker->pollset = pollset;
gpr_cv_init(&worker->cv);
if (grpc_timer_check(exec_ctx, now, &deadline)) {
goto done;
}
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
if (g_active_poller == NULL) {
grpc_pollset_worker *next_worker;
Expand Down
5 changes: 2 additions & 3 deletions src/core/iomgr/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "src/core/iomgr/timer.h"

#include "src/core/iomgr/timer_heap.h"
#include "src/core/iomgr/timer_internal.h"
#include "src/core/iomgr/time_averaged_stats.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
Expand Down Expand Up @@ -336,8 +335,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
return (int)n;
}

int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_timers(
exec_ctx, now, next,
Expand Down
22 changes: 21 additions & 1 deletion src/core/iomgr/timer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -86,4 +86,24 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
Requires: cancel() must happen after add() on a given timer */
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);

/* iomgr internal api for dealing with timers */

/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
Drops drop_mu if it is non-null before executing callbacks.
If next is non-null, TRY to update *next with the next running timer
IF that timer occurs before *next current value.
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */

bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);

/* the following must be implemented by each iomgr implementation */

void grpc_kick_poller(void);

#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_H */
61 changes: 0 additions & 61 deletions src/core/iomgr/timer_internal.h

This file was deleted.

30 changes: 28 additions & 2 deletions src/core/surface/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,20 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
first_loop = 0;
grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
TODO(ctiller): can this work be localized? */
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
} else {
grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
iteration_deadline);
}
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
Expand Down Expand Up @@ -427,7 +440,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
first_loop = 0;
grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
TODO(ctiller): can this work be localized? */
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
} else {
grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
iteration_deadline);
}
del_plucker(cc, tag, &worker);
}
done:
Expand Down
26 changes: 18 additions & 8 deletions test/core/iomgr/tcp_client_posix_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/timer.h"
#include "test/core/util/test_config.h"

static grpc_pollset_set g_pollset_set;
Expand Down Expand Up @@ -125,11 +126,13 @@ void test_succeeds(void) {
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_exec_ctx_finish(&exec_ctx);
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}

gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

grpc_exec_ctx_finish(&exec_ctx);
}

void test_fails(void) {
Expand Down Expand Up @@ -159,14 +162,18 @@ void test_fails(void) {
/* wait for the connection callback to finish */
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), test_deadline());
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec polling_deadline = test_deadline();
if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_exec_ctx_finish(&exec_ctx);
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}

gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_exec_ctx_finish(&exec_ctx);
}

void test_times_out(void) {
Expand Down Expand Up @@ -243,15 +250,18 @@ void test_times_out(void) {
GPR_ASSERT(g_connections_complete ==
connections_complete_before + is_after_deadline);
}
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10);
if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_exec_ctx_finish(&exec_ctx);
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

grpc_exec_ctx_finish(&exec_ctx);

close(svr_fd);
for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) {
close(client_fd[i]);
Expand Down
Loading

0 comments on commit 349bba9

Please sign in to comment.