diff --git a/.gitignore b/.gitignore index 40f4c4734c7c6..f059eca2393b7 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,7 @@ cache.mk # Temporary test reports report.xml latency_trace.txt +latency_trace.*.txt # port server log portlog.txt diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index 527a1601019b2..f0fce7858d3d1 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -53,50 +53,186 @@ typedef struct gpr_timer_entry { short line; char type; gpr_uint8 important; + int thd; } gpr_timer_entry; -#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry)) +#define MAX_COUNT 1000000 -static __thread gpr_timer_entry g_log[MAX_COUNT]; -static __thread int g_count; +typedef struct gpr_timer_log { + size_t num_entries; + struct gpr_timer_log *next; + struct gpr_timer_log *prev; + gpr_timer_entry log[MAX_COUNT]; +} gpr_timer_log; + +typedef struct gpr_timer_log_list { + gpr_timer_log *head; + /* valid iff head!=NULL */ + gpr_timer_log *tail; +} gpr_timer_log_list; + +static __thread gpr_timer_log *g_thread_log; static gpr_once g_once_init = GPR_ONCE_INIT; static FILE *output_file; +static const char *output_filename = "latency_trace.txt"; +static pthread_mutex_t g_mu; +static pthread_cond_t g_cv; +static gpr_timer_log_list g_in_progress_logs; +static gpr_timer_log_list g_done_logs; +static int g_shutdown; +static gpr_thd_id g_writing_thread; +static __thread int g_thread_id; +static int g_next_thread_id; -static void close_output() { fclose(output_file); } +static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) { + if (list->head == NULL) { + list->head = list->tail = log; + log->next = log->prev = NULL; + return 1; + } else { + log->prev = list->tail; + log->next = NULL; + list->tail->next = log; + list->tail = log; + return 0; + } +} -static void init_output() { - output_file = fopen("latency_trace.txt", "w"); - GPR_ASSERT(output_file); - atexit(close_output); +static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) { + gpr_timer_log *out = list->head; + if (out != NULL) { + list->head = out->next; + if (list->head != NULL) { + list->head->prev = NULL; + } else { + list->tail = NULL; + } + } + return out; } -static void log_report() { - int i; - gpr_once_init(&g_once_init, init_output); - for (i = 0; i < g_count; i++) { - gpr_timer_entry *entry = &(g_log[i]); +static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) { + if (log->prev == NULL) { + list->head = log->next; + if (list->head != NULL) { + list->head->prev = NULL; + } + } else { + log->prev->next = log->next; + } + if (log->next == NULL) { + list->tail = log->prev; + if (list->tail != NULL) { + list->tail->next = NULL; + } + } else { + log->next->prev = log->prev; + } +} + +static void write_log(gpr_timer_log *log) { + size_t i; + if (output_file == NULL) { + output_file = fopen(output_filename, "w"); + } + for (i = 0; i < log->num_entries; i++) { + gpr_timer_entry *entry = &(log->log[i]); + if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) { + entry->tm = gpr_time_0(entry->tm.clock_type); + } fprintf(output_file, - "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": " + "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": " "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n", - entry->tm.tv_sec, entry->tm.tv_nsec, - (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr, - entry->file, entry->line, entry->important); + entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type, + entry->tagstr, entry->file, entry->line, entry->important); + } +} + +static void writing_thread(void *unused) { + gpr_timer_log *log; + pthread_mutex_lock(&g_mu); + for (;;) { + while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) { + pthread_cond_wait(&g_cv, &g_mu); + } + if (log != NULL) { + pthread_mutex_unlock(&g_mu); + write_log(log); + free(log); + pthread_mutex_lock(&g_mu); + } + if (g_shutdown) { + pthread_mutex_unlock(&g_mu); + return; + } } +} - /* Now clear out the log */ - g_count = 0; +static void flush_logs(gpr_timer_log_list *list) { + gpr_timer_log *log; + while ((log = timer_log_pop_front(list)) != NULL) { + write_log(log); + free(log); + } +} + +static void finish_writing() { + pthread_mutex_lock(&g_mu); + g_shutdown = 1; + pthread_cond_signal(&g_cv); + pthread_mutex_unlock(&g_mu); + gpr_thd_join(g_writing_thread); + + gpr_log(GPR_INFO, "flushing logs"); + + pthread_mutex_lock(&g_mu); + flush_logs(&g_done_logs); + flush_logs(&g_in_progress_logs); + pthread_mutex_unlock(&g_mu); + + if (output_file) { + fclose(output_file); + } +} + +void gpr_timers_set_log_filename(const char *filename) { + output_filename = filename; +} + +static void init_output() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options); + atexit(finish_writing); +} + +static void rotate_log() { + gpr_timer_log *new = malloc(sizeof(*new)); + gpr_once_init(&g_once_init, init_output); + new->num_entries = 0; + pthread_mutex_lock(&g_mu); + if (g_thread_log != NULL) { + timer_log_remove(&g_in_progress_logs, g_thread_log); + if (timer_log_push_back(&g_done_logs, g_thread_log)) { + pthread_cond_signal(&g_cv); + } + } else { + g_thread_id = g_next_thread_id++; + } + timer_log_push_back(&g_in_progress_logs, new); + pthread_mutex_unlock(&g_mu); + g_thread_log = new; } static void gpr_timers_log_add(const char *tagstr, marker_type type, int important, const char *file, int line) { gpr_timer_entry *entry; - /* TODO (vpai) : Improve concurrency */ - if (g_count == MAX_COUNT) { - log_report(); + if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) { + rotate_log(); } - entry = &g_log[g_count++]; + entry = &g_thread_log->log[g_thread_log->num_entries++]; entry->tm = gpr_now(GPR_CLOCK_PRECISE); entry->tagstr = tagstr; @@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type, entry->file = file; entry->line = (short)line; entry->important = important != 0; + entry->thd = g_thread_id; } /* Latency profiler API implementation. */ @@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {} void gpr_timers_global_init(void) {} void gpr_timers_global_destroy(void) {} + +void gpr_timers_set_log_filename(const char *filename) {} #endif /* GRPC_BASIC_PROFILER */ diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index 0d112e7248551..6a188dc56648b 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file, void gpr_timer_end(const char *tagstr, int important, const char *file, int line); +void gpr_timers_set_log_filename(const char *filename); + #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) /* No profiling. No-op all the things. */ #define GPR_TIMER_MARK(tag, important) \ diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c index c36d94d044d79..653a1c88c13b8 100644 --- a/src/core/support/thd_posix.c +++ b/src/core/support/thd_posix.c @@ -53,7 +53,7 @@ struct thd_arg { /* Body of every thread started via gpr_thd_new. */ static void *thread_body(void *v) { struct thd_arg a = *(struct thd_arg *)v; - gpr_free(v); + free(v); (*a.body)(a.arg); return NULL; } @@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, int thread_started; pthread_attr_t attr; pthread_t p; - struct thd_arg *a = gpr_malloc(sizeof(*a)); + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + struct thd_arg *a = malloc(sizeof(*a)); + GPR_ASSERT(a != NULL); a->body = thd_body; a->arg = arg; @@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); if (!thread_started) { - gpr_free(a); + free(a); } *t = (gpr_thd_id)p; return thread_started; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 353e476e40df6..7fe5649e5ef0e 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } + + GPR_TIMER_END("finalize_outbuf", 0); } void grpc_chttp2_cleanup_writing( diff --git a/test/core/fling/client.c b/test/core/fling/client.c index a53411c2f56c4..99b30d6c4a902 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -41,6 +41,7 @@ #include #include #include +#include "src/core/profiling/timers.h" #include "test/core/util/grpc_profiler.h" #include "test/core/util/test_config.h" @@ -89,6 +90,7 @@ static void init_ping_pong_request(void) { } static void step_ping_pong_request(void) { + GPR_TIMER_BEGIN("ping_pong", 1); call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/Reflector/reflectUnary", "localhost", gpr_inf_future(GPR_CLOCK_REALTIME), NULL); @@ -99,6 +101,7 @@ static void step_ping_pong_request(void) { grpc_call_destroy(call); grpc_byte_buffer_destroy(response_payload_recv); call = NULL; + GPR_TIMER_END("ping_pong", 1); } static void init_ping_pong_stream(void) { @@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) { static void step_ping_pong_stream(void) { grpc_call_error error; + GPR_TIMER_BEGIN("ping_pong", 1); error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL); GPR_ASSERT(GRPC_CALL_OK == error); grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); grpc_byte_buffer_destroy(response_payload_recv); + GPR_TIMER_END("ping_pong", 1); } static double now(void) { @@ -159,12 +164,14 @@ int main(int argc, char **argv) { char *scenario_name = "ping-pong-request"; scenario sc = {NULL, NULL, NULL}; + gpr_timers_set_log_filename("latency_trace.fling_client.txt"); + + grpc_init(); + GPR_ASSERT(argc >= 1); fake_argv[0] = argv[0]; grpc_test_init(1, fake_argv); - grpc_init(); - cl = gpr_cmdline_create("fling client"); gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send", &payload_size); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 67631e5a07e57..14a1a815bcb98 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -44,15 +44,16 @@ #include #endif -#include "test/core/util/grpc_profiler.h" -#include "test/core/util/test_config.h" #include #include #include #include #include +#include "src/core/profiling/timers.h" #include "test/core/util/port.h" #include "test/core/end2end/data/ssl_test_data.h" +#include "test/core/util/grpc_profiler.h" +#include "test/core/util/test_config.h" static grpc_completion_queue *cq; static grpc_server *server; @@ -192,6 +193,8 @@ int main(int argc, char **argv) { char *fake_argv[1]; + gpr_timers_set_log_filename("latency_trace.fling_server.txt"); + GPR_ASSERT(argc >= 1); fake_argv[0] = argv[0]; grpc_test_init(1, fake_argv); diff --git a/tools/profiling/latency_profile/profile_analyzer.py b/tools/profiling/latency_profile/profile_analyzer.py index b12e1074e521b..b2a38ea60a18a 100755 --- a/tools/profiling/latency_profile/profile_analyzer.py +++ b/tools/profiling/latency_profile/profile_analyzer.py @@ -49,7 +49,7 @@ def mark(self, line): self.call_stack_builder.lines.append(line_item) def finish(self, line): - assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag']) + assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t']) final_time_stamp = line['t'] assert self.top_line.end_time is None self.top_line.end_time = final_time_stamp @@ -84,6 +84,7 @@ def add(self, line): self.stk.append(ScopeBuilder(self, line)) return False elif line_type == '}': + assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t']) self.stk.pop().finish(line) if not self.stk: self.finish()