path: root/thread_sync.c
author    Koichi Sasada <ko1@atdot.net>  2023-04-10 10:53:13 +0900
committer Koichi Sasada <ko1@atdot.net>  2023-10-12 14:47:01 +0900
commit    be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch)
tree      2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread_sync.c
parent    096ee0648e215915a3019c2cd68ba220d94eca12 (diff)
download  ruby-be1bbd5b7d40ad863ab35097765d3754726bbd54.tar.gz
M:N thread scheduler for Ractors
This patch introduces an M:N thread scheduler for the Ractor system.

In general, an M:N thread scheduler employs N native threads (OS threads) to manage M user-level threads (Ruby threads in this case). In the Ruby interpreter, one native thread is provided per Ractor, and all Ruby threads of a Ractor are managed by that native thread.

Since Ruby 1.9 the interpreter has used a 1:1 thread scheduler, meaning each Ruby thread is backed by its own native thread. The M:N scheduler changes this strategy.

Because of compatibility issues (and stability issues in the implementation), the main Ractor does not use the M:N scheduler by default; in other words, threads on the main Ractor are still managed by the 1:1 thread scheduler.

Additional settings are available via environment variables:

`RUBY_MN_THREADS=1` enables the M:N thread scheduler on the main Ractor. Note that non-main Ractors use the M:N scheduler without this setting. With this setting, single-Ractor applications run their threads on an M:1 scheduler (green threads, user-level threads).

`RUBY_MAX_CPU=n` specifies the maximum number of native threads for the M:N scheduler (default: 8).

This patch will be reverted soon if hard-to-resolve issues are found.

[Bug #19842]
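As a quick illustration of the settings described above, here is a minimal Ruby sketch (a hypothetical demo script, not part of the patch; the file name `mn_demo.rb` and the expected outputs are illustrative). Only `RUBY_MN_THREADS` and `RUBY_MAX_CPU` come from this change; the rest is ordinary Thread/Ractor code:

    # Run as:
    #   RUBY_MN_THREADS=1 RUBY_MAX_CPU=4 ruby mn_demo.rb
    #
    # With RUBY_MN_THREADS=1, the 100 Ruby threads below are multiplexed
    # over at most RUBY_MAX_CPU native threads (M:N). Without it, the main
    # Ractor keeps the 1:1 scheduler and each thread gets a native thread.
    threads = 100.times.map do |i|
      Thread.new(i) { |n| sleep(0.01); n * 2 }
    end
    p threads.sum(&:value)   # => 9900 either way; only the scheduling differs

    # Threads created inside a non-main Ractor are scheduled M:N
    # regardless of RUBY_MN_THREADS.
    r = Ractor.new do
      10.times.map { Thread.new { 1 } }.sum(&:value)
    end
    p r.take                 # => 10

The observable behavior is identical under both schedulers; the difference is how many native threads back the Ruby threads.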
Diffstat (limited to 'thread_sync.c')
-rw-r--r--  thread_sync.c  108
1 file changed, 64 insertions(+), 44 deletions(-)
diff --git a/thread_sync.c b/thread_sync.c
index ca463c35f1..825fdde76f 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -41,6 +41,8 @@ struct queue_sleep_arg {
static void
sync_wakeup(struct ccan_list_head *head, long max)
{
+ RUBY_DEBUG_LOG("max:%ld", max);
+
struct sync_waiter *cur = 0, *next;
ccan_list_for_each_safe(head, cur, next, node) {
@@ -51,6 +53,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
}
else {
+ RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
rb_threadptr_interrupt(cur->th);
cur->th->status = THREAD_RUNNABLE;
}
@@ -251,6 +254,8 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
if (mutex->fiber == 0) {
+ RUBY_DEBUG_LOG("%p ok", mutex);
+
rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
mutex->fiber = fiber;
@@ -258,17 +263,12 @@ rb_mutex_trylock(VALUE self)
mutex_locked(th, self);
return Qtrue;
}
-
- return Qfalse;
+ else {
+ RUBY_DEBUG_LOG("%p ng", mutex);
+ return Qfalse;
+ }
}
-/*
- * At maximum, only one thread can use cond_timedwait and watch deadlock
- * periodically. Multiple polling thread (i.e. concurrent deadlock check)
- * introduces new race conditions. [Bug #6278] [ruby-core:44275]
- */
-static const rb_thread_t *patrol_thread = NULL;
-
static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
@@ -290,6 +290,8 @@ delete_from_waitq(VALUE value)
return Qnil;
}
+static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
+
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
@@ -297,6 +299,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
+ rb_atomic_t saved_ints = 0;
/* When running trap handler */
if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
@@ -310,6 +313,8 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
while (mutex->fiber != fiber) {
+ VM_ASSERT(mutex->fiber != NULL);
+
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
struct sync_waiter sync_waiter = {
@@ -331,51 +336,47 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
}
- enum rb_thread_status prev_status = th->status;
- rb_hrtime_t *timeout = 0;
- rb_hrtime_t rel = rb_msec2hrtime(100);
-
- th->status = THREAD_STOPPED_FOREVER;
- th->locking_mutex = self;
- rb_ractor_sleeper_threads_inc(th->ractor);
- /*
- * Carefully! while some contended threads are in native_sleep(),
- * ractor->sleeper is unstable value. we have to avoid both deadlock
- * and busy loop.
- */
- if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
- !patrol_thread) {
- timeout = &rel;
- patrol_thread = th;
- }
-
struct sync_waiter sync_waiter = {
.self = self,
.th = th,
- .fiber = nonblocking_fiber(fiber)
+ .fiber = nonblocking_fiber(fiber),
};
- ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
+ RUBY_DEBUG_LOG("%p wait", mutex);
+
+ // Similar code to `sleep_forever`, but
+ // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
+ // An ensure clause like the one below is needed, but `rb_ensure` is a bit slow.
+ //
+ // begin
+ // sleep_forever(th, SLEEP_DEADLOCKABLE);
+ // ensure
+ // ccan_list_del(&sync_waiter.node);
+ // end
+ enum rb_thread_status prev_status = th->status;
+ th->status = THREAD_STOPPED_FOREVER;
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
- native_sleep(th, timeout); /* release GVL */
+ th->locking_mutex = self;
+ ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
+ {
+ native_sleep(th, NULL);
+ }
ccan_list_del(&sync_waiter.node);
+ // unlocked by another thread while sleeping
if (!mutex->fiber) {
mutex->fiber = fiber;
}
- if (patrol_thread == th)
- patrol_thread = NULL;
-
- th->locking_mutex = Qfalse;
- if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
- rb_check_deadlock(th->ractor);
- }
- if (th->status == THREAD_STOPPED_FOREVER) {
- th->status = prev_status;
- }
rb_ractor_sleeper_threads_dec(th->ractor);
+ th->status = prev_status;
+ th->locking_mutex = Qfalse;
+
+ RUBY_DEBUG_LOG("%p wakeup", mutex);
}
if (interruptible_p) {
@@ -387,11 +388,27 @@ do_mutex_lock(VALUE self, int interruptible_p)
mutex->fiber = fiber;
}
}
+ else {
+ // not interruptible: stash pending interrupts so they do not
+ // abort the lock acquisition; they are restored after locking
+ if (RUBY_VM_INTERRUPTED(th->ec)) {
+ if (saved_ints == 0) {
+ // save the first set of pending interrupts
+ saved_ints = threadptr_get_interrupts(th);
+ }
+ else {
+ // interrupts were already saved; discard the new ones
+ threadptr_get_interrupts(th);
+ }
+ }
+ }
}
+ if (saved_ints) th->ec->interrupt_flag = saved_ints;
if (mutex->fiber == fiber) mutex_locked(th, self);
}
+ RUBY_DEBUG_LOG("%p locked", mutex);
+
// assertion
if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
@@ -435,6 +452,8 @@ rb_mutex_owned_p(VALUE self)
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
+ RUBY_DEBUG_LOG("%p", mutex);
+
if (mutex->fiber == 0) {
return "Attempt to unlock a mutex which is not locked";
}
@@ -456,13 +475,14 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
}
else {
switch (cur->th->status) {
- case THREAD_RUNNABLE: /* from someone else calling Thread#run */
- case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
+ case THREAD_RUNNABLE: /* from someone else calling Thread#run */
+ case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
+ RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
rb_threadptr_interrupt(cur->th);
return NULL;
- case THREAD_STOPPED: /* probably impossible */
+ case THREAD_STOPPED: /* probably impossible */
rb_bug("unexpected THREAD_STOPPED");
- case THREAD_KILLED:
+ case THREAD_KILLED:
/* not sure about this, possible in exit GC? */
rb_bug("unexpected THREAD_KILLED");
continue;