Skip to content

Commit

Permalink
Lower latency profiling
Browse files Browse the repository at this point in the history
Current latency profiles have their tails dominated by writing latency
logs, which is hugely undesirable.

Now when a thread log fills up, push it to a background thread to write
to disk. At shutdown, wait for all latency traces to be flushed.
  • Loading branch information
ctiller committed Nov 3, 2015
1 parent 4b65b1d commit 61ead3e
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ cache.mk
# Temporary test reports
report.xml
latency_trace.txt
latency_trace.*.txt

# port server log
portlog.txt
Expand Down
185 changes: 162 additions & 23 deletions src/core/profiling/basic_timers.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,57 +53,194 @@ typedef struct gpr_timer_entry {
short line;
char type;
gpr_uint8 important;
int thd;
} gpr_timer_entry;

#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
#define MAX_COUNT 1000000

static __thread gpr_timer_entry g_log[MAX_COUNT];
static __thread int g_count;
typedef struct gpr_timer_log {
size_t num_entries;
struct gpr_timer_log *next;
struct gpr_timer_log *prev;
gpr_timer_entry log[MAX_COUNT];
} gpr_timer_log;

typedef struct gpr_timer_log_list {
gpr_timer_log *head;
/* valid iff head!=NULL */
gpr_timer_log *tail;
} gpr_timer_log_list;

static __thread gpr_timer_log *g_thread_log;
static gpr_once g_once_init = GPR_ONCE_INIT;
static FILE *output_file;
static const char *output_filename = "latency_trace.txt";
static pthread_mutex_t g_mu;
static pthread_cond_t g_cv;
static gpr_timer_log_list g_in_progress_logs;
static gpr_timer_log_list g_done_logs;
static int g_shutdown;
static gpr_thd_id g_writing_thread;
static __thread int g_thread_id;
static int g_next_thread_id;

static void close_output() { fclose(output_file); }
static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
if (list->head == NULL) {
list->head = list->tail = log;
log->next = log->prev = NULL;
return 1;
} else {
log->prev = list->tail;
log->next = NULL;
list->tail->next = log;
list->tail = log;
return 0;
}
}

static void init_output() {
output_file = fopen("latency_trace.txt", "w");
GPR_ASSERT(output_file);
atexit(close_output);
static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
gpr_timer_log *out = list->head;
if (out != NULL) {
list->head = out->next;
if (list->head != NULL) {
list->head->prev = NULL;
} else {
list->tail = NULL;
}
}
return out;
}

static void log_report() {
int i;
gpr_once_init(&g_once_init, init_output);
for (i = 0; i < g_count; i++) {
gpr_timer_entry *entry = &(g_log[i]);
static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
if (log->prev == NULL) {
list->head = log->next;
if (list->head != NULL) {
list->head->prev = NULL;
}
} else {
log->prev->next = log->next;
}
if (log->next == NULL) {
list->tail = log->prev;
if (list->tail != NULL) {
list->tail->next = NULL;
}
} else {
log->next->prev = log->prev;
}
}

static void write_log(gpr_timer_log *log) {
size_t i;
if (output_file == NULL) {
output_file = fopen(output_filename, "w");
}
for (i = 0; i < log->num_entries; i++) {
gpr_timer_entry *entry = &(log->log[i]);
if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
entry->tm = gpr_time_0(entry->tm.clock_type);
}
fprintf(output_file,
"{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
"{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
"\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
entry->tm.tv_sec, entry->tm.tv_nsec,
(void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
entry->file, entry->line, entry->important);
entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
entry->tagstr, entry->file, entry->line, entry->important);
}
}

static void writing_thread(void *unused) {
gpr_timer_log *log;
pthread_mutex_lock(&g_mu);
for (;;) {
while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
pthread_cond_wait(&g_cv, &g_mu);
}
if (log != NULL) {
pthread_mutex_unlock(&g_mu);
write_log(log);
free(log);
pthread_mutex_lock(&g_mu);
}
if (g_shutdown) {
pthread_mutex_unlock(&g_mu);
return;
}
}
}

/* Now clear out the log */
g_count = 0;
static void flush_logs(gpr_timer_log_list *list) {
gpr_timer_log *log;
while ((log = timer_log_pop_front(list)) != NULL) {
write_log(log);
free(log);
}
}

static void finish_writing() {
pthread_mutex_lock(&g_mu);
g_shutdown = 1;
pthread_cond_signal(&g_cv);
pthread_mutex_unlock(&g_mu);
gpr_thd_join(g_writing_thread);

gpr_log(GPR_INFO, "flushing logs");

pthread_mutex_lock(&g_mu);
flush_logs(&g_done_logs);
flush_logs(&g_in_progress_logs);
pthread_mutex_unlock(&g_mu);

if (output_file) {
fclose(output_file);
}
}

void gpr_timers_set_log_filename(const char *filename) {
output_filename = filename;
}

static void init_output() {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
atexit(finish_writing);
}

static void rotate_log() {
gpr_timer_log *new = malloc(sizeof(*new));
gpr_once_init(&g_once_init, init_output);
new->num_entries = 0;
pthread_mutex_lock(&g_mu);
if (g_thread_log != NULL) {
timer_log_remove(&g_in_progress_logs, g_thread_log);
if (timer_log_push_back(&g_done_logs, g_thread_log)) {
pthread_cond_signal(&g_cv);
}
} else {
g_thread_id = g_next_thread_id++;
}
timer_log_push_back(&g_in_progress_logs, new);
pthread_mutex_unlock(&g_mu);
g_thread_log = new;
}

