Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Making HPC-GAP work with Julia #3734

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Add create/join thread functionality to HPCGAP w/ Julia.
  • Loading branch information
rbehrends committed Feb 2, 2020
commit c511c8bd3f18c39caa0e35df308d571d7aad0e4f
79 changes: 77 additions & 2 deletions src/hpc/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
# endif
# include <gc/gc.h>
#elif USE_JULIA_GC
# include <julia.h>
# ifdef HPCGAP
# include "julia_gc.h"
# endif
Expand All @@ -61,6 +62,9 @@ typedef struct ThreadData {
void * arg;
Obj thread_object;
Obj region_name;
#ifdef USE_JULIA_GC
jl_task_t * julia_task;
#endif
struct ThreadData * next;
} ThreadData;

Expand Down Expand Up @@ -255,14 +259,13 @@ static void RemoveGCRoots(void)
}
#endif /* DISABLE_GC */

#if !defined(USE_NATIVE_TLS) && !defined(USE_PTHREAD_TLS)

#ifdef HAVE_FUNC_ATTRIBUTE_NOINLINE
#define NOINLINE __attribute__((noinline))
#else
#define NOINLINE
#endif

#if !defined(USE_NATIVE_TLS) && !defined(USE_PTHREAD_TLS)

/* In order to safely use thread-local memory on the main stack, we have
* to work around an idiosyncracy in some virtual memory systems. These
Expand Down Expand Up @@ -457,6 +460,60 @@ static void * DispatchThread(void * arg)
return 0;
}

#ifdef USE_JULIA_GC
void gap_julia_thread_callback(int thread_num)
{
ThreadData *thread = thread_data + thread_num;
DispatchThread(thread);
}

void JL_DLLEXPORT wait_julia_task(jl_task_t * task)
{
static jl_function_t * wait_func = NULL;
if (!wait_func) {
wait_func =
(jl_function_t *)jl_get_global(jl_base_module, jl_symbol("wait"));
}
size_t last_age = jl_get_ptls_states()->world_age;
jl_get_ptls_states()->world_age = jl_get_world_counter();
jl_value_t * args[] = { (jl_value_t *)wait_func, (jl_value_t *)task };
jl_apply(args, 2);
jl_get_ptls_states()->world_age = last_age;
}


void schedule_julia_task(jl_task_t * task)
{
static jl_function_t * sched_func = NULL;
if (!sched_func) {
sched_func = (jl_function_t *)jl_get_global(jl_base_module,
jl_symbol("schedule"));
}
size_t last_age = jl_get_ptls_states()->world_age;
jl_get_ptls_states()->world_age = jl_get_world_counter();
jl_value_t * args[] = { (jl_value_t *)sched_func, (jl_value_t *)task };
jl_apply(args, 2);
jl_get_ptls_states()->world_age = last_age;
}

int run_julia_task(ThreadData *thread)
{
void jl_set_task_tid(jl_task_t *task, int tid);
char jeval_buffer[8192];
int thread_num = thread - thread_data;
sprintf(jeval_buffer,
"function() ccall(:gap_julia_thread_callback, "
"Cvoid, (Cint,), %d) "
"end", thread_num);
jl_value_t * task_func = jl_eval_string(jeval_buffer);
jl_task_t * task = jl_new_task(task_func, jl_nothing, 0);
thread->julia_task = task;
jl_set_task_tid(task, thread_num);
schedule_julia_task(task);
return 1;
}
#endif

Obj RunThread(void (*start)(void *), void * arg)
{
ThreadData * result;
Expand Down Expand Up @@ -498,17 +555,25 @@ Obj RunThread(void (*start)(void *), void * arg)
}
result->thread_object = NewThreadObject(result - thread_data);
/* set up the thread attribute to support a custom stack in our TLS */
#ifdef USE_JULIA_GC
run_julia_task(result);
#else
pthread_attr_init(&thread_attr);
#if !defined(USE_NATIVE_TLS) && !defined(USE_PTHREAD_TLS)
size_t pagesize = getpagesize();
pthread_attr_setstack(&thread_attr, (char *)tls + pagesize * 2,
TLS_SIZE - pagesize * 2);
#endif
#endif
UnlockThreadControl();
/* fork the thread */
IncThreadCounter();
#ifdef USE_JULIA_GC
if (pthread_create(&result->pthread_id, &thread_attr, DispatchThread,
result) < 0) {
#else
if (run_julia_task(result)) {
#endif
/* No more threads available */
DecThreadCounter();
LockThreadControl(1);
Expand All @@ -521,21 +586,27 @@ Obj RunThread(void (*start)(void *), void * arg)
#endif
return (Obj)0;
}
#ifndef USE_JULIA_GC
pthread_attr_destroy(&thread_attr);
#endif
return result->thread_object;
}

int JoinThread(int id)
{
#ifndef USE_JULIA_GC
pthread_t pthread_id;
#endif
void (*start)(void *);
#ifndef USE_NATIVE_TLS
void * tls;
#endif
if (id < 0 || id >= MAX_THREADS)
return 0;
LockThreadControl(1);
#ifndef USE_JULIA_GC
pthread_id = thread_data[id].pthread_id;
#endif
start = thread_data[id].start;
#ifndef USE_NATIVE_TLS
tls = thread_data[id].tls;
Expand All @@ -546,7 +617,11 @@ int JoinThread(int id)
}
thread_data[id].joined = 1;
UnlockThreadControl();
#ifdef USE_JULIA_GC
wait_julia_task(thread_data[id].julia_task);
#else
pthread_join(pthread_id, NULL);
#endif
LockThreadControl(1);
thread_data[id].next = thread_free_list;
thread_free_list = thread_data + id;
Expand Down
5 changes: 0 additions & 5 deletions src/hpc/threadapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ static void ThreadedInterpreter(void * funcargs)

static Obj FuncCreateThread(Obj self, Obj funcargs)
{
#ifndef USE_JULIA_GC
Int i, n;
Obj thread;
Obj templist;
Expand All @@ -503,10 +502,6 @@ static Obj FuncCreateThread(Obj self, Obj funcargs)
if (!thread)
return Fail;
return thread;
#else
ErrorMayQuit("CreateThread: disabled with Julia GC", 0, 0);
return 0; // flow control hint
#endif
}

/****************************************************************************
Expand Down
4 changes: 4 additions & 0 deletions src/julia_gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,10 @@ void InitBags(UInt initial_size, Bag * stack_bottom, UInt stack_align)
#endif
max_pool_obj_size = jl_gc_max_internal_obj_size();
jl_gc_enable_conservative_gc_support();
#ifdef HPCGAP
if (!getenv("JULIA_NUM_THREADS"))
setenv("JULIA_NUM_THREADS", "16", 0); // TODO: make configurable
#endif
jl_init();
jl_gc_enable(0);

Expand Down