Make grpc_tcp_listener private. #4680

Merged: 14 commits merged on Jan 25, 2016
Make grpc_tcp_listener private.
daniel-j-born committed Jan 12, 2016
commit fa6b606898b18c22cdcbaa006338fe1d57d8f93f
52 changes: 31 additions & 21 deletions src/core/iomgr/tcp_server.h
@@ -39,45 +39,55 @@
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;

/* Forward decl of grpc_tcp_listener */
typedef struct grpc_tcp_listener grpc_tcp_listener;

/* Called for newly connected TCP connections. */
/* Called for newly connected TCP connections. Callee owns a ref on
from_server. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep);
grpc_endpoint *ep,
grpc_tcp_server *from_server,
unsigned port_index, unsigned fd_index);

/* Create a server, initially not bound to any ports */
grpc_tcp_server *grpc_tcp_server_create(void);
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref(). */
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);

/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg);

/* Add a port to the server, returning the newly created listener on success,
or a null pointer on failure.
/* Add a port to the server, returning the newly allocated port on success, or
-1 on failure.

The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually
creates one socket, but possibly two on systems which support IPv6,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
const void *addr, size_t addr_len);
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len);

/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index);

/* Returns the file descriptor of the Nth listening socket on this server,
or -1 if the index is out of bounds.
/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
(port_index) call to add_port() on this server, or -1 if the indices are out
of bounds. The file descriptor remains owned by the server, and will be
cleaned up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index);

The file descriptor remains owned by the server, and will be cleaned
up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
/* Ref s and return s. */
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);

void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_closure *closure);
/* Set or reset the shutdown_complete closure. shutdown_complete may be NULL. */
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete);

int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
/* If the refcount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);

#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
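
For orientation, the sketch below shows how a hypothetical caller might use the refcounted API declared above. It is not code from this PR: the helper names (on_accept, demo), the IPv4 wildcard address, and the assumption that the accept callback simply drops its ref on from_server are illustrative; pollsets and the exec_ctx are assumed to be managed by the surrounding server code.

/* Hypothetical usage sketch; not part of this commit. */
#include <string.h>
#include <netinet/in.h>

#include "src/core/iomgr/tcp_server.h"

static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep,
                      grpc_tcp_server *from_server, unsigned port_index,
                      unsigned fd_index) {
  /* The callee owns a ref on from_server; drop it once the endpoint has
     been handed off (e.g. to the transport layer). */
  (void)arg;
  (void)ep;
  (void)port_index;
  (void)fd_index;
  grpc_tcp_server_unref(exec_ctx, from_server);
}

static void demo(grpc_exec_ctx *exec_ctx, grpc_pollset **pollsets,
                 size_t pollset_count) {
  struct sockaddr_in addr;
  grpc_tcp_server *s;
  int port;
  unsigned i, nfds;

  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET; /* 0.0.0.0:0 -> kernel picks a free port */

  /* The caller owns one ref; no shutdown_complete closure is set yet. */
  s = grpc_tcp_server_create(NULL);

  port = grpc_tcp_server_add_port(s, &addr, sizeof(addr));
  if (port < 0) {
    grpc_tcp_server_unref(exec_ctx, s);
    return;
  }

  /* Enumerate the fds created by the first (port_index 0) add_port() call. */
  nfds = grpc_tcp_server_fds_for_port(s, 0);
  for (i = 0; i < nfds; i++) {
    int fd = grpc_tcp_server_get_fd(s, 0, i);
    (void)fd; /* e.g. hand the listening fd to a wrapping API */
  }

  grpc_tcp_server_start(exec_ctx, s, pollsets, pollset_count, on_accept, NULL);

  /* ... later: drop the caller's ref. The last unref destroys the server
     and notifies shutdown_complete, if one was set. */
  grpc_tcp_server_unref(exec_ctx, s);
}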
133 changes: 88 additions & 45 deletions src/core/iomgr/tcp_server_posix.c
@@ -73,6 +73,7 @@ static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size;

/* one listening port */
typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
int fd;
grpc_fd *emfd;
@@ -84,9 +85,10 @@ struct grpc_tcp_listener {
} addr;
size_t addr_len;
int port;
unsigned port_index;
unsigned fd_index;
grpc_closure read_closure;
grpc_closure destroyed_closure;
gpr_refcount refs;
struct grpc_tcp_listener *next;
/* When we add a listener, more than one can be created, mainly because of
IPv6. A sibling will still be in the normal list, but will be flagged
@@ -106,6 +108,7 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {

/* the overall server */
struct grpc_tcp_server {
gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg;
@@ -122,6 +125,7 @@ struct grpc_tcp_server {

/* linked list of server ports */
grpc_tcp_listener *head;
grpc_tcp_listener *tail;
unsigned nports;

/* shutdown callback */
@@ -133,28 +137,33 @@ struct grpc_tcp_server {
size_t pollset_count;
};

grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
s->shutdown_complete = shutdown_complete;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->head = NULL;
s->tail = NULL;
s->nports = 0;
return s;
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
if (s->shutdown_complete != NULL) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
}

gpr_mu_destroy(&s->mu);

while (s->head) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
grpc_tcp_listener_unref(sp);
gpr_free(sp);
}

gpr_free(s);
@@ -203,15 +212,13 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}

void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_closure *closure) {
static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);

GPR_ASSERT(!s->shutdown);
s->shutdown = 1;

s->shutdown_complete = closure;

/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
@@ -355,7 +362,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
}
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
grpc_tcp_server_ref(sp->server), sp->port_index, sp->fd_index);

gpr_free(name);
gpr_free(addr_str);
@@ -375,7 +383,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {

static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr,
size_t addr_len) {
size_t addr_len,
unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp = NULL;
int port;
char *addr_str;
@@ -389,17 +399,23 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = s->head;
s->head = sp;
sp->next = NULL;
if (s->head == NULL) {
s->head = sp;
} else {
s->tail->next = sp;
}
s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->port = port;
sp->port_index = port_index;
sp->fd_index = fd_index;
sp->is_sibling = 0;
sp->sibling = NULL;
gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(addr_str);
@@ -409,8 +425,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
return sp;
}

grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
const void *addr, size_t addr_len) {
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
@@ -423,7 +439,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
int port;

unsigned port_index = 0;
unsigned fd_index = 0;
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(addr);
}
@@ -462,11 +482,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
sp = add_socket_to_server(s, fd, addr, addr_len);
sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}

if (sp != NULL) {
++fd_index;
}
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && sp != NULL) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
@@ -485,20 +507,47 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
sp = add_socket_to_server(s, fd, addr, addr_len);
if (sp != NULL) sp->sibling = sp2;
if (sp2 != NULL) sp2->is_sibling = 1;
sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (sp2 != NULL) {
if (sp != NULL) {
sp->sibling = sp2;
}
sp2->is_sibling = 1;
}

done:
gpr_free(allocated_addr);
return sp;
if (sp != NULL) {
return sp->port;
} else {
return -1;
}
}

unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) {
unsigned num_fds = 0;
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
if (!sp->is_sibling) {
--port_index;
}
}
for (; sp; sp = sp->sibling, ++num_fds)
;
return num_fds;
}

int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
if (!sp->is_sibling) {
--port_index;
}
}
for (; sp && fd_index != 0; sp = sp->sibling, --fd_index)
;
if (port_index == 0 && sp) {
if (sp) {
return sp->fd;
} else {
return -1;
@@ -531,31 +580,25 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}

int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
if (listener != NULL) {
grpc_tcp_listener *sp = listener;
return sp->port;
} else {
return 0;
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref(&s->refs);
return s;
}

void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
gpr_ref(&sp->refs);
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete) {
s->shutdown_complete = shutdown_complete;
}

void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
if (sp->is_sibling) return;
if (gpr_unref(&sp->refs)) {
grpc_tcp_listener *sibling = sp->sibling;
while (sibling) {
sp = sibling;
sibling = sp->sibling;
gpr_free(sp);
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_tcp_server_destroy(exec_ctx, s);
}
gpr_free(listener);
}
}
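
To close, a hypothetical teardown sketch (not part of this commit) showing grpc_tcp_server_set_shutdown_complete used together with the final unref from a context that has no exec_ctx; on_shutdown_done is assumed to be a grpc_closure initialized elsewhere, and the caller is assumed to hold the last reference.

/* Hypothetical teardown sketch; not part of this commit. */
static void shut_down_server(grpc_tcp_server *s,
                             grpc_closure *on_shutdown_done) {
  /* Arrange to be notified once all listeners have been destroyed. */
  grpc_tcp_server_set_shutdown_complete(s, on_shutdown_done);
  /* No exec_ctx available here: the unref path builds a local exec_ctx,
     starts server destruction, and on_shutdown_done runs once shutdown
     has completed. */
  grpc_tcp_server_unref(NULL, s);
}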