static void gpr_timers_log_add(const char *tagstr, marker_type type,
int important, const char *file, int line) {
gpr_timer_entry *entry;

/* TODO (vpai) : Improve concurrency */
if (g_count == MAX_COUNT) {
log_report();
if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
rotate_log();
}

entry = &g_log[g_count++];
entry = &g_thread_log->log[g_thread_log->num_entries++];

entry->tm = gpr_now(GPR_CLOCK_PRECISE);
entry->tagstr = tagstr;
entry->type = type;
entry->file = file;
entry->line = (short)line;
entry->important = important != 0;
entry->thd = g_thread_id;
}

/* Latency profiler API implementation. */
Expand Down Expand Up @@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
void gpr_timers_global_init(void) {}

void gpr_timers_global_destroy(void) {}

void gpr_timers_set_log_filename(const char *filename) {}
#endif /* GRPC_BASIC_PROFILER */
2 changes: 2 additions & 0 deletions src/core/profiling/timers.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
void gpr_timer_end(const char *tagstr, int important, const char *file,
int line);

void gpr_timers_set_log_filename(const char *filename);

#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
/* No profiling. No-op all the things. */
#define GPR_TIMER_MARK(tag, important) \
Expand Down
9 changes: 6 additions & 3 deletions src/core/support/thd_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct thd_arg {
/* Body of every thread started via gpr_thd_new. */
static void *thread_body(void *v) {
struct thd_arg a = *(struct thd_arg *)v;
gpr_free(v);
free(v);
(*a.body)(a.arg);
return NULL;
}
Expand All @@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
int thread_started;
pthread_attr_t attr;
pthread_t p;
struct thd_arg *a = gpr_malloc(sizeof(*a));
/* don't use gpr_malloc as we may cause an infinite recursion with
* the profiling code */
struct thd_arg *a = malloc(sizeof(*a));
GPR_ASSERT(a != NULL);
a->body = thd_body;
a->arg = arg;

Expand All @@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
if (!thread_started) {
gpr_free(a);
free(a);
}
*t = (gpr_thd_id)p;
return thread_started;
Expand Down
2 changes: 2 additions & 0 deletions src/core/transport/chttp2/writing.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}

GPR_TIMER_END("finalize_outbuf", 0);
}

void grpc_chttp2_cleanup_writing(
Expand Down
11 changes: 9 additions & 2 deletions test/core/fling/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/profiling/timers.h"
#include "test/core/util/grpc_profiler.h"
#include "test/core/util/test_config.h"

Expand Down Expand Up @@ -89,6 +90,7 @@ static void init_ping_pong_request(void) {
}

static void step_ping_pong_request(void) {
GPR_TIMER_BEGIN("ping_pong", 1);
call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
"/Reflector/reflectUnary", "localhost",
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
Expand All @@ -99,6 +101,7 @@ static void step_ping_pong_request(void) {
grpc_call_destroy(call);
grpc_byte_buffer_destroy(response_payload_recv);
call = NULL;
GPR_TIMER_END("ping_pong", 1);
}

static void init_ping_pong_stream(void) {
Expand All @@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) {

static void step_ping_pong_stream(void) {
grpc_call_error error;
GPR_TIMER_BEGIN("ping_pong", 1);
error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
grpc_byte_buffer_destroy(response_payload_recv);
GPR_TIMER_END("ping_pong", 1);
}

static double now(void) {
Expand Down Expand Up @@ -159,12 +164,14 @@ int main(int argc, char **argv) {
char *scenario_name = "ping-pong-request";
scenario sc = {NULL, NULL, NULL};

gpr_timers_set_log_filename("latency_trace.fling_client.txt");

grpc_init();

GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc_test_init(1, fake_argv);

grpc_init();

cl = gpr_cmdline_create("fling client");
gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
&payload_size);
Expand Down
7 changes: 5 additions & 2 deletions test/core/fling/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@
#include <unistd.h>
#endif

#include "test/core/util/grpc_profiler.h"
#include "test/core/util/test_config.h"
#include <grpc/support/alloc.h>
#include <grpc/support/cmdline.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/profiling/timers.h"
#include "test/core/util/port.h"
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/grpc_profiler.h"
#include "test/core/util/test_config.h"

static grpc_completion_queue *cq;
static grpc_server *server;
Expand Down Expand Up @@ -192,6 +193,8 @@ int main(int argc, char **argv) {

char *fake_argv[1];

gpr_timers_set_log_filename("latency_trace.fling_server.txt");

GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc_test_init(1, fake_argv);
Expand Down
3 changes: 2 additions & 1 deletion tools/profiling/latency_profile/profile_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def mark(self, line):
self.call_stack_builder.lines.append(line_item)

def finish(self, line):
assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag'])
assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t'])
final_time_stamp = line['t']
assert self.top_line.end_time is None
self.top_line.end_time = final_time_stamp
Expand Down Expand Up @@ -84,6 +84,7 @@ def add(self, line):
self.stk.append(ScopeBuilder(self, line))
return False
elif line_type == '}':
assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t'])
self.stk.pop().finish(line)
if not self.stk:
self.finish()
Expand Down

0 comments on commit 61ead3e

Please sign in to comment.