Skip to content

Commit

Permalink
Adding joinable threads, and gpr_thd_join.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Noble authored and nicolasnoble committed Apr 3, 2015
1 parent 300ebc4 commit 91647cc
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 28 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ cc_library(
"src/core/support/sync.c",
"src/core/support/sync_posix.c",
"src/core/support/sync_win32.c",
"src/core/support/thd.c",
"src/core/support/thd_posix.c",
"src/core/support/thd_win32.c",
"src/core/support/time.c",
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2334,6 +2334,7 @@ LIBGPR_SRC = \
src/core/support/sync.c \
src/core/support/sync_posix.c \
src/core/support/sync_win32.c \
src/core/support/thd.c \
src/core/support/thd_posix.c \
src/core/support/thd_win32.c \
src/core/support/time.c \
Expand Down Expand Up @@ -2428,6 +2429,7 @@ $(OBJDIR)/$(CONFIG)/src/core/support/string_win32.o:
$(OBJDIR)/$(CONFIG)/src/core/support/sync.o:
$(OBJDIR)/$(CONFIG)/src/core/support/sync_posix.o:
$(OBJDIR)/$(CONFIG)/src/core/support/sync_win32.o:
$(OBJDIR)/$(CONFIG)/src/core/support/thd.o:
$(OBJDIR)/$(CONFIG)/src/core/support/thd_posix.o:
$(OBJDIR)/$(CONFIG)/src/core/support/thd_win32.o:
$(OBJDIR)/$(CONFIG)/src/core/support/time.o:
Expand Down
1 change: 1 addition & 0 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@
"src/core/support/sync.c",
"src/core/support/sync_posix.c",
"src/core/support/sync_win32.c",
"src/core/support/thd.c",
"src/core/support/thd_posix.c",
"src/core/support/thd_win32.c",
"src/core/support/time.c",
Expand Down
19 changes: 17 additions & 2 deletions include/grpc/support/thd.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ typedef gpr_uint64 gpr_thd_id;

/* Thread creation options. */
typedef struct {
int flags; /* Flags below can be set here. Default value 0. */
int flags; /* Opaque field. Get and set with accessors below. */
} gpr_thd_options;
/* No flags are currently defined. */

/* Create a new thread running (*thd_body)(arg) and place its thread identifier
in *t, and return true. If there are insufficient resources, return false.
Expand All @@ -66,9 +65,25 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
/* Return a gpr_thd_options struct with all fields set to defaults. */
gpr_thd_options gpr_thd_options_default(void);

/* Set the thread to become detached on startup - this is the default. */
void gpr_thd_options_set_detached(gpr_thd_options *options);

/* Set the thread to become joinable - mutually exclusive with detached. */
void gpr_thd_options_set_joinable(gpr_thd_options *options);

/* Returns non-zero if the option detached is set. */
int gpr_thd_options_is_detached(const gpr_thd_options *options);

/* Returns non-zero if the option joinable is set. */
int gpr_thd_options_is_joinable(const gpr_thd_options *options);

/* Returns the identifier of the current thread. */
gpr_thd_id gpr_thd_currentid(void);

/* Blocks until the specified thread properly terminates.
Calling this on a detached thread has unpredictable results. */
void gpr_thd_join(gpr_thd_id t);

#ifdef __cplusplus
}
#endif
Expand Down
66 changes: 66 additions & 0 deletions src/core/support/thd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

/* Posix implementation for gpr threads. */

#include <memory.h>

#include <grpc/support/thd.h>

enum {
GPR_THD_JOINABLE = 1
};

gpr_thd_options gpr_thd_options_default(void) {
gpr_thd_options options;
memset(&options, 0, sizeof(options));
return options;
}

void gpr_thd_options_set_detached(gpr_thd_options *options) {
options->flags &= ~GPR_THD_JOINABLE;
}

void gpr_thd_options_set_joinable(gpr_thd_options *options) {
options->flags |= GPR_THD_JOINABLE;
}

int gpr_thd_options_is_detached(const gpr_thd_options *options) {
if (!options) return 1;
return (options->flags & GPR_THD_JOINABLE) == 0;
}

int gpr_thd_options_is_joinable(const gpr_thd_options *options) {
if (!options) return 0;
return (options->flags & GPR_THD_JOINABLE) == GPR_THD_JOINABLE;
}
16 changes: 9 additions & 7 deletions src/core/support/thd_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
a->arg = arg;

GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0);
if (gpr_thd_options_is_detached(options)) {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0);
} else {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
}
thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
if (!thread_started) {
Expand All @@ -78,14 +82,12 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
return thread_started;
}

gpr_thd_options gpr_thd_options_default(void) {
gpr_thd_options options;
memset(&options, 0, sizeof(options));
return options;
}

gpr_thd_id gpr_thd_currentid(void) {
return (gpr_thd_id)pthread_self();
}

void gpr_thd_join(gpr_thd_id t) {
pthread_join(t, NULL);
}

#endif /* GPR_POSIX_SYNC */
72 changes: 53 additions & 19 deletions src/core/support/thd_win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
*/

