diff --git a/src/julia_threads.h b/src/julia_threads.h index a626da27c7d65..06088d52f48fb 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -144,6 +144,7 @@ struct _jl_tls_states_t { int16_t tid; uint64_t rngseed; volatile size_t *safepoint; + volatile int8_t sleep_check_state; // Whether it is safe to execute GC at the same time. #define JL_GC_STATE_WAITING 1 // gc_state = 1 means the thread is doing GC or is waiting for the GC to diff --git a/src/partr.c b/src/partr.c index a64915bbca16f..734607bc38ae1 100644 --- a/src/partr.c +++ b/src/partr.c @@ -315,9 +315,12 @@ static void wake_thread(int16_t self, int16_t tid) { if (self != tid) { jl_ptls_t other = jl_all_tls_states[tid]; - uv_mutex_lock(&other->sleep_lock); - uv_cond_signal(&other->wake_signal); - uv_mutex_unlock(&other->sleep_lock); + int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); + if (state == sleeping) { + uv_mutex_lock(&other->sleep_lock); + uv_cond_signal(&other->wake_signal); + uv_mutex_unlock(&other->sleep_lock); + } } } @@ -386,12 +389,14 @@ JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT // get the next runnable task from the multiq static jl_task_t *get_next_task(jl_value_t *getsticky) { + jl_gc_safepoint(); jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1); if (jl_typeis(task, jl_task_type)) { int self = jl_get_ptls_states()->tid; jl_set_task_tid(task, self); return task; } + jl_gc_safepoint(); #ifdef JULIA_ENABLE_THREADING return multiq_deletemin(); #else @@ -399,6 +404,11 @@ static jl_task_t *get_next_task(jl_value_t *getsticky) #endif } +static int may_sleep(jl_ptls_t ptls) +{ + return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping; +} + extern volatile unsigned _threadedregion; JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) @@ -408,12 +418,12 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) jl_task_t *task; while (1) { - jl_gc_safepoint(); task = get_next_task(getsticky); if (task) return task; #ifdef JULIA_ENABLE_THREADING + // quick, race-y check to see if there seems to be any stuff in there jl_cpu_pause(); if (!multiq_check_empty()) { start_cycles = 0; @@ -425,6 +435,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) { if (!sleep_check_now(ptls->tid)) continue; + jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock task = get_next_task(getsticky); if (task) return task; @@ -449,29 +460,26 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) JL_UV_UNLOCK(); } else { - // otherwise, block until someone asks us for the lock - task = get_next_task(getsticky); - if (task) { - JL_UV_UNLOCK(); - return task; - } + // otherwise, we may block until someone asks us for the lock uv_loop_t *loop = jl_global_event_loop(); - loop->stop_flag = 0; - active = uv_run(loop, UV_RUN_ONCE); + jl_gc_safepoint(); + if (may_sleep(ptls)) { + loop->stop_flag = 0; + active = uv_run(loop, UV_RUN_ONCE); + } JL_UV_UNLOCK(); - // optimization: check again first if we added work for ourself - task = get_next_task(getsticky); - if (task) - return task; - // or someone else might have - if (jl_atomic_load(&sleep_check_state) != sleeping) { + // optimization: check again first if we may have work to do + if (!may_sleep(ptls)) { start_cycles = 0; continue; } // otherwise, we got a spurious wakeup since some other - // thread just wanted to steal libuv from us, + // thread that just wanted to steal libuv from us, // just go right back to sleep on the other wake signal // to let them take it from us without conflict + // TODO: this relinquishes responsibility for all event + // to the last thread to do an explicit operation, + // which may starve other threads of critical work } if (!_threadedregion && active && ptls->tid == 0) { // thread 0 is the only thread permitted to run the event loop @@ -480,20 +488,16 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) continue; } } + // the other threads will just wait for on signal to resume int8_t gc_state = jl_gc_safe_enter(ptls); uv_mutex_lock(&ptls->sleep_lock); - while (jl_atomic_load(&sleep_check_state) == sleeping) { - task = get_next_task(getsticky); - if (task) - break; + while (may_sleep(ptls)) { uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); } uv_mutex_unlock(&ptls->sleep_lock); - jl_gc_safe_leave(ptls, gc_state); + jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint start_cycles = 0; - if (task) - return task; } else { // maybe check the kernel for new messages too