Skip to content

Commit

Permalink
Merge pull request grpc#96 from dklempner/pollset_kick
Browse files Browse the repository at this point in the history
Factor out the pollset kicking mechanism and eliminate sharding
  • Loading branch information
ctiller committed Jan 17, 2015
2 parents beddbda + 7f3ed1e commit 55c3b27
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 71 deletions.
53 changes: 52 additions & 1 deletion Makefile

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"src/core/iomgr/fd_posix.c",
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
"src/core/iomgr/resolve_address_posix.c",
Expand Down Expand Up @@ -126,6 +127,8 @@
"src/core/iomgr/iomgr.h",
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset_kick.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/resolve_address.h",
Expand Down Expand Up @@ -1497,6 +1500,19 @@
"gpr_test_util",
"gpr"
]
},
{
"name": "poll_kick_test",
"build": "test",
"src": [
"test/core/iomgr/poll_kick_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
]
}
]
}
68 changes: 68 additions & 0 deletions src/core/iomgr/pollset_kick.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_

#include <grpc/support/port_platform.h>

/* This is an abstraction around the typical pipe mechanism for waking up a
thread sitting in a poll() style call. */

#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_kick_posix.h"
#else
#error "No pollset kick support on platform"
#endif

void grpc_pollset_kick_global_init(void);
void grpc_pollset_kick_global_destroy(void);

void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);

/* Must be called before entering poll(). If return value is -1, this consumed
an existing kick. Otherwise the return value is an FD to add to the poll set.
*/
int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state);

/* Consume an existing kick. Must be called after poll returns that the fd was
readable, and before calling kick_post_poll. */
void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state);

/* Must be called after pre_poll, and after consume if applicable */
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state);

void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);

#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ */
161 changes: 161 additions & 0 deletions src/core/iomgr/pollset_kick_posix.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#include "src/core/iomgr/pollset_kick_posix.h"

#include <errno.h>
#include <string.h>
#include <unistd.h>

#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

/* This implementation is based on a freelist of pipes. */

typedef struct grpc_kick_pipe_info {
int pipe_read_fd;
int pipe_write_fd;
struct grpc_kick_pipe_info *next;
} grpc_kick_pipe_info;

static grpc_kick_pipe_info *pipe_freelist = NULL;
static gpr_mu pipe_freelist_mu;

static grpc_kick_pipe_info *allocate_pipe() {
grpc_kick_pipe_info *info;
gpr_mu_lock(&pipe_freelist_mu);
if (pipe_freelist != NULL) {
info = pipe_freelist;
pipe_freelist = pipe_freelist->next;
} else {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
info = gpr_malloc(sizeof(*info));
info->pipe_read_fd = pipefd[0];
info->pipe_write_fd = pipefd[1];
info->next = NULL;
}
gpr_mu_unlock(&pipe_freelist_mu);
return info;
}

static void free_pipe(grpc_kick_pipe_info *pipe_info) {
/* TODO(klempner): Start closing pipes if the free list gets too large */
gpr_mu_lock(&pipe_freelist_mu);
pipe_info->next = pipe_freelist;
pipe_freelist = pipe_info;
gpr_mu_unlock(&pipe_freelist_mu);
}

void grpc_pollset_kick_global_init() {
pipe_freelist = NULL;
gpr_mu_init(&pipe_freelist_mu);
}

void grpc_pollset_kick_global_destroy() {
while (pipe_freelist != NULL) {
grpc_kick_pipe_info *current = pipe_freelist;
pipe_freelist = pipe_freelist->next;
close(current->pipe_read_fd);
close(current->pipe_write_fd);
gpr_free(current);
}
gpr_mu_destroy(&pipe_freelist_mu);
}

void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
gpr_mu_init(&kick_state->mu);
kick_state->kicked = 0;
kick_state->pipe_info = NULL;
}

void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
gpr_mu_destroy(&kick_state->mu);
GPR_ASSERT(kick_state->pipe_info == NULL);
}

int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->kicked) {
kick_state->kicked = 0;
gpr_mu_unlock(&kick_state->mu);
return -1;
}
kick_state->pipe_info = allocate_pipe();
gpr_mu_unlock(&kick_state->mu);
return kick_state->pipe_info->pipe_read_fd;
}

void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) {
char buf[128];
int r;

for (;;) {
r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return;
switch (errno) {
case EAGAIN:
return;
case EINTR:
continue;
default:
gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
return;
}
}
}

void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
free_pipe(kick_state->pipe_info);
kick_state->pipe_info = NULL;
gpr_mu_unlock(&kick_state->mu);
}

void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->pipe_info != NULL) {
char c = 0;
while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 &&
errno == EINTR)
;
} else {
kick_state->kicked = 1;
}
gpr_mu_unlock(&kick_state->mu);
}
47 changes: 47 additions & 0 deletions src/core/iomgr/pollset_kick_posix.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_

#include <grpc/support/sync.h>

struct grpc_kick_pipe_info;

typedef struct grpc_pollset_kick_state {
gpr_mu mu;
int kicked;
struct grpc_kick_pipe_info *pipe_info;
} grpc_pollset_kick_state;

#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */
9 changes: 7 additions & 2 deletions src/core/iomgr/pollset_multipoller_with_poll_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ static int multipoll_with_poll_pollset_maybe_work(
}
nf = 0;
np = 1;
h->pfds[0].fd = grpc_kick_read_fd(pollset);
h->pfds[0].fd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (h->pfds[0].fd < 0) {
/* Already kicked */
return 1;
}
h->pfds[0].events = POLLIN;
h->pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
Expand Down Expand Up @@ -173,7 +177,7 @@ static int multipoll_with_poll_pollset_maybe_work(
/* do nothing */
} else {
if (h->pfds[0].revents & POLLIN) {
grpc_kick_drain(pollset);
grpc_pollset_kick_consume(&pollset->kick_state);
}
for (i = 1; i < np; i++) {
if (h->pfds[i].revents & POLLIN) {
Expand All @@ -184,6 +188,7 @@ static int multipoll_with_poll_pollset_maybe_work(
}
}
}
grpc_pollset_kick_post_poll(&pollset->kick_state);
end_polling(pollset);

gpr_mu_lock(&pollset->mu);
Expand Down
Loading

0 comments on commit 55c3b27

Please sign in to comment.