/* Posix implementation for gpr threads. */
/* Windows implementation for gpr threads. */

#include <grpc/support/port_platform.h>

Expand All @@ -40,47 +40,81 @@
#include <windows.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>

struct thd_arg {
#if defined(_MSC_VER)
#define thread_local __declspec(thread)
#elif defined(__GNUC__)
#define thread_local __thread
#else
#error "Unknown compiler - please file a bug report"
#endif

struct thd_info {
void (*body)(void *arg); /* body of a thread */
void *arg; /* argument to a thread */
HANDLE join_event; /* if joinable, the join event */
int joinable; /* true if not detached */
};

static thread_local struct thd_info *g_thd_info;

/* Destroys a thread info */
static destroy_thread(struct thd_info *t) {
if (t->joinable) CloseHandle(t->join_event);
gpr_free(t);
}

/* Body of every thread started via gpr_thd_new. */
static DWORD WINAPI thread_body(void *v) {
struct thd_arg a = *(struct thd_arg *)v;
gpr_free(v);
(*a.body)(a.arg);
g_thd_info = (struct thd_info *)v;
g_thd_info->body(g_thd_info->arg);
if (g_thd_info->joinable) {
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
} else {
destroy_thread(g_thd_info);
}
return 0;
}

int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
const gpr_thd_options *options) {
HANDLE handle;
DWORD thread_id;
struct thd_arg *a = gpr_malloc(sizeof(*a));
a->body = thd_body;
a->arg = arg;
struct thd_info *info = gpr_malloc(sizeof(*info));
info->body = thd_body;
info->arg = arg;
*t = 0;
handle = CreateThread(NULL, 64 * 1024, thread_body, a, 0, &thread_id);
if (gpr_thd_options_is_joinable(options)) {
info->joinable = 1;
info->join_event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (info->join_event == NULL) {
gpr_free(info);
return 0;
}
} else {
info->joinable = 0;
}
handle = CreateThread(NULL, 64 * 1024, thread_body, info, 0, NULL);
if (handle == NULL) {
gpr_free(a);
destroy_thread(info);
} else {
CloseHandle(handle); /* threads are "detached" */
*t = (gpr_thd_id)info;
CloseHandle(handle);
}
*t = (gpr_thd_id)thread_id;
return handle != NULL;
}

gpr_thd_options gpr_thd_options_default(void) {
gpr_thd_options options;
memset(&options, 0, sizeof(options));
return options;
gpr_thd_id gpr_thd_currentid(void) {
return (gpr_thd_id)g_thd_info;
}

gpr_thd_id gpr_thd_currentid(void) {
return (gpr_thd_id)GetCurrentThreadId();
void gpr_thd_join(gpr_thd_id t) {
struct thd_info *info = (struct thd_info *)t;
DWORD ret = WaitForSingleObject(info->join_event, INFINITE);
GPR_ASSERT(ret == WAIT_OBJECT_0);
destroy_thread(info);
}

#endif /* GPR_WIN32 */
11 changes: 11 additions & 0 deletions test/core/support/thd_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ static void thd_body(void *v) {
gpr_mu_unlock(&t->mu);
}

static void thd_body_joinable(void *v) { }

/* Test that we can create a number of threads and wait for them. */
static void test(void) {
int i;
gpr_thd_id thd;
gpr_thd_id thds[1000];
struct test t;
int n = 1000;
gpr_thd_options options = gpr_thd_options_default();
gpr_mu_init(&t.mu);
gpr_cv_init(&t.done_cv);
t.n = n;
Expand All @@ -79,6 +83,13 @@ static void test(void) {
}
gpr_mu_unlock(&t.mu);
GPR_ASSERT(t.n == 0);
gpr_thd_options_set_joinable(&options);
for (i = 0; i < n; i++) {
GPR_ASSERT(gpr_thd_new(&thds[i], &thd_body_joinable, NULL, &options));
}
for (i = 0; i < n; i++) {
gpr_thd_join(thds[i]);
}
}

/* ------------------------------------------------- */
Expand Down
2 changes: 2 additions & 0 deletions vsprojects/vs2010/gpr.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\support\sync_win32.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_win32.c">
Expand Down
3 changes: 3 additions & 0 deletions vsprojects/vs2010/gpr.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
<ClCompile Include="..\..\src\core\support\sync_win32.c">
<Filter>src\core\support</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd.c">
<Filter>src\core\support</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_posix.c">
<Filter>src\core\support</Filter>
</ClCompile>
Expand Down
2 changes: 2 additions & 0 deletions vsprojects/vs2013/gpr.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\support\sync_win32.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_win32.c">
Expand Down
3 changes: 3 additions & 0 deletions vsprojects/vs2013/gpr.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
<ClCompile Include="..\..\src\core\support\sync_win32.c">
<Filter>src\core\support</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd.c">
<Filter>src\core\support</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\support\thd_posix.c">
<Filter>src\core\support</Filter>
</ClCompile>
Expand Down

0 comments on commit 91647cc

Please sign in to comment.