Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import partr code, allow using it in threaded loops #31398

Merged
merged 2 commits into from
Mar 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 33 additions & 26 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,17 @@ end

# Queue the runnable task `t` for execution and return `t`.
# Errors with "schedule: Task not runnable" unless `t.state == :runnable` and
# `t.queue === nothing`.
# NOTE(review): this is diff hunk @@ -444,12 +444,17 @@ shown without its +/-
# markers, so lines from the old and the new version are interleaved. The
# "old side"/"new side" tags below are inferred from the hunk line counts --
# confirm against the real diff.
function enq_work(t::Task)
(t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable")
# old side (apparently removed): pick the task's thread id when sticky, else 0,
# and replace 0 with the calling thread's id
tid = (t.sticky ? Threads.threadid(t) : 0)
if tid == 0
tid = Threads.threadid()
# new side (apparently added): only sticky tasks go onto a per-thread Workqueue
if t.sticky
tid = Threads.threadid(t)
# a thread id of 0 is replaced by the calling thread's id
if tid == 0
tid = Threads.threadid()
end
push!(Workqueues[tid], t)
else
# non-sticky tasks are handed to the C runtime via jl_enqueue_task; tid stays 0
tid = 0
ccall(:jl_enqueue_task, Cvoid, (Any,), t)
end
# old side (apparently removed): unconditional push, then uv_stop on the event
# loop when the target is thread 1
push!(Workqueues[tid], t)
tid == 1 && ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
# new side (apparently added): pass the zero-based index (tid - 1) as an Int16;
# for tid == 0 this is -1 (presumably "no specific thread" -- confirm in the C runtime)
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
return t
end

Expand Down Expand Up @@ -603,30 +608,32 @@ function trypoptask(W::StickyWorkqueue)
end

# Obtain the next task to run for the sticky workqueue `W` and return it wrapped
# in a `Ref`.
# NOTE(review): this is diff hunk @@ -603,30 +608,32 @@ shown without its +/-
# markers. The uncommented `while true` loop looks like the old (removed) side and
# the `jl_task_get_next` call plus the commented-out copy of the loop look like
# the new (added) side -- inferred from the hunk line counts, confirm against the
# real diff.
@noinline function poptaskref(W::StickyWorkqueue)
# old side (apparently removed): poll `W` until trypoptask yields a task
local task
while true
task = trypoptask(W)
task === nothing || break
if !Threads.in_threaded_loop[] && Threads.threadid() == 1
if process_events(true) == 0
task = trypoptask(W)
task === nothing || break
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
if Threads.threadid() == 1
process_events(false)
end
ccall(:jl_gc_safepoint, Cvoid, ())
ccall(:jl_cpu_pause, Cvoid, ())
end
end
# new side (apparently added): wrap the sticky-queue lookup in a closure and pass
# it to the C function jl_task_get_next, whose result becomes `task`
gettask = () -> trypoptask(W)
task = ccall(:jl_task_get_next, Any, (Any,), gettask)
## Below is a reference implementation for `jl_task_get_next`, which currently lives in C
#local task
#while true
# task = trypoptask(W)
# task === nothing || break
# if !Threads.in_threaded_loop[] && Threads.threadid() == 1
# if process_events(true) == 0
# task = trypoptask(W)
# task === nothing || break
# # if there are no active handles and no runnable tasks, just
# # wait for signals.
# pause()
# end
# else
# if Threads.threadid() == 1
# process_events(false)
# end
# ccall(:jl_gc_safepoint, Cvoid, ())
# ccall(:jl_cpu_pause, Cvoid, ())
# end
#end
return Ref(task)
end


function wait()
W = Workqueues[Threads.threadid()]
reftask = poptaskref(W)
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ SRCS := \
jltypes gf typemap ast builtins module interpreter symbol \
dlload sys init task array dump staticdata toplevel jl_uv datatype \
simplevector APInt-C runtime_intrinsics runtime_ccall precompile \
threadgroup threading stackwalk gc gc-debug gc-pages gc-stacks method \
threading partr stackwalk gc gc-debug gc-pages gc-stacks method \
jlapi signal-handling safepoint jloptions timing subtype rtutils \
crc32c processor

Expand Down Expand Up @@ -215,7 +215,7 @@ $(BUILDDIR)/gc-debug.o $(BUILDDIR)/gc-debug.dbg.obj: $(SRCDIR)/gc.h
$(BUILDDIR)/gc-pages.o $(BUILDDIR)/gc-pages.dbg.obj: $(SRCDIR)/gc.h
$(BUILDDIR)/signal-handling.o $(BUILDDIR)/signal-handling.dbg.obj: $(addprefix $(SRCDIR)/,signals-*.c)
$(BUILDDIR)/dump.o $(BUILDDIR)/dump.dbg.obj: $(addprefix $(SRCDIR)/,common_symbols1.inc common_symbols2.inc)
$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h threadgroup.h)
# Objects that must be rebuilt when threading.h changes. Each entry is a build
# product (.o / .dbg.obj pair); init.o replaces the former init.c, which is a
# source file and never exists under $(BUILDDIR).
$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.o init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h)
$(addprefix $(BUILDDIR)/,APInt-C.o APInt-C.dbg.obj runtime_intrinsics.o runtime_intrinsics.dbg.obj): $(SRCDIR)/APInt-C.h

