aboutsummaryrefslogtreecommitdiffstats
path: root/thread.c
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2023-04-10 10:53:13 +0900
committerKoichi Sasada <ko1@atdot.net>2023-10-12 14:47:01 +0900
commitbe1bbd5b7d40ad863ab35097765d3754726bbd54 (patch)
tree2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread.c
parent096ee0648e215915a3019c2cd68ba220d94eca12 (diff)
downloadruby-be1bbd5b7d40ad863ab35097765d3754726bbd54.tar.gz
M:N thread scheduler for Ractors
This patch introduce M:N thread scheduler for Ractor system. In general, M:N thread scheduler employs N native threads (OS threads) to manage M user-level threads (Ruby threads in this case). On the Ruby interpreter, 1 native thread is provided for 1 Ractor and all Ruby threads are managed by the native thread. From Ruby 1.9, the interpreter uses 1:1 thread scheduler which means 1 Ruby thread has 1 native thread. M:N scheduler change this strategy. Because of compatibility issue (and stableness issue of the implementation) main Ractor doesn't use M:N scheduler on default. On the other words, threads on the main Ractor will be managed with 1:1 thread scheduler. There are additional settings by environment variables: `RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor. Note that non-main ractors use the M:N scheduler without this configuration. With this configuration, single ractor applications run threads on M:1 thread scheduler (green threads, user-level threads). `RUBY_MAX_CPU=n` specifies maximum number of native threads for M:N scheduler (default: 8). This patch will be reverted soon if non-easy issues are found. [Bug #19842]
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c287
1 files changed, 118 insertions, 169 deletions
diff --git a/thread.c b/thread.c
index 7654fc03a4..d59e3d8773 100644
--- a/thread.c
+++ b/thread.c
@@ -147,7 +147,6 @@ static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static int consume_communication_pipe(int fd);
-static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
static volatile int system_working = 1;
@@ -260,12 +259,6 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
-static void
-ubf_sigwait(void *ignore)
-{
- rb_thread_wakeup_timer_thread(0);
-}
-
#include THREAD_IMPL_SRC
/*
@@ -646,20 +639,13 @@ static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
STACK_GROW_DIR_DETECTION;
- enum ruby_tag_type state;
- VALUE errinfo = Qnil;
- size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
- rb_thread_t *ractor_main_th = th->ractor->threads.main;
- VALUE * vm_stack = NULL;
- VM_ASSERT(th != th->vm->ractor.main_thread);
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ VM_ASSERT(th != th->vm->ractor.main_thread);
- // setup native thread
- thread_sched_to_running(TH_SCHED(th), th);
- ruby_thread_set_native(th);
-
- RUBY_DEBUG_LOG("got lock. th:%u", rb_th_serial(th));
+ enum ruby_tag_type state;
+ VALUE errinfo = Qnil;
+ rb_thread_t *ractor_main_th = th->ractor->threads.main;
// setup ractor
if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@@ -674,17 +660,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
RB_VM_UNLOCK();
}
- // This assertion is not passed on win32 env. Check it later.
- // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
-
- // setup VM and machine stack
- vm_stack = alloca(size * sizeof(VALUE));
- VM_ASSERT(vm_stack);
-
- rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
- th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
- th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
-
// Ensure that we are not joinable.
VM_ASSERT(UNDEF_P(th->value));
@@ -990,11 +965,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
}
VALUE
-rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
{
struct thread_create_params params = {
.type = thread_invoke_type_ractor_proc,
- .g = g,
+ .g = r,
.args = args,
.proc = proc,
};
@@ -1375,14 +1350,14 @@ sleep_forever(rb_thread_t *th, unsigned int fl)
void
rb_thread_sleep_forever(void)
{
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("forever");
sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
void
rb_thread_sleep_deadly(void)
{
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("deadly");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
@@ -1394,7 +1369,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hr
rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("...");
if (end) {
sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
}
@@ -1491,7 +1466,7 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
th->status = THREAD_STOPPED;
rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
RB_VM_SAVE_MACHINE_CONTEXT(th);
thread_sched_to_waiting(TH_SCHED(th), th);
@@ -1519,8 +1494,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
th->status = region->prev_status;
}
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+ // GET_THREAD() clears WSAGetLastError()
VM_ASSERT(th == GET_THREAD());
+#endif
}
void *
@@ -1544,14 +1523,11 @@ rb_nogvl(void *(*func)(void *), void *data1,
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}
- else {
- ubf_th = rb_thread_start_unblock_thread();
- }
}
BLOCKING_REGION(th, {
val = func(data1);
- saved_errno = errno;
+ saved_errno = rb_errno();
}, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
if (is_main_thread) vm->ubf_async_safe = 0;
@@ -1564,7 +1540,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
- errno = saved_errno;
+ rb_errno_set(saved_errno);
return val;
}
@@ -1689,11 +1665,31 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
}
}
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+ return wfd_event << 1;
+}
+
VALUE
-rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
{
- volatile VALUE val = Qundef; /* shouldn't be used */
rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+
+ RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
+
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (events && !th_has_dedicated_nt(th)) {
+ VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
+
+ // wait readable/writable
+ thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ }
+#endif
+
+ volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
enum ruby_tag_type state;
@@ -1746,6 +1742,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
return val;
}
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+ return rb_thread_io_blocking_call(func, data1, fd, 0);
+}
+
/*
* rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
*
@@ -2379,15 +2381,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
/* signal handling */
if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
enum rb_thread_status prev_status = th->status;
- int sigwait_fd = rb_sigwait_fd_get(th);
- if (sigwait_fd >= 0) {
- (void)consume_communication_pipe(sigwait_fd);
- rb_sigwait_fd_put(th, sigwait_fd);
- }
th->status = THREAD_RUNNABLE;
- while ((sig = rb_get_next_signal()) != 0) {
- ret |= rb_signal_exec(th, sig);
+ {
+ while ((sig = rb_get_next_signal()) != 0) {
+ ret |= rb_signal_exec(th, sig);
+ }
}
th->status = prev_status;
}
@@ -2432,7 +2431,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
limits_us >>= -th->priority;
if (th->status == THREAD_RUNNABLE)
- th->running_time_us += TIME_QUANTUM_USEC;
+ th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
VM_ASSERT(th->ec->cfp);
EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
@@ -3362,7 +3361,7 @@ rb_thread_setname(VALUE thread, VALUE name)
name = rb_str_new_frozen(name);
}
target_th->name = name;
- if (threadptr_initialized(target_th)) {
+ if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
native_set_another_thread_name(target_th->nt->thread_id, name);
}
return name;
@@ -4148,7 +4147,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
struct select_set {
int max;
- int sigwait_fd;
rb_thread_t *th;
rb_fdset_t *rset;
rb_fdset_t *wset;
@@ -4164,10 +4162,6 @@ select_set_free(VALUE p)
{
struct select_set *set = (struct select_set *)p;
- if (set->sigwait_fd >= 0) {
- rb_sigwait_fd_put(set->th, set->sigwait_fd);
- }
-
rb_fd_term(&set->orig_rset);
rb_fd_term(&set->orig_wset);
rb_fd_term(&set->orig_eset);
@@ -4175,24 +4169,6 @@ select_set_free(VALUE p)
return Qfalse;
}
-static const rb_hrtime_t *
-sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
- int *drained_p)
-{
- static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
-
- if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
- *drained_p = check_signals_nogvl(th, sigwait_fd);
- if (!orig || *orig > quantum)
- return &quantum;
- }
-
- return orig;
-}
-
-#define sigwait_signals_fd(result, cond, sigwait_fd) \
- (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
-
static VALUE
do_select(VALUE p)
{
@@ -4211,28 +4187,18 @@ do_select(VALUE p)
TRUE)
do {
- int drained;
lerrno = 0;
BLOCKING_REGION(set->th, {
- const rb_hrtime_t *sto;
struct timeval tv;
- sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
- result = native_fd_select(set->max, set->rset, set->wset,
- set->eset,
- rb_hrtime2timeval(&tv, sto), set->th);
+ result = native_fd_select(set->max,
+ set->rset, set->wset, set->eset,
+ rb_hrtime2timeval(&tv, to), set->th);
if (result < 0) lerrno = errno;
}
- }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
-
- if (set->sigwait_fd >= 0) {
- int fd = sigwait_signals_fd(result,
- rb_fd_isset(set->sigwait_fd, set->rset),
- set->sigwait_fd);
- (void)check_signals_nogvl(set->th, fd);
- }
+ }, ubf_select, set->th, TRUE);
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
} while (wait_retryable(&result, lerrno, to, end) && do_select_update());
@@ -4244,18 +4210,6 @@ do_select(VALUE p)
return (VALUE)result;
}
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- if (fd < 0) {
- return 0;
- }
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
@@ -4279,16 +4233,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
- set.sigwait_fd = rb_sigwait_fd_get(set.th);
- if (set.sigwait_fd >= 0) {
- if (set.rset)
- rb_fd_set(set.sigwait_fd, set.rset);
- else
- set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
- if (set.sigwait_fd >= set.max) {
- set.max = set.sigwait_fd + 1;
- }
- }
#define fd_init_copy(f) do { \
if (set.f) { \
rb_fd_resize(set.max - 1, set.f); \
@@ -4325,19 +4269,35 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- struct pollfd fds[2];
+ struct pollfd fds[1];
int result = 0;
- int drained;
nfds_t nfds;
- rb_unblock_function_t *ubf;
struct waiting_fd wfd;
int state;
volatile int lerrno;
- wfd.th = GET_THREAD();
+ rb_thread_t *th = wfd.th = GET_THREAD();
wfd.fd = fd;
wfd.busy = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (!th->nt->dedicated) {
+ rb_hrtime_t rel, *prel;
+
+ if (timeout) {
+ rel = rb_timeval2hrtime(timeout);
+ prel = &rel;
+ }
+ else {
+ prel = NULL;
+ }
+
+ if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+ return 0; // timeout
+ }
+ }
+#endif
+
RB_VM_LOCK_ENTER();
{
ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@@ -4353,36 +4313,18 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
fds[0].events = (short)events;
fds[0].revents = 0;
do {
- fds[1].fd = rb_sigwait_fd_get(wfd.th);
-
- if (fds[1].fd >= 0) {
- fds[1].events = POLLIN;
- fds[1].revents = 0;
- nfds = 2;
- ubf = ubf_sigwait;
- }
- else {
- nfds = 1;
- ubf = ubf_select;
- }
+ nfds = 1;
lerrno = 0;
BLOCKING_REGION(wfd.th, {
- const rb_hrtime_t *sto;
struct timespec ts;
- sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
- result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
+ result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
if (result < 0) lerrno = errno;
}
- }, ubf, wfd.th, TRUE);
+ }, ubf_select, wfd.th, TRUE);
- if (fds[1].fd >= 0) {
- int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
- (void)check_signals_nogvl(wfd.th, fd1);
- rb_sigwait_fd_put(wfd.th, fds[1].fd);
- }
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
} while (wait_retryable(&result, lerrno, to, end));
}
@@ -4470,6 +4412,18 @@ select_single_cleanup(VALUE ptr)
return (VALUE)-1;
}
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
+}
+
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
@@ -4552,16 +4506,13 @@ consume_communication_pipe(int fd)
ssize_t result;
int ret = FALSE; /* for rb_sigwait_sleep */
- /*
- * disarm UBF_TIMER before we read, because it can become
- * re-armed at any time via sighandler and the pipe will refill
- * We can disarm it because this thread is now processing signals
- * and we do not want unnecessary SIGVTALRM
- */
- ubf_timer_disarm();
-
while (1) {
result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+ RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+ RUBY_DEBUG_LOG("result:%d", (int)result);
+#endif
if (result > 0) {
ret = TRUE;
if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
@@ -4588,24 +4539,6 @@ consume_communication_pipe(int fd)
}
}
-static int
-check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
-{
- rb_vm_t *vm = GET_VM(); /* th may be 0 */
- int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
- ubf_wakeup_all_threads();
- if (rb_signal_buff_size()) {
- if (th == vm->ractor.main_thread) {
- /* no need to lock + wakeup if already in main thread */
- RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
- }
- else {
- threadptr_trap_interrupt(vm->ractor.main_thread);
- }
- }
- return ret;
-}
-
void
rb_thread_stop_timer_thread(void)
{
@@ -4702,6 +4635,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
rb_ractor_sleeper_threads_clear(th->ractor);
rb_clear_coverages();
+ // restart timer thread (timer threads access to `vm->waitpid_lock` and so on.
+ rb_thread_reset_timer_thread();
+ rb_thread_start_timer_thread();
+
VM_ASSERT(vm->ractor.blocking_cnt == 0);
VM_ASSERT(vm->ractor.cnt == 1);
}
@@ -5467,8 +5404,16 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
- struct rb_thread_sched *sched = TH_SCHED(th);
- thread_sched_to_running(sched, th);
+#ifdef HAVE_PTHREAD_NP_H
+ VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+ // thread_sched_to_running() should not be called because
+ // it assumes blocked by thread_sched_to_waiting().
+ // thread_sched_to_running(sched, th);
+
+#ifdef RB_INTERNAL_THREAD_HOOK
+ RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED);
+#endif
th->pending_interrupt_queue = rb_ary_hidden_new(0);
th->pending_interrupt_queue_checked = 0;
@@ -5481,7 +5426,7 @@ Init_Thread(void)
Init_thread_sync();
// TODO: Suppress unused function warning for now
- if (0) rb_thread_sched_destroy(NULL);
+ // if (0) rb_thread_sched_destroy(NULL);
}
int
@@ -5511,7 +5456,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
ccan_list_for_each(&r->threads.set, th, lt_node) {
rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
"native:%p int:%u",
- th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+ th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
@@ -5537,14 +5482,18 @@ rb_check_deadlock(rb_ractor_t *r)
{
if (GET_THREAD()->vm->thread_ignore_deadlock) return;
- int found = 0;
- rb_thread_t *th = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (r->threads.sched.readyq_cnt > 0) return;
+#endif
+
int sleeper_num = rb_ractor_sleeper_thread_num(r);
int ltnum = rb_ractor_living_thread_num(r);
if (ltnum > sleeper_num) return;
if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
- if (patrol_thread && patrol_thread != GET_THREAD()) return;
+
+ int found = 0;
+ rb_thread_t *th = NULL;
ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {