diff options
author | Koichi Sasada <ko1@atdot.net> | 2023-04-10 10:53:13 +0900 |
---|---|---|
committer | Koichi Sasada <ko1@atdot.net> | 2023-10-12 14:47:01 +0900 |
commit | be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch) | |
tree | 2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread_sync.c | |
parent | 096ee0648e215915a3019c2cd68ba220d94eca12 (diff) | |
download | ruby-be1bbd5b7d40ad863ab35097765d3754726bbd54.tar.gz |
M:N thread scheduler for Ractors
This patch introduce M:N thread scheduler for Ractor system.
In general, M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
On the Ruby interpreter, 1 native thread is provided for 1 Ractor
and all Ruby threads are managed by the native thread.
From Ruby 1.9, the interpreter uses 1:1 thread scheduler which means
1 Ruby thread has 1 native thread. M:N scheduler change this strategy.
Because of compatibility issue (and stableness issue of the implementation)
main Ractor doesn't use M:N scheduler on default. On the other words,
threads on the main Ractor will be managed with 1:1 thread scheduler.
There are additional settings by environment variables:
`RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor.
Note that non-main ractors use the M:N scheduler without this
configuration. With this configuration, single ractor applications
run threads on M:1 thread scheduler (green threads, user-level threads).
`RUBY_MAX_CPU=n` specifies maximum number of native threads for
M:N scheduler (default: 8).
This patch will be reverted soon if non-easy issues are found.
[Bug #19842]
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 108 |
1 files changed, 64 insertions, 44 deletions
diff --git a/thread_sync.c b/thread_sync.c index ca463c35f1..825fdde76f 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -41,6 +41,8 @@ struct queue_sleep_arg { static void sync_wakeup(struct ccan_list_head *head, long max) { + RUBY_DEBUG_LOG("max:%ld", max); + struct sync_waiter *cur = 0, *next; ccan_list_for_each_safe(head, cur, next, node) { @@ -51,6 +53,7 @@ sync_wakeup(struct ccan_list_head *head, long max) rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); } else { + RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th)); rb_threadptr_interrupt(cur->th); cur->th->status = THREAD_RUNNABLE; } @@ -251,6 +254,8 @@ rb_mutex_trylock(VALUE self) rb_mutex_t *mutex = mutex_ptr(self); if (mutex->fiber == 0) { + RUBY_DEBUG_LOG("%p ok", mutex); + rb_fiber_t *fiber = GET_EC()->fiber_ptr; rb_thread_t *th = GET_THREAD(); mutex->fiber = fiber; @@ -258,17 +263,12 @@ rb_mutex_trylock(VALUE self) mutex_locked(th, self); return Qtrue; } - - return Qfalse; + else { + RUBY_DEBUG_LOG("%p ng", mutex); + return Qfalse; + } } -/* - * At maximum, only one thread can use cond_timedwait and watch deadlock - * periodically. Multiple polling thread (i.e. concurrent deadlock check) - * introduces new race conditions. [Bug #6278] [ruby-core:44275] - */ -static const rb_thread_t *patrol_thread = NULL; - static VALUE mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) { @@ -290,6 +290,8 @@ delete_from_waitq(VALUE value) return Qnil; } +static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th); + static VALUE do_mutex_lock(VALUE self, int interruptible_p) { @@ -297,6 +299,7 @@ do_mutex_lock(VALUE self, int interruptible_p) rb_thread_t *th = ec->thread_ptr; rb_fiber_t *fiber = ec->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); + rb_atomic_t saved_ints = 0; /* When running trap handler */ if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) && @@ -310,6 +313,8 @@ do_mutex_lock(VALUE self, int interruptible_p) } while (mutex->fiber != fiber) { + VM_ASSERT(mutex->fiber != NULL); + VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { struct sync_waiter sync_waiter = { @@ -331,51 +336,47 @@ do_mutex_lock(VALUE self, int interruptible_p) rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread"); } - enum rb_thread_status prev_status = th->status; - rb_hrtime_t *timeout = 0; - rb_hrtime_t rel = rb_msec2hrtime(100); - - th->status = THREAD_STOPPED_FOREVER; - th->locking_mutex = self; - rb_ractor_sleeper_threads_inc(th->ractor); - /* - * Carefully! while some contended threads are in native_sleep(), - * ractor->sleeper is unstable value. we have to avoid both deadlock - * and busy loop. - */ - if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) && - !patrol_thread) { - timeout = &rel; - patrol_thread = th; - } - struct sync_waiter sync_waiter = { .self = self, .th = th, - .fiber = nonblocking_fiber(fiber) + .fiber = nonblocking_fiber(fiber), }; - ccan_list_add_tail(&mutex->waitq, &sync_waiter.node); + RUBY_DEBUG_LOG("%p wait", mutex); + + // similar code with `sleep_forever`, but + // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception. + // Ensure clause is needed like but `rb_ensure` a bit slow. + // + // begin + // sleep_forever(th, SLEEP_DEADLOCKABLE); + // ensure + // ccan_list_del(&sync_waiter.node); + // end + enum rb_thread_status prev_status = th->status; + th->status = THREAD_STOPPED_FOREVER; + rb_ractor_sleeper_threads_inc(th->ractor); + rb_check_deadlock(th->ractor); - native_sleep(th, timeout); /* release GVL */ + th->locking_mutex = self; + ccan_list_add_tail(&mutex->waitq, &sync_waiter.node); + { + native_sleep(th, NULL); + } ccan_list_del(&sync_waiter.node); + // unlocked by another thread while sleeping if (!mutex->fiber) { mutex->fiber = fiber; } - if (patrol_thread == th) - patrol_thread = NULL; - - th->locking_mutex = Qfalse; - if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { - rb_check_deadlock(th->ractor); - } - if (th->status == THREAD_STOPPED_FOREVER) { - th->status = prev_status; - } rb_ractor_sleeper_threads_dec(th->ractor); + th->status = prev_status; + th->locking_mutex = Qfalse; + th->locking_mutex = Qfalse; + + RUBY_DEBUG_LOG("%p wakeup", mutex); } if (interruptible_p) { @@ -387,11 +388,27 @@ do_mutex_lock(VALUE self, int interruptible_p) mutex->fiber = fiber; } } + else { + // clear interrupt information + if (RUBY_VM_INTERRUPTED(th->ec)) { + // reset interrupts + if (saved_ints == 0) { + saved_ints = threadptr_get_interrupts(th); + } + else { + // ignore additional interrupts + threadptr_get_interrupts(th); + } + } + } } + if (saved_ints) th->ec->interrupt_flag = saved_ints; if (mutex->fiber == fiber) mutex_locked(th, self); } + RUBY_DEBUG_LOG("%p locked", mutex); + // assertion if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned."); @@ -435,6 +452,8 @@ rb_mutex_owned_p(VALUE self) static const char * rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) { + RUBY_DEBUG_LOG("%p", mutex); + if (mutex->fiber == 0) { return "Attempt to unlock a mutex which is not locked"; } @@ -456,13 +475,14 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) } else { switch (cur->th->status) { - case THREAD_RUNNABLE: /* from someone else calling Thread#run */ - case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ + case THREAD_RUNNABLE: /* from someone else calling Thread#run */ + case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ + RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th)); rb_threadptr_interrupt(cur->th); return NULL; - case THREAD_STOPPED: /* probably impossible */ + case THREAD_STOPPED: /* probably impossible */ rb_bug("unexpected THREAD_STOPPED"); - case THREAD_KILLED: + case THREAD_KILLED: /* not sure about this, possible in exit GC? */ rb_bug("unexpected THREAD_KILLED"); continue; |