# archive library file rules
Expand Down
6 changes: 6 additions & 0 deletions src/atomics.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@
// the __atomic builtins or c11 atomics with GNU extension or c11 _Generic
// Compare-and-swap: if *obj equals `expected`, store `desired`; evaluates to the
// value *obj held before the operation.
# define jl_atomic_compare_exchange(obj, expected, desired) \
__sync_val_compare_and_swap(obj, expected, desired)
// Compare-and-swap that evaluates to true only when the swap took place.
# define jl_atomic_bool_compare_exchange(obj, expected, desired) \
__sync_bool_compare_and_swap(obj, expected, desired)
// Sequentially consistent exchange: store `desired`, evaluate to the previous value.
# define jl_atomic_exchange(obj, desired) \
__atomic_exchange_n(obj, desired, __ATOMIC_SEQ_CST)
// Pointer-based exchange: stores *desired into *obj and writes the previous
// contents of *obj to *orig (all three arguments are pointers).
# define jl_atomic_exchange_generic(obj, desired, orig)\
__atomic_exchange(obj, desired, orig, __ATOMIC_SEQ_CST)
// Same as jl_atomic_exchange, but with relaxed memory ordering.
# define jl_atomic_exchange_relaxed(obj, desired) \
__atomic_exchange_n(obj, desired, __ATOMIC_RELAXED)
// TODO: Maybe add jl_atomic_compare_exchange_weak for spin lock
Expand Down Expand Up @@ -115,6 +119,7 @@ jl_atomic_fetch_add(T *obj, T2 arg)
{
return (T)_InterlockedExchangeAdd64((volatile __int64*)obj, (__int64)arg);
}
// TODO: jl_atomic_exchange_generic
#define jl_atomic_fetch_add_relaxed(obj, arg) jl_atomic_fetch_add(obj, arg)

// and
Expand Down Expand Up @@ -200,6 +205,7 @@ jl_atomic_compare_exchange(volatile T *obj, T2 expected, T3 desired)
return (T)_InterlockedCompareExchange64((volatile __int64*)obj,
(__int64)desired, (__int64)expected);
}
// TODO: jl_atomic_bool_compare_exchange
// atomic exchange
template<typename T, typename T2>
static inline typename std::enable_if<sizeof(T) == 1, T>::type
Expand Down
10 changes: 10 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,11 @@ STATIC_INLINE int gc_mark_queue_obj(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_
return (int)nptr;
}

// Non-inline wrapper around gc_mark_queue_obj for callers that supply the mark
// cache and mark stack explicitly. Returns gc_mark_queue_obj's result unchanged.
int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj)
{
    int result = gc_mark_queue_obj(gc_cache, sp, obj);
    return result;
}

JL_DLLEXPORT int jl_gc_mark_queue_obj(jl_ptls_t ptls, jl_value_t *obj)
{
return gc_mark_queue_obj(&ptls->gc_cache, &ptls->gc_mark_sp, obj);
Expand Down Expand Up @@ -2483,12 +2488,17 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
}

void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);

// mark the initial root set
static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
// modules
gc_mark_queue_obj(gc_cache, sp, jl_main_module);

// tasks
jl_gc_mark_enqueued_tasks(gc_cache, sp);

// invisible builtin values
if (jl_an_empty_vec_any != NULL)
gc_mark_queue_obj(gc_cache, sp, jl_an_empty_vec_any);
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ void _julia_init(JL_IMAGE_SEARCH rel)
ptls->world_age = last_age;
}
}
else {
// nthreads > 1 requires code in Base
jl_n_threads = 1;
}
jl_start_threads();

// This needs to be after jl_start_threads
Expand Down
2 changes: 2 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,8 @@ typedef struct _jl_task_t {
// id of owning thread
// does not need to be defined until the task runs
int16_t tid;
/* for the multiqueue */
int16_t prio;
#ifdef JULIA_ENABLE_THREADING
// This is statically initialized when the task is not holding any locks
arraylist_t locks;
Expand Down
19 changes: 18 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,6 @@ extern ssize_t jl_tls_offset;
extern const int jl_tls_elf_support;
void jl_init_threading(void);
void jl_start_threads(void);
void jl_shutdown_threading(void);

// Whether the GC is running
extern char *jl_safepoint_pages;
Expand Down Expand Up @@ -708,6 +707,24 @@ void jl_copy_excstack(jl_excstack_t *dest, jl_excstack_t *src) JL_NOTSAFEPOINT;
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void);

