Skip to content

Commit

Permalink
Merge pull request grpc#1856 from ctiller/count-the-things
Browse files Browse the repository at this point in the history
Label all iomgr objects
  • Loading branch information
nicolasnoble committed Jun 2, 2015
2 parents 206e6e8 + 33da332 commit 19fdb33
Show file tree
Hide file tree
Showing 22 changed files with 156 additions and 72 deletions.
3 changes: 2 additions & 1 deletion src/core/iomgr/endpoint_pair.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;

grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size);
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size);

#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
17 changes: 14 additions & 3 deletions src/core/iomgr/endpoint_pair_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <sys/socket.h>

#include "src/core/iomgr/tcp_posix.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

static void create_sockets(int sv[2]) {
Expand All @@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}

grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size);
p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size);

gpr_asprintf(&final_name, "%s:client", name);
p.client =
grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server =
grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
gpr_free(final_name);
return p;
}

Expand Down
6 changes: 3 additions & 3 deletions src/core/iomgr/endpoint_pair_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}

grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1]));
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0]));
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"));
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"));
return p;
}

Expand Down
7 changes: 3 additions & 4 deletions src/core/iomgr/fd_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <unistd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
Expand Down Expand Up @@ -119,7 +118,7 @@ static void unref_by(grpc_fd *fd, int n) {
if (old == n) {
grpc_iomgr_add_callback(&fd->on_done_closure);
freelist_fd(fd);
grpc_iomgr_unref();
grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
GPR_ASSERT(old > n);
}
Expand All @@ -138,9 +137,9 @@ void grpc_fd_global_shutdown(void) {

static void do_nothing(void *ignored, int success) {}

grpc_fd *grpc_fd_create(int fd) {
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
grpc_iomgr_ref();
grpc_iomgr_register_object(&r->iomgr_object, name);
grpc_pollset_add_fd(grpc_backup_pollset(), r);
return r;
}
Expand Down
6 changes: 4 additions & 2 deletions src/core/iomgr/fd_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H

#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
Expand Down Expand Up @@ -95,12 +95,14 @@ struct grpc_fd {

grpc_iomgr_closure on_done_closure;
grpc_iomgr_closure *shutdown_closures[2];

grpc_iomgr_object iomgr_object;
};

/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd);
grpc_fd *grpc_fd_create(int fd, const char *name);

/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
Expand Down
63 changes: 43 additions & 20 deletions src/core/iomgr/iomgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
Expand All @@ -47,8 +48,8 @@ static gpr_cv g_rcv;
static grpc_iomgr_closure *g_cbs_head = NULL;
static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static int g_refs;
static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;

/* Execute followup callbacks continuously.
Other threads may check in and help during pollset_work() */
Expand Down Expand Up @@ -88,33 +89,48 @@ void grpc_iomgr_init(void) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_alarm_list_init(gpr_now());
g_refs = 0;
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
gpr_event_init(&g_background_callback_executor_done);
gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}

static size_t count_objects(void) {
grpc_iomgr_object *obj;
size_t n = 0;
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
n++;
}
return n;
}

void grpc_iomgr_shutdown(void) {
grpc_iomgr_object *obj;
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));


gpr_mu_lock(&g_mu);
g_shutdown = 1;
while (g_cbs_head || g_refs) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
while (g_cbs_head || g_root_object.next != &g_root_object) {
size_t nobjs = count_objects();
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", nobjs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);

closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
if (g_cbs_head) {
do {
closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);

closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
} while (g_cbs_head);
continue;
}
if (g_refs) {
if (nobjs > 0) {
int timeout = 0;
gpr_timespec short_deadline = gpr_time_add(gpr_now(),
gpr_time_from_millis(100));
Expand All @@ -128,7 +144,10 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
g_refs);
count_objects());
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name);
}
break;
}
}
Expand All @@ -144,17 +163,21 @@ void grpc_iomgr_shutdown(void) {
gpr_cv_destroy(&g_rcv);
}

void grpc_iomgr_ref(void) {
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) {
obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
++g_refs;
obj->next = &g_root_object;
obj->prev = obj->next->prev;
obj->next->prev = obj->prev->next = obj;
gpr_mu_unlock(&g_mu);
}

void grpc_iomgr_unref(void) {
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_free(obj->name);
gpr_mu_lock(&g_mu);
if (0 == --g_refs) {
gpr_cv_signal(&g_rcv);
}
obj->next->prev = obj->prev;
obj->prev->next = obj->next;
gpr_cv_signal(&g_rcv);
gpr_mu_unlock(&g_mu);
}

Expand Down
10 changes: 8 additions & 2 deletions src/core/iomgr/iomgr_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@
#include "src/core/iomgr/iomgr.h"
#include <grpc/support/sync.h>

typedef struct grpc_iomgr_object {
char *name;
struct grpc_iomgr_object *next;
struct grpc_iomgr_object *prev;
} grpc_iomgr_object;

int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);

void grpc_iomgr_ref(void);
void grpc_iomgr_unref(void);
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);

void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);
Expand Down
9 changes: 7 additions & 2 deletions src/core/iomgr/resolve_address_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
grpc_iomgr_object iomgr_object;
} request;

grpc_resolved_addresses *grpc_blocking_resolve_address(
Expand Down Expand Up @@ -153,9 +154,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
cb(arg, resolved);
grpc_iomgr_unref();
}

void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
Expand All @@ -167,7 +168,11 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
grpc_iomgr_ref();
char *tmp;
gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name,
default_port);
grpc_iomgr_register_object(&r->iomgr_object, tmp);
gpr_free(tmp);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
Expand Down
8 changes: 6 additions & 2 deletions src/core/iomgr/resolve_address_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
grpc_iomgr_object iomgr_object;
} request;

grpc_resolved_addresses *grpc_blocking_resolve_address(
Expand Down Expand Up @@ -135,7 +136,7 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
grpc_iomgr_unref();
grpc_iomgr_unregister_object(&r->iomgr_object);
}

void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
Expand All @@ -147,7 +148,10 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
grpc_iomgr_ref();
const char *label;
gpr_asprintf(&label, "resolve:%s", name);
grpc_iomgr_register_object(&r->iomgr_object, label);
gpr_free(label);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
Expand Down
6 changes: 3 additions & 3 deletions src/core/iomgr/socket_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@
#include "src/core/iomgr/pollset_windows.h"
#include "src/core/iomgr/socket_windows.h"

grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
memset(r, 0, sizeof(grpc_winsocket));
r->socket = socket;
gpr_mu_init(&r->state_mu);
grpc_iomgr_ref();
grpc_iomgr_register_object(&r->iomgr_object, name);
grpc_iocp_add_socket(r);
return r;
}
Expand Down Expand Up @@ -91,7 +91,7 @@ void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
grpc_iomgr_unref();
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
}

void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
Expand Down
7 changes: 5 additions & 2 deletions src/core/iomgr/socket_windows.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/atm.h>

#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"

/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
Expand Down Expand Up @@ -97,11 +97,14 @@ typedef struct grpc_winsocket {
int orphan;

grpc_iomgr_closure shutdown_closure;

/* A label for iomgr to track outstanding objects */
grpc_iomgr_object iomgr_object;
} grpc_winsocket;

/* Create a wrapped windows handle. This takes ownership of it, meaning that
it will be responsible for closing it. */
grpc_winsocket *grpc_winsocket_create(SOCKET socket);
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name);

/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. Returns the number of callbacks that got setup. */
Expand Down
Loading

0 comments on commit 19fdb33

Please sign in to comment.