aboutsummaryrefslogtreecommitdiffstats
path: root/thread.c
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2020-03-10 02:22:11 +0900
committerKoichi Sasada <ko1@atdot.net>2020-09-03 21:11:06 +0900
commit79df14c04b452411b9d17e26a398e491bca1a811 (patch)
tree7598cee0f105439efd5bb328a727b0fe27d7c666 /thread.c
parenteeb5325d3bfd71301896360c17e8f51abcb9a7e5 (diff)
downloadruby-79df14c04b452411b9d17e26a398e491bca1a811.tar.gz
Introduce Ractor mechanism for parallel execution
This commit introduces Ractor mechanism to run Ruby program in parallel. See doc/ractor.md for more details about Ractor. See ticket [Feature #17100] to see the implementation details and discussions. [Feature #17100] This commit does not complete the implementation. You can find many bugs on using Ractor. Also the specification will be changed so that this feature is experimental. You will see a warning when you make the first Ractor with `Ractor.new`. I hope this feature can help programmers from thread-safety issues.
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c581
1 files changed, 374 insertions, 207 deletions
diff --git a/thread.c b/thread.c
index 141df11e5d..063d96045e 100644
--- a/thread.c
+++ b/thread.c
@@ -92,6 +92,9 @@
#include "ruby/thread_native.h"
#include "timev.h"
#include "vm_core.h"
+#include "ractor.h"
+#include "vm_debug.h"
+#include "vm_sync.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -133,7 +136,7 @@ static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
static void sleep_forever(rb_thread_t *th, unsigned int fl);
static void rb_thread_sleep_deadly_allow_spurious_wakeup(void);
static int rb_threadptr_dead(rb_thread_t *th);
-static void rb_check_deadlock(rb_vm_t *vm);
+static void rb_check_deadlock(rb_ractor_t *r);
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
@@ -167,20 +170,13 @@ static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_regi
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
-#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
- do { \
- FLUSH_REGISTER_WINDOWS; \
- setjmp((th)->ec->machine.regs); \
- SET_MACHINE_STACK_END(&(th)->ec->machine.stack_end); \
- } while (0)
-
#define GVL_UNLOCK_BEGIN(th) do { \
RB_GC_SAVE_MACHINE_CONTEXT(th); \
- gvl_release(th->vm);
+ gvl_release(rb_ractor_gvl(th->ractor));
#define GVL_UNLOCK_END(th) \
- gvl_acquire(th->vm, th); \
- rb_thread_set_current(th); \
+ gvl_acquire(rb_ractor_gvl(th->ractor), th); \
+ rb_ractor_thread_switch(th->ractor, th); \
} while(0)
#ifdef __GNUC__
@@ -222,12 +218,6 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
return rb_threadptr_execute_interrupts(th, 1);
}
-static int
-vm_living_thread_num(const rb_vm_t *vm)
-{
- return vm->living_thread_num;
-}
-
/*
* poll() is supported by many OSes, but so far Linux is the only
* one we know of that supports using poll() in all places select()
@@ -345,7 +335,7 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start));
-static void timer_thread_function(void);
+static void timer_thread_function(rb_execution_context_t *ec);
void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
static void
@@ -425,11 +415,13 @@ rb_thread_debug(
#include "thread_sync.c"
void
-rb_vm_gvl_destroy(rb_vm_t *vm)
+rb_vm_gvl_destroy(rb_global_vm_lock_t *gvl)
{
- gvl_release(vm);
- gvl_destroy(vm);
+ gvl_release(gvl);
+ gvl_destroy(gvl);
+
if (0) {
+ rb_vm_t *vm = GET_VM();
/* may be held by running threads */
rb_native_mutex_destroy(&vm->waitpid_lock);
rb_native_mutex_destroy(&vm->workqueue_lock);
@@ -498,6 +490,7 @@ static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
rb_native_mutex_lock(&th->interrupt_lock);
+
if (trap) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
@@ -526,12 +519,12 @@ threadptr_trap_interrupt(rb_thread_t *th)
}
static void
-terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread)
+terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
{
rb_thread_t *th = 0;
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th != main_thread) {
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th != main_thread) {
thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n",
thread_id_str(th), thread_status_name(th, TRUE));
rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
@@ -567,12 +560,12 @@ rb_thread_terminate_all(void)
{
rb_thread_t *volatile th = GET_THREAD(); /* main thread */
rb_execution_context_t * volatile ec = th->ec;
- rb_vm_t *volatile vm = th->vm;
+ rb_ractor_t *r = th->ractor;
volatile int sleeping = 0;
- if (vm->main_thread != th) {
- rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
- (void *)vm->main_thread, (void *)th);
+ if (r->threads.main != th) {
+ rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
+ (void *)r->threads.main, (void *)th);
}
/* unlock all locking mutexes */
@@ -582,11 +575,11 @@ rb_thread_terminate_all(void)
if (EC_EXEC_TAG() == TAG_NONE) {
retry:
thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
- terminate_all(vm, th);
+ terminate_all(th->ractor, th);
- while (vm_living_thread_num(vm) > 1) {
+ while (rb_ractor_living_thread_num(th->ractor) > 1) {
rb_hrtime_t rel = RB_HRTIME_PER_SEC;
- /*
+ /*q
* Thread exiting routine in thread_start_func_2 notify
* me when the last sub-thread exit.
*/
@@ -669,26 +662,43 @@ rb_vm_proc_local_ep(VALUE proc)
}
}
-static void
-thread_do_start(rb_thread_t *th)
-{
- native_set_thread_name(th);
+// for ractor, defined in vm.c
+VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
+ int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
- if (th->invoke_type == thread_invoke_type_proc) {
- VALUE args = th->invoke_arg.proc.args;
- int args_len = (int)RARRAY_LEN(args);
- const VALUE *args_ptr;
- VALUE procval = th->invoke_arg.proc.proc;
- rb_proc_t *proc;
- GetProcPtr(procval, proc);
-
- th->ec->errinfo = Qnil;
- th->ec->root_lep = rb_vm_proc_local_ep(procval);
- th->ec->root_svar = Qfalse;
-
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+static void
+thread_do_start_proc(rb_thread_t *th)
+{
+ VALUE args = th->invoke_arg.proc.args;
+ const VALUE *args_ptr;
+ int args_len;
+ VALUE procval = th->invoke_arg.proc.proc;
+ rb_proc_t *proc;
+ GetProcPtr(procval, proc);
+
+ th->ec->errinfo = Qnil;
+ th->ec->root_lep = rb_vm_proc_local_ep(procval);
+ th->ec->root_svar = Qfalse;
+
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+ vm_check_ints_blocking(th->ec);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ VALUE self = rb_ractor_self(th->ractor);
+ VM_ASSERT(FIXNUM_P(args));
+ args_len = FIX2INT(args);
+ args_ptr = ALLOCA_N(VALUE, args_len);
+ rb_ractor_recv_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
vm_check_ints_blocking(th->ec);
+ // kick thread
+ th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE);
+ }
+ else {
+ args_len = RARRAY_LENINT(args);
if (args_len < 8) {
/* free proc.args if the length is enough small */
args_ptr = ALLOCA_N(VALUE, args_len);
@@ -700,15 +710,36 @@ thread_do_start(rb_thread_t *th)
}
vm_check_ints_blocking(th->ec);
+
+ // kick thread
th->value = rb_vm_invoke_proc(th->ec, proc,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE);
+ }
+
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit(th->ec, th->value);
}
- else {
+}
+
+static void
+thread_do_start(rb_thread_t *th)
+{
+ native_set_thread_name(th);
+
+ switch (th->invoke_type) {
+ case thread_invoke_type_proc:
+ case thread_invoke_type_ractor_proc:
+ thread_do_start_proc(th);
+ break;
+ case thread_invoke_type_func:
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
+ break;
+ case thread_invoke_type_none:
+ rb_bug("unreachable");
}
VALUE scheduler = th->scheduler;
@@ -725,32 +756,40 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
STACK_GROW_DIR_DETECTION;
enum ruby_tag_type state;
rb_thread_list_t *join_list;
- rb_thread_t *main_th;
VALUE errinfo = Qnil;
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
+ rb_thread_t *ractor_main_th = th->ractor->threads.main;
VALUE * vm_stack = NULL;
- if (th == th->vm->main_thread) {
- rb_bug("thread_start_func_2 must not be used for main thread");
+ VM_ASSERT(th != th->vm->ractor.main_thread);
+ thread_debug("thread start: %p\n", (void *)th);
+
+ // setup native thread
+ gvl_acquire(rb_ractor_gvl(th->ractor), th);
+ ruby_thread_set_native(th);
+
+ // setup ractor
+ if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
+ RB_VM_LOCK();
+ {
+ rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__);
+ }
+ RB_VM_UNLOCK();
}
- thread_debug("thread start: %p\n", (void *)th);
- VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
+ // This assertion is not passed on win32 env. Check it later.
+ // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
+ // setup VM and machine stack
vm_stack = alloca(size * sizeof(VALUE));
VM_ASSERT(vm_stack);
- gvl_acquire(th->vm, th);
-
rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
- ruby_thread_set_native(th);
-
{
thread_debug("thread start (get lock): %p\n", (void *)th);
- rb_thread_set_current(th);
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -758,14 +797,19 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
}
else {
errinfo = th->ec->errinfo;
- if (state == TAG_FATAL) {
+
+ if (state == TAG_FATAL) {
/* fatal error within this thread, need to stop whole script */
}
else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
/* exit on main_thread. */
}
else {
- if (th->report_on_exception) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
+ if (th->report_on_exception) {
VALUE mesg = rb_thread_to_s(th->self);
rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
rb_write_error_str(mesg);
@@ -782,16 +826,20 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->value = Qnil;
}
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_teardown(th->ec);
+ }
+
th->status = THREAD_KILLED;
thread_debug("thread end: %p\n", (void *)th);
- main_th = th->vm->main_thread;
- if (main_th == th) {
+ if (th->vm->ractor.main_thread == th) {
ruby_stop(0);
}
- if (RB_TYPE_P(errinfo, T_OBJECT)) {
+
+ if (RB_TYPE_P(errinfo, T_OBJECT)) {
/* treat with normal error object */
- rb_threadptr_raise(main_th, 1, &errinfo);
+ rb_threadptr_raise(ractor_main_th, 1, &errinfo);
}
EC_POP_TAG();
@@ -803,11 +851,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
(void *)th, th->locking_mutex);
}
- /* delete self other than main thread from living_threads */
- rb_vm_living_threads_remove(th->vm, th);
- if (main_th->status == THREAD_KILLED && rb_thread_alone()) {
+ if (ractor_main_th->status == THREAD_KILLED &&
+ th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
/* I'm last thread. wake up main thread from rb_thread_terminate_all */
- rb_threadptr_interrupt(main_th);
+ rb_threadptr_interrupt(ractor_main_th);
}
/* wake up joining threads */
@@ -823,7 +870,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
}
rb_threadptr_unlock_all_locking_mutexes(th);
- rb_check_deadlock(th->vm);
+ rb_check_deadlock(th->ractor);
rb_fiber_close(th->ec->fiber_ptr);
}
@@ -831,15 +878,40 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
thread_cleanup_func(th, FALSE);
VM_ASSERT(th->ec->vm_stack == NULL);
- gvl_release(th->vm);
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ // after rb_ractor_living_threads_remove()
+ // GC will happen anytime and this ractor can be collected (and destroy GVL).
+ // So gvl_release() should be before it.
+ gvl_release(rb_ractor_gvl(th->ractor));
+ rb_ractor_living_threads_remove(th->ractor, th);
+ }
+ else {
+ rb_ractor_living_threads_remove(th->ractor, th);
+ gvl_release(rb_ractor_gvl(th->ractor));
+ }
return 0;
}
+struct thread_create_params {
+ enum thread_invoke_type type;
+
+ // for normal proc thread
+ VALUE args;
+ VALUE proc;
+
+ // for ractor
+ rb_ractor_t *g;
+
+ // for func
+ VALUE (*fn)(void *);
+};
+
static VALUE
-thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))
+thread_create_core(VALUE thval, struct thread_create_params *params)
{
- rb_thread_t *th = rb_thread_ptr(thval), *current_th = GET_THREAD();
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
int err;
if (OBJ_FROZEN(current_th->thgroup)) {
@@ -847,17 +919,35 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))
"can't start a new thread (frozen ThreadGroup)");
}
- if (fn) {
- th->invoke_type = thread_invoke_type_func;
- th->invoke_arg.func.func = fn;
- th->invoke_arg.func.arg = (void *)args;
- }
- else {
- (void)RARRAY_LENINT(args);
+ switch (params->type) {
+ case thread_invoke_type_proc:
th->invoke_type = thread_invoke_type_proc;
- th->invoke_arg.proc.proc = rb_block_proc();
- th->invoke_arg.proc.args = args;
+ th->invoke_arg.proc.args = params->args;
+ th->invoke_arg.proc.proc = params->proc;
+ th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ break;
+
+ case thread_invoke_type_ractor_proc:
+#if RACTOR_CHECK_MODE > 0
+ rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g));
+#endif
+ th->invoke_type = thread_invoke_type_ractor_proc;
+ th->ractor = params->g;
+ th->ractor->threads.main = th;
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+ th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ rb_ractor_send_parameters(ec, params->g, params->args);
+ break;
+
+ case thread_invoke_type_func:
+ th->invoke_type = thread_invoke_type_func;
+ th->invoke_arg.func.func = params->fn;
+ th->invoke_arg.func.arg = (void *)params->args;
+ break;
+
+ default:
+ rb_bug("unreachable");
}
th->priority = current_th->priority;
@@ -870,13 +960,17 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))
rb_native_mutex_initialize(&th->interrupt_lock);
+ RUBY_DEBUG_LOG("r:%u th:%p", th->ractor->id, th);
+
+ rb_ractor_living_threads_insert(th->ractor, th);
+
/* kick thread */
err = native_thread_create(th);
if (err) {
th->status = THREAD_KILLED;
- rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
+ rb_ractor_living_threads_remove(th->ractor, th);
+ rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
}
- rb_vm_living_threads_insert(th->vm, th);
return thval;
}
@@ -908,8 +1002,9 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
rb_thread_t *th;
VALUE thread = rb_thread_alloc(klass);
- if (GET_VM()->main_thread->status == THREAD_KILLED)
- rb_raise(rb_eThreadError, "can't alloc thread");
+ if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
+ rb_raise(rb_eThreadError, "can't alloc thread");
+ }
rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
th = rb_thread_ptr(thread);
@@ -933,7 +1028,12 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
static VALUE
thread_start(VALUE klass, VALUE args)
{
- return thread_create_core(rb_thread_alloc(klass), args, 0);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(rb_thread_alloc(klass), &params);
}
static VALUE
@@ -968,14 +1068,36 @@ thread_initialize(VALUE thread, VALUE args)
}
}
else {
- return thread_create_core(thread, args, 0);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(thread, &params);
}
}
VALUE
rb_thread_create(VALUE (*fn)(void *), void *arg)
{
- return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_func,
+ .fn = fn,
+ .args = (VALUE)arg,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
+}
+
+VALUE
+rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+{
+ struct thread_create_params params = {
+ .type = thread_invoke_type_ractor_proc,
+ .g = g,
+ .args = args,
+ .proc = proc,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
@@ -1019,10 +1141,10 @@ thread_join_sleep(VALUE arg)
while (target_th->status != THREAD_KILLED) {
if (!p->limit) {
th->status = THREAD_STOPPED_FOREVER;
- th->vm->sleeper++;
- rb_check_deadlock(th->vm);
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
native_sleep(th, 0);
- th->vm->sleeper--;
+ rb_ractor_sleeper_threads_dec(th->ractor);
}
else {
if (hrtime_update_expire(p->limit, end)) {
@@ -1050,7 +1172,7 @@ thread_join(rb_thread_t *target_th, rb_hrtime_t *rel)
if (th == target_th) {
rb_raise(rb_eThreadError, "Target thread must not be current thread");
}
- if (GET_VM()->main_thread == target_th) {
+ if (th->ractor->threads.main == target_th) {
rb_raise(rb_eThreadError, "Target thread must not be main thread");
}
@@ -1266,12 +1388,12 @@ sleep_forever(rb_thread_t *th, unsigned int fl)
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
while (th->status == status) {
if (fl & SLEEP_DEADLOCKABLE) {
- th->vm->sleeper++;
- rb_check_deadlock(th->vm);
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
}
native_sleep(th, 0);
if (fl & SLEEP_DEADLOCKABLE) {
- th->vm->sleeper--;
+ rb_ractor_sleeper_threads_dec(th->ractor);
}
woke = vm_check_ints_blocking(th->ec);
if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
@@ -1417,8 +1539,8 @@ rb_thread_schedule_limits(uint32_t limits_us)
if (th->running_time_us >= limits_us) {
thread_debug("rb_thread_schedule/switch start\n");
RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_yield(th->vm, th);
- rb_thread_set_current(th);
+ gvl_yield(rb_ractor_gvl(th->ractor), th);
+ rb_ractor_thread_switch(th->ractor, th);
thread_debug("rb_thread_schedule/switch done\n");
}
}
@@ -1441,9 +1563,10 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
th->blocking_region_buffer = region;
th->status = THREAD_STOPPED;
+ rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
thread_debug("enter blocking region (%p)\n", (void *)th);
RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_release(th->vm);
+ gvl_release(rb_ractor_gvl(th->ractor));
return TRUE;
}
else {
@@ -1459,10 +1582,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
/* entry to ubf_list impossible at this point, so unregister is safe: */
unregister_ubf_list(th);
- gvl_acquire(th->vm, th);
- rb_thread_set_current(th);
+ gvl_acquire(rb_ractor_gvl(th->ractor), th);
+ rb_ractor_thread_switch(th->ractor, th);
+
thread_debug("leave blocking region (%p)\n", (void *)th);
th->blocking_region_buffer = 0;
+ rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
if (th->status == THREAD_STOPPED) {
th->status = region->prev_status;
}
@@ -1484,7 +1609,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
ubf = ubf_select;
data2 = th;
}
- else if (ubf && vm_living_thread_num(th->vm) == 1) {
+ else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1) {
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
th->vm->ubf_async_safe = 1;
}
@@ -1631,7 +1756,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
wfd.fd = fd;
wfd.th = rb_ec_thread_ptr(ec);
- list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -1646,7 +1776,11 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
* must be deleted before jump
* this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
*/
- list_del(&wfd.wfd_node);
+ RB_VM_LOCK_ENTER();
+ {
+ list_del(&wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
if (state) {
EC_JUMP_TAG(ec, state);
@@ -1700,7 +1834,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
* because this thread is not Ruby's thread.
* What should we do?
*/
-
+ bp();
fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
exit(EXIT_FAILURE);
}
@@ -2233,18 +2367,25 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
int timer_interrupt;
int pending_interrupt;
int trap_interrupt;
+ int terminate_interrupt;
timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
+ terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
+
+ if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
+ RB_VM_LOCK_ENTER();
+ RB_VM_LOCK_LEAVE();
+ }
if (postponed_job_interrupt) {
rb_postponed_job_flush(th->vm);
}
/* signal handling */
- if (trap_interrupt && (th == th->vm->main_thread)) {
+ if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
enum rb_thread_status prev_status = th->status;
int sigwait_fd = rb_sigwait_fd_get(th);
@@ -2273,7 +2414,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
else if (err == eKillSignal /* Thread#kill received */ ||
err == eTerminateSignal /* Terminate thread */ ||
err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
- rb_threadptr_to_kill(th);
+ terminate_interrupt = 1;
}
else {
if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
@@ -2288,7 +2429,11 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
}
}
- if (timer_interrupt) {
+ if (terminate_interrupt) {
+ rb_threadptr_to_kill(th);
+ }
+
+ if (timer_interrupt) {
uint32_t limits_us = TIME_QUANTUM_USEC;
if (th->priority > 0)
@@ -2356,7 +2501,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig)
argv[0] = rb_eSignal;
argv[1] = INT2FIX(sig);
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
void
@@ -2366,7 +2511,9 @@ rb_threadptr_signal_exit(rb_thread_t *th)
argv[0] = rb_eSystemExit;
argv[1] = rb_str_new2("exit");
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
+
+ // TODO: check signal raise deliverly
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
int
@@ -2395,19 +2542,24 @@ rb_notify_fd_close(int fd, struct list_head *busy)
rb_vm_t *vm = GET_THREAD()->vm;
struct waiting_fd *wfd = 0, *next;
- list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
- if (wfd->fd == fd) {
- rb_thread_t *th = wfd->th;
- VALUE err;
+ RB_VM_LOCK_ENTER();
+ {
+ list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
+ if (wfd->fd == fd) {
+ rb_thread_t *th = wfd->th;
+ VALUE err;
- list_del(&wfd->wfd_node);
- list_add(busy, &wfd->wfd_node);
+ list_del(&wfd->wfd_node);
+ list_add(busy, &wfd->wfd_node);
- err = th->vm->special_exceptions[ruby_error_stream_closed];
- rb_threadptr_pending_interrupt_enque(th, err);
- rb_threadptr_interrupt(th);
- }
+ err = th->vm->special_exceptions[ruby_error_stream_closed];
+ rb_threadptr_pending_interrupt_enque(th, err);
+ rb_threadptr_interrupt(th);
+ }
+ }
}
+ RB_VM_LOCK_LEAVE();
+
return !list_empty(busy);
}
@@ -2479,7 +2631,7 @@ rb_thread_kill(VALUE thread)
if (th->to_kill || th->status == THREAD_KILLED) {
return thread;
}
- if (th == th->vm->main_thread) {
+ if (th == th->vm->ractor.main_thread) {
rb_exit(EXIT_SUCCESS);
}
@@ -2658,21 +2810,8 @@ thread_stop(VALUE _)
VALUE
rb_thread_list(void)
{
- VALUE ary = rb_ary_new();
- rb_vm_t *vm = GET_THREAD()->vm;
- rb_thread_t *th = 0;
-
- list_for_each(&vm->living_threads, th, vmlt_node) {
- switch (th->status) {
- case THREAD_RUNNABLE:
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- rb_ary_push(ary, th->self);
- default:
- break;
- }
- }
- return ary;
+ // TODO
+ return rb_ractor_thread_list(GET_RACTOR());
}
/*
@@ -2725,7 +2864,7 @@ thread_s_current(VALUE klass)
VALUE
rb_thread_main(void)
{
- return GET_THREAD()->vm->main_thread->self;
+ return GET_RACTOR()->threads.main->self;
}
/*
@@ -3521,7 +3660,8 @@ thread_keys_i(ID key, VALUE value, void *ary)
int
rb_thread_alone(void)
{
- return vm_living_thread_num(GET_VM()) == 1;
+ // TODO
+ return rb_ractor_living_thread_num(GET_RACTOR()) == 1;
}
/*
@@ -4248,7 +4388,13 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
wfd.th = GET_THREAD();
wfd.fd = fd;
- list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
+
EC_PUSH_TAG(wfd.th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
@@ -4401,7 +4547,12 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
- list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
+
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
if (r == -1)
errno = args.as.error;
@@ -4438,14 +4589,12 @@ rb_threadptr_check_signal(rb_thread_t *mth)
}
static void
-timer_thread_function(void)
+timer_thread_function(rb_execution_context_t *ec)
{
- volatile rb_execution_context_t *ec;
-
- /* for time slice */
- ec = ACCESS_ONCE(rb_execution_context_t *,
- ruby_current_execution_context_ptr);
- if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
+ // strictly speaking, accessing gvl->owner is not thread-safe
+ if (ec) {
+ RUBY_VM_SET_TIMER_INTERRUPT(ec);
+ }
}
static void
@@ -4516,11 +4665,13 @@ check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
ubf_wakeup_all_threads();
ruby_sigchld_handler(vm);
if (rb_signal_buff_size()) {
- if (th == vm->main_thread)
+ if (th == vm->ractor.main_thread) {
/* no need to lock + wakeup if already in main thread */
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
- else
- threadptr_trap_interrupt(vm->main_thread);
+ }
+ else {
+ threadptr_trap_interrupt(vm->ractor.main_thread);
+ }
ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
}
return ret;
@@ -4592,16 +4743,28 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
{
rb_thread_t *i = 0;
rb_vm_t *vm = th->vm;
- vm->main_thread = th;
+ rb_ractor_t *r = th->ractor;
+ vm->ractor.main_ractor = r;
+ vm->ractor.main_thread = th;
+ r->threads.main = th;
+ r->status_ = ractor_created;
- gvl_atfork(th->vm);
+ gvl_atfork(rb_ractor_gvl(th->ractor));
ubf_list_atfork();
- list_for_each(&vm->living_threads, i, vmlt_node) {
- atfork(i, th);
+ // OK. Only this thread accesses:
+ list_for_each(&vm->ractor.set, r, vmlr_node) {
+ list_for_each(&r->threads.set, i, lt_node) {
+ atfork(i, th);
+ }
}
rb_vm_living_threads_init(vm);
- rb_vm_living_threads_insert(vm, th);
+
+ // threads
+ vm->ractor.cnt = 0;
+ rb_ractor_living_threads_init(th->ractor);
+ rb_ractor_living_threads_insert(th->ractor, th);
+
/* may be held by MJIT threads in parent */
rb_native_mutex_initialize(&vm->waitpid_lock);
@@ -4611,9 +4774,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
rb_native_mutex_initialize(&th->interrupt_lock);
vm->fork_gen++;
-
- vm->sleeper = 0;
+ rb_ractor_sleeper_threads_clear(th->ractor);
rb_clear_coverages();
+
+ VM_ASSERT(vm->ractor.cnt == 1);
}
static void
@@ -4730,11 +4894,11 @@ static VALUE
thgroup_list(VALUE group)
{
VALUE ary = rb_ary_new();
- rb_vm_t *vm = GET_THREAD()->vm;
rb_thread_t *th = 0;
+ rb_ractor_t *r = GET_RACTOR();
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th->thgroup == group) {
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th->thgroup == group) {
rb_ary_push(ary, th->self);
}
}
@@ -5364,7 +5528,7 @@ Init_Thread(void)
rb_define_method(cThGroup, "add", thgroup_add, 1);
{
- th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
+ th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
rb_define_const(cThGroup, "Default", th->thgroup);
}
@@ -5376,8 +5540,8 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
- gvl_init(th->vm);
- gvl_acquire(th->vm, th);
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
+ gvl_acquire(gvl, th);
rb_native_mutex_initialize(&th->vm->waitpid_lock);
rb_native_mutex_initialize(&th->vm->workqueue_lock);
rb_native_mutex_initialize(&th->interrupt_lock);
@@ -5390,9 +5554,6 @@ Init_Thread(void)
rb_thread_create_timer_thread();
- /* suppress warnings on cygwin, mingw and mswin.*/
- (void)native_mutex_trylock;
-
Init_thread_sync();
}
@@ -5405,67 +5566,73 @@ ruby_native_thread_p(void)
}
static void
-debug_deadlock_check(rb_vm_t *vm, VALUE msg)
+debug_deadlock_check(rb_ractor_t *r, VALUE msg)
{
rb_thread_t *th = 0;
VALUE sep = rb_str_new_cstr("\n ");
rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
- vm_living_thread_num(vm), vm->sleeper, (void *)GET_THREAD(), (void *)vm->main_thread);
- list_for_each(&vm->living_threads, th, vmlt_node) {
- rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
- "native:%"PRI_THREAD_ID" int:%u",
- th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
- if (th->locking_mutex) {
- rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
- (void *)mutex->th, rb_mutex_num_waiting(mutex));
- }
- {
- rb_thread_list_t *list = th->join_list;
- while (list) {
- rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
- list = list->next;
- }
- }
- rb_str_catf(msg, "\n ");
- rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
- rb_str_catf(msg, "\n");
+ rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
+ (void *)GET_THREAD(), (void *)r->threads.main);
+
+ list_for_each(&r->threads.set, th, lt_node) {
+ rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
+ "native:%"PRI_THREAD_ID" int:%u",
+ th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+
+ if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
+ (void *)mutex->th, rb_mutex_num_waiting(mutex));
+ }
+
+ {
+ rb_thread_list_t *list = th->join_list;
+ while (list) {
+ rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
+ list = list->next;
+ }
+ }
+ rb_str_catf(msg, "\n ");
+ rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
+ rb_str_catf(msg, "\n");
}
}
static void
-rb_check_deadlock(rb_vm_t *vm)
+rb_check_deadlock(rb_ractor_t *r)
{
int found = 0;
- rb_thread_t *th = 0;
+ rb_thread_t *th = NULL;
+ int sleeper_num = rb_ractor_sleeper_thread_num(r);
+ int ltnum = rb_ractor_living_thread_num(r);
- if (vm_living_thread_num(vm) > vm->sleeper) return;
- if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
+ if (ltnum > sleeper_num) return;
+ if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
if (patrol_thread && patrol_thread != GET_THREAD()) return;
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
- found = 1;
- }
- else if (th->locking_mutex) {
- rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
+ found = 1;
+ }
+ else if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
- found = 1;
- }
- }
- if (found)
- break;
+ if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
+ found = 1;
+ }
+ }
+ if (found)
+ break;
}
if (!found) {
VALUE argv[2];
argv[0] = rb_eFatal;
argv[1] = rb_str_new2("No live threads left. Deadlock?");
- debug_deadlock_check(vm, argv[1]);
- vm->sleeper--;
- rb_threadptr_raise(vm->main_thread, 2, argv);
+ debug_deadlock_check(r, argv[1]);
+ rb_ractor_sleeper_threads_dec(GET_RACTOR());
+ rb_threadptr_raise(r->threads.main, 2, argv);
}
}