// congruential random number generator
// for a small amount of thread-local randomness
// we could just use libc:`rand()`, but we want to ensure this is fast
// Initialize the congruential generator state `*seed` with a value drawn from
// libc's rand().
STATIC_INLINE void seed_cong(uint64_t *seed)
{
    uint64_t initial = rand();
    *seed = initial;
}
// Compute the rejection threshold used by `cong` for results modulo `max`.
// With UINT64_MAX == q*max + r, this stores q*max - 1 in `*unbias`, so the
// accepted range [0, *unbias] holds a whole number of residue cycles.
// `max` must be nonzero (it is used as a divisor).
STATIC_INLINE void unbias_cong(uint64_t max, uint64_t *unbias)
{
    uint64_t remainder = UINT64_MAX % max;
    *unbias = UINT64_MAX - (remainder + 1);
}
// Advance the linear congruential generator held in `*seed` and return a value
// in [0, max). States above `unbias` (see unbias_cong) are rejected and the
// generator is stepped again, so the result is not skewed by the modulo.
// `*seed` is left at the accepted state.
STATIC_INLINE uint64_t cong(uint64_t max, uint64_t unbias, uint64_t *seed)
{
    uint64_t state = *seed;
    do {
        state = 69069 * state + 362437;
    } while (state > unbias);
    *seed = state;
    return state % max;
}

// libuv stuff:
JL_DLLEXPORT extern void *jl_dl_handle;
JL_DLLEXPORT extern void *jl_RTLD_DEFAULT_handle;
Expand Down
3 changes: 2 additions & 1 deletion src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ typedef struct _jl_excstack_t jl_excstack_t;
struct _jl_tls_states_t {
struct _jl_gcframe_t *pgcstack;
size_t world_age;
int16_t tid;
uint64_t rngseed;
volatile size_t *safepoint;
// Whether it is safe to execute GC at the same time.
#define JL_GC_STATE_WAITING 1
Expand All @@ -158,7 +160,6 @@ struct _jl_tls_states_t {
size_t stacksize;
jl_ucontext_t base_ctx; // base context of stack
jl_jmp_buf *safe_restore;
int16_t tid;
// Temp storage for exception thrown in signal handler. Not rooted.
struct _jl_value_t *sig_exception;
// Temporary backtrace buffer. Scanned for gc roots when bt_size > 0.
Expand Down
16 changes: 16 additions & 0 deletions src/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ static inline void jl_mutex_lock(jl_mutex_t *lock)
jl_gc_enable_finalizers(ptls, 0);
}

// Try to take `lock` without blocking.
// Returns 1 when the calling thread now holds the lock: either it already owned
// it (the recursion count is incremented) or it won the compare-and-swap on an
// unowned lock (the count is set to 1). Returns 0 otherwise.
// This function makes no GC or finalizer calls itself.
static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock)
{
    unsigned long me = jl_thread_self();
    unsigned long holder = jl_atomic_load_acquire(&lock->owner);
    // recursive acquisition by the current owner
    if (holder == me) {
        lock->count++;
        return 1;
    }
    // held by some other thread
    if (holder != 0)
        return 0;
    // lock looked free: claim it, unless another thread got there first
    if (jl_atomic_compare_exchange(&lock->owner, 0, me) != 0)
        return 0;
    lock->count = 1;
    return 1;
}

/* Call this function for code that could be called from either a managed
or an unmanaged thread */
static inline void jl_mutex_lock_maybe_nogc(jl_mutex_t *lock)
Expand Down
19 changes: 19 additions & 0 deletions src/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@
#define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
#define DEFAULT_MACHINE_EXCLUSIVE 0

// partr -- parallel tasks runtime options ------------------------------------

// multiq
// number of heaps = MULTIQ_HEAP_C * nthreads
#define MULTIQ_HEAP_C 4
// number of tasks in each heap
#define MULTIQ_TASKS_PER_HEAP 129

// parfor
// tasks = niters / (GRAIN_K * nthreads)
#define GRAIN_K 4

// synchronization
// narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1
// NOTE(review): `^` in the formula above presumably means exponentiation, not
// C's XOR operator -- confirm against the code that uses ARRIVERS_P.
// narrivers is the limit for number of recursive parfors
#define ARRIVERS_P 2
// nreducers = narrivers * REDUCERS_FRAC
#define REDUCERS_FRAC 1


// sanitizer defaults ---------------------------------------------------------

Expand Down
Loading