From 79df14c04b452411b9d17e26a398e491bca1a811 Mon Sep 17 00:00:00 2001
From: Koichi Sasada
Date: Tue, 10 Mar 2020 02:22:11 +0900
Subject: Introduce Ractor mechanism for parallel execution

This commit introduces the Ractor mechanism to run Ruby programs in
parallel. See doc/ractor.md for more details about Ractor, and see
ticket [Feature #17100] for the implementation details and discussions.

[Feature #17100]

This commit does not complete the implementation, and you may find many
bugs when using Ractor. The specification may also change, so this
feature is experimental; you will see a warning when you create the
first Ractor with `Ractor.new`.

I hope this feature can help protect programmers from thread-safety
issues.
---
 thread.c | 581 ++++++++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 374 insertions(+), 207 deletions(-)

(limited to 'thread.c')

diff --git a/thread.c b/thread.c
index 141df11e5d..063d96045e 100644
--- a/thread.c
+++ b/thread.c
@@ -92,6 +92,9 @@
 #include "ruby/thread_native.h"
 #include "timev.h"
 #include "vm_core.h"
+#include "ractor.h"
+#include "vm_debug.h"
+#include "vm_sync.h"

 #ifndef USE_NATIVE_THREAD_PRIORITY
 #define USE_NATIVE_THREAD_PRIORITY 0
@@ -133,7 +136,7 @@ static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
 static void sleep_forever(rb_thread_t *th, unsigned int fl);
 static void rb_thread_sleep_deadly_allow_spurious_wakeup(void);
 static int rb_threadptr_dead(rb_thread_t *th);
-static void rb_check_deadlock(rb_vm_t *vm);
+static void rb_check_deadlock(rb_ractor_t *r);
 static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
 static const char *thread_status_name(rb_thread_t *th, int detail);
 static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
@@ -167,20 +170,13 @@ static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_regi
                                         rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);

-#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
-    do { \
-        FLUSH_REGISTER_WINDOWS; \
-        setjmp((th)->ec->machine.regs); \
-        SET_MACHINE_STACK_END(&(th)->ec->machine.stack_end); \
-    } while (0)
-
 #define GVL_UNLOCK_BEGIN(th) do { \
   RB_GC_SAVE_MACHINE_CONTEXT(th); \
-  gvl_release(th->vm);
+  gvl_release(rb_ractor_gvl(th->ractor));

 #define GVL_UNLOCK_END(th) \
-  gvl_acquire(th->vm, th); \
-  rb_thread_set_current(th); \
+  gvl_acquire(rb_ractor_gvl(th->ractor), th); \
+  rb_ractor_thread_switch(th->ractor, th); \
 } while(0)

 #ifdef __GNUC__
@@ -222,12 +218,6 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
     return rb_threadptr_execute_interrupts(th, 1);
 }

-static int
-vm_living_thread_num(const rb_vm_t *vm)
-{
-    return vm->living_thread_num;
-}
-
 /*
  * poll() is supported by many OSes, but so far Linux is the only
  * one we know of that supports using poll() in all places select()
@@ -345,7 +335,7 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
 #endif

 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start));
-static void timer_thread_function(void);
+static void timer_thread_function(rb_execution_context_t *ec);
 void ruby_sigchld_handler(rb_vm_t *); /* signal.c */

 static void
@@ -425,11 +415,13 @@ rb_thread_debug(
 #include "thread_sync.c"

 void
-rb_vm_gvl_destroy(rb_vm_t *vm)
+rb_vm_gvl_destroy(rb_global_vm_lock_t *gvl)
 {
-    gvl_release(vm);
-    gvl_destroy(vm);
+    gvl_release(gvl);
+    gvl_destroy(gvl);
+
+    if (0) {
+        rb_vm_t *vm = GET_VM();
         /* may be held by running threads */
         rb_native_mutex_destroy(&vm->waitpid_lock);
         rb_native_mutex_destroy(&vm->workqueue_lock);
@@ -498,6 +490,7 @@ static void
 rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
 {
     rb_native_mutex_lock(&th->interrupt_lock);
+
     if (trap) {
         RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
     }
@@ -526,12 +519,12 @@ threadptr_trap_interrupt(rb_thread_t *th)
 }

 static void
-terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread)
+terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
 {
     rb_thread_t *th = 0;

-    list_for_each(&vm->living_threads, th, vmlt_node) {
-        if (th != main_thread) {
+    list_for_each(&r->threads.set, th, lt_node) {
+        if (th != main_thread) {
             thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n",
                          thread_id_str(th), thread_status_name(th, TRUE));
             rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
@@ -567,12 +560,12 @@ rb_thread_terminate_all(void)
 {
     rb_thread_t *volatile th = GET_THREAD(); /* main thread */
     rb_execution_context_t * volatile ec = th->ec;
-    rb_vm_t *volatile vm = th->vm;
+    rb_ractor_t *r = th->ractor;
     volatile int sleeping = 0;

-    if (vm->main_thread != th) {
-        rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
-               (void *)vm->main_thread, (void *)th);
+    if (r->threads.main != th) {
+        rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
+               (void *)r->threads.main, (void *)th);
     }

     /* unlock all locking mutexes */
@@ -582,11 +575,11 @@ rb_thread_terminate_all(void)
     if (EC_EXEC_TAG() == TAG_NONE) {
       retry:
         thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
-        terminate_all(vm, th);
+        terminate_all(th->ractor, th);

-        while (vm_living_thread_num(vm) > 1) {
+        while (rb_ractor_living_thread_num(th->ractor) > 1) {
             rb_hrtime_t rel = RB_HRTIME_PER_SEC;
-            /*
+            /*q
              * Thread exiting routine in thread_start_func_2 notify
              * me when the last sub-thread exit.
              */
@@ -669,26 +662,43 @@ rb_vm_proc_local_ep(VALUE proc)
     }
 }

-static void
-thread_do_start(rb_thread_t *th)
-{
-    native_set_thread_name(th);
+// for ractor, defined in vm.c
+VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
+                                  int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);

-    if (th->invoke_type == thread_invoke_type_proc) {
-        VALUE args = th->invoke_arg.proc.args;
-        int args_len = (int)RARRAY_LEN(args);
-        const VALUE *args_ptr;
-        VALUE procval = th->invoke_arg.proc.proc;
-        rb_proc_t *proc;
-        GetProcPtr(procval, proc);
-
-        th->ec->errinfo = Qnil;
-        th->ec->root_lep = rb_vm_proc_local_ep(procval);
-        th->ec->root_svar = Qfalse;
-
-        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+static void
+thread_do_start_proc(rb_thread_t *th)
+{
+    VALUE args = th->invoke_arg.proc.args;
+    const VALUE *args_ptr;
+    int args_len;
+    VALUE procval = th->invoke_arg.proc.proc;
+    rb_proc_t *proc;
+    GetProcPtr(procval, proc);
+
+    th->ec->errinfo = Qnil;
+    th->ec->root_lep = rb_vm_proc_local_ep(procval);
+    th->ec->root_svar = Qfalse;
+
+    EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+    vm_check_ints_blocking(th->ec);
+
+    if (th->invoke_type == thread_invoke_type_ractor_proc) {
+        VALUE self = rb_ractor_self(th->ractor);
+        VM_ASSERT(FIXNUM_P(args));
+        args_len = FIX2INT(args);
+        args_ptr = ALLOCA_N(VALUE, args_len);
+        rb_ractor_recv_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
         vm_check_ints_blocking(th->ec);
+        // kick thread
+        th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
+                                                args_len, args_ptr,
+                                                th->invoke_arg.proc.kw_splat,
+                                                VM_BLOCK_HANDLER_NONE);
+    }
+    else {
+        args_len = RARRAY_LENINT(args);
         if (args_len < 8) {
             /* free proc.args if the length is enough small */
             args_ptr = ALLOCA_N(VALUE, args_len);
@@ -700,15 +710,36 @@ thread_do_start(rb_thread_t *th)
         }

         vm_check_ints_blocking(th->ec);
+
+        // kick thread
         th->value = rb_vm_invoke_proc(th->ec, proc,
                                       args_len, args_ptr,
                                       th->invoke_arg.proc.kw_splat,
                                       VM_BLOCK_HANDLER_NONE);
+    }
+
+    EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);

-        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+    if (th->invoke_type == thread_invoke_type_ractor_proc) {
+        rb_ractor_atexit(th->ec, th->value);
     }
-    else {
+}
+
+static void
+thread_do_start(rb_thread_t *th)
+{
+    native_set_thread_name(th);
+
+    switch (th->invoke_type) {
+      case thread_invoke_type_proc:
+      case thread_invoke_type_ractor_proc:
+        thread_do_start_proc(th);
+        break;
+      case thread_invoke_type_func:
         th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
+        break;
+      case thread_invoke_type_none:
+        rb_bug("unreachable");
     }

     VALUE scheduler = th->scheduler;
@@ -725,32 +756,40 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
     STACK_GROW_DIR_DETECTION;
     enum ruby_tag_type state;
     rb_thread_list_t *join_list;
-    rb_thread_t *main_th;
     VALUE errinfo = Qnil;
     size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
+    rb_thread_t *ractor_main_th = th->ractor->threads.main;
     VALUE * vm_stack = NULL;

-    if (th == th->vm->main_thread) {
-        rb_bug("thread_start_func_2 must not be used for main thread");
+    VM_ASSERT(th != th->vm->ractor.main_thread);
+    thread_debug("thread start: %p\n", (void *)th);
+
+    // setup native thread
+    gvl_acquire(rb_ractor_gvl(th->ractor), th);
+    ruby_thread_set_native(th);
+
+    // setup ractor
+    if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
+        RB_VM_LOCK();
+        {
+            rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__);
+        }
+        RB_VM_UNLOCK();
     }
-    thread_debug("thread start: %p\n", (void *)th);

-    VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
+    // This assertion is not passed on win32 env. Check it later.
+    // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);

+    // setup VM and machine stack
     vm_stack = alloca(size * sizeof(VALUE));
     VM_ASSERT(vm_stack);

-    gvl_acquire(th->vm, th);
-
     rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
     th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
     th->ec->machine.stack_maxsize -= size * sizeof(VALUE);

-    ruby_thread_set_native(th);
-
     {
         thread_debug("thread start (get lock): %p\n", (void *)th);
-        rb_thread_set_current(th);

         EC_PUSH_TAG(th->ec);
         if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -758,14 +797,19 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
         }
         else {
             errinfo = th->ec->errinfo;
-            if (state == TAG_FATAL) {
+
+            if (state == TAG_FATAL) {
                 /* fatal error within this thread, need to stop whole script */
             }
             else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
                 /* exit on main_thread. */
             }
             else {
-                if (th->report_on_exception) {
+                if (th->invoke_type == thread_invoke_type_ractor_proc) {
+                    rb_ractor_atexit_exception(th->ec);
+                }
+
+                if (th->report_on_exception) {
                     VALUE mesg = rb_thread_to_s(th->self);
                     rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
                     rb_write_error_str(mesg);
@@ -782,16 +826,20 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
                 th->value = Qnil;
             }

+        if (th->invoke_type == thread_invoke_type_ractor_proc) {
+            rb_ractor_teardown(th->ec);
+        }
+
         th->status = THREAD_KILLED;
         thread_debug("thread end: %p\n", (void *)th);

-        main_th = th->vm->main_thread;
-        if (main_th == th) {
+        if (th->vm->ractor.main_thread == th) {
             ruby_stop(0);
         }
-        if (RB_TYPE_P(errinfo, T_OBJECT)) {
+
+        if (RB_TYPE_P(errinfo, T_OBJECT)) {
             /* treat with normal error object */
-            rb_threadptr_raise(main_th, 1, &errinfo);
+            rb_threadptr_raise(ractor_main_th, 1, &errinfo);
         }
         EC_POP_TAG();
@@ -803,11 +851,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
                      (void *)th, th->locking_mutex);
         }

-        /* delete self other than main thread from living_threads */
-        rb_vm_living_threads_remove(th->vm, th);
-        if (main_th->status == THREAD_KILLED && rb_thread_alone()) {
+        if (ractor_main_th->status == THREAD_KILLED &&
+            th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
            /* I'm last thread. wake up main thread from rb_thread_terminate_all */
-            rb_threadptr_interrupt(main_th);
+            rb_threadptr_interrupt(ractor_main_th);
         }

         /* wake up joining threads */
@@ -823,7 +870,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
         }

         rb_threadptr_unlock_all_locking_mutexes(th);
-        rb_check_deadlock(th->vm);
+        rb_check_deadlock(th->ractor);

         rb_fiber_close(th->ec->fiber_ptr);
     }
@@ -831,15 +878,40 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
     thread_cleanup_func(th, FALSE);
     VM_ASSERT(th->ec->vm_stack == NULL);

-    gvl_release(th->vm);
+    if (th->invoke_type == thread_invoke_type_ractor_proc) {
+        // after rb_ractor_living_threads_remove()
+        // GC will happen anytime and this ractor can be collected (and destroy GVL).
+        // So gvl_release() should be before it.
+        gvl_release(rb_ractor_gvl(th->ractor));
+        rb_ractor_living_threads_remove(th->ractor, th);
+    }
+    else {
+        rb_ractor_living_threads_remove(th->ractor, th);
+        gvl_release(rb_ractor_gvl(th->ractor));
+    }

     return 0;
 }

+struct thread_create_params {
+    enum thread_invoke_type type;
+
+    // for normal proc thread
+    VALUE args;
+    VALUE proc;
+
+    // for ractor
+    rb_ractor_t *g;
+
+    // for func
+    VALUE (*fn)(void *);
+};
+
 static VALUE
-thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))
+thread_create_core(VALUE thval, struct thread_create_params *params)
 {
-    rb_thread_t *th = rb_thread_ptr(thval), *current_th = GET_THREAD();
+    rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
     int err;

     if (OBJ_FROZEN(current_th->thgroup)) {
@@ -847,17 +919,35 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))
                  "can't start a new thread (frozen ThreadGroup)");
     }

-    if (fn) {
-        th->invoke_type = thread_invoke_type_func;
-        th->invoke_arg.func.func = fn;
-        th->invoke_arg.func.arg = (void *)args;
-    }
-    else {
-        (void)RARRAY_LENINT(args);
+    switch (params->type) {
+      case thread_invoke_type_proc:
         th->invoke_type = thread_invoke_type_proc;
-        th->invoke_arg.proc.proc = rb_block_proc();
-        th->invoke_arg.proc.args = args;
+        th->invoke_arg.proc.args = params->args;
+        th->invoke_arg.proc.proc = params->proc;
+        th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+        break;
+
+      case thread_invoke_type_ractor_proc:
+#if RACTOR_CHECK_MODE > 0
+        rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g));
+#endif
+        th->invoke_type = thread_invoke_type_ractor_proc;
+        th->ractor = params->g;
+        th->ractor->threads.main = th;
+        th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+        th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
         th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+        rb_ractor_send_parameters(ec, params->g, params->args);
+        break;
+
+      case thread_invoke_type_func:
+        th->invoke_type = thread_invoke_type_func;
+        th->invoke_arg.func.func = params->fn;
+        th->invoke_arg.func.arg = (void *)params->args;
+        break;
+
+      default:
+        rb_bug("unreachable");
     }

     th->priority = current_th->priority;
@@ -870,13 +960,17 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(void *))

     rb_native_mutex_initialize(&th->interrupt_lock);

+    RUBY_DEBUG_LOG("r:%u th:%p", th->ractor->id, th);
+
+    rb_ractor_living_threads_insert(th->ractor, th);
+
     /* kick thread */
     err = native_thread_create(th);
     if (err) {
         th->status = THREAD_KILLED;
-        rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
+        rb_ractor_living_threads_remove(th->ractor, th);
+        rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
     }
-    rb_vm_living_threads_insert(th->vm, th);
     return thval;
 }

@@ -908,8 +1002,9 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
     rb_thread_t *th;
     VALUE thread = rb_thread_alloc(klass);

-    if (GET_VM()->main_thread->status == THREAD_KILLED)
-        rb_raise(rb_eThreadError, "can't alloc thread");
+    if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
+        rb_raise(rb_eThreadError, "can't alloc thread");
+    }

     rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
     th = rb_thread_ptr(thread);
@@ -933,7 +1028,12 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
 static VALUE
 thread_start(VALUE klass, VALUE args)
 {
-    return thread_create_core(rb_thread_alloc(klass), args, 0);
+    struct thread_create_params params = {
+        .type = thread_invoke_type_proc,
+        .args = args,
+        .proc = rb_block_proc(),
+    };
+    return thread_create_core(rb_thread_alloc(klass), &params);
 }

 static VALUE
@@ -968,14 +1068,36 @@ thread_initialize(VALUE thread, VALUE args)
         }
     }
     else {
-        return thread_create_core(thread, args, 0);
+        struct thread_create_params params = {
+            .type = thread_invoke_type_proc,
+            .args = args,
+            .proc = rb_block_proc(),
+        };
+        return thread_create_core(thread, &params);
     }
 }

 VALUE
 rb_thread_create(VALUE (*fn)(void *), void *arg)
 {
-    return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
+    struct thread_create_params params = {
+        .type = thread_invoke_type_func,
+        .fn = fn,
+        .args = (VALUE)arg,
+    };
+    return thread_create_core(rb_thread_alloc(rb_cThread), &params);
+}
+
+VALUE
+rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+{
+    struct thread_create_params params = {
+        .type = thread_invoke_type_ractor_proc,
+        .g = g,
+        .args = args,
+        .proc = proc,
+    };
+    return thread_create_core(rb_thread_alloc(rb_cThread), &params);
 }
@@ -1019,10 +1141,10 @@ thread_join_sleep(VALUE arg)
     while (target_th->status != THREAD_KILLED) {
         if (!p->limit) {
             th->status = THREAD_STOPPED_FOREVER;
-            th->vm->sleeper++;
-            rb_check_deadlock(th->vm);
+            rb_ractor_sleeper_threads_inc(th->ractor);
+            rb_check_deadlock(th->ractor);
             native_sleep(th, 0);
-            th->vm->sleeper--;
+            rb_ractor_sleeper_threads_dec(th->ractor);
         }
         else {
             if (hrtime_update_expire(p->limit, end)) {
@@ -1050,7 +1172,7 @@ thread_join(rb_thread_t *target_th, rb_hrtime_t *rel)
     if (th == target_th) {
         rb_raise(rb_eThreadError, "Target thread must not be current thread");
     }
-    if (GET_VM()->main_thread == target_th) {
+    if (th->ractor->threads.main == target_th) {
         rb_raise(rb_eThreadError, "Target thread must not be main thread");
     }

@@ -1266,12 +1388,12 @@ sleep_forever(rb_thread_t *th, unsigned int fl)
     RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
     while (th->status == status) {
         if (fl & SLEEP_DEADLOCKABLE) {
-            th->vm->sleeper++;
-            rb_check_deadlock(th->vm);
+            rb_ractor_sleeper_threads_inc(th->ractor);
+            rb_check_deadlock(th->ractor);
         }
         native_sleep(th, 0);
         if (fl & SLEEP_DEADLOCKABLE) {
-            th->vm->sleeper--;
+            rb_ractor_sleeper_threads_dec(th->ractor);
         }
         woke = vm_check_ints_blocking(th->ec);
         if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
@@ -1417,8 +1539,8 @@ rb_thread_schedule_limits(uint32_t limits_us)
         if (th->running_time_us >= limits_us) {
             thread_debug("rb_thread_schedule/switch start\n");
             RB_GC_SAVE_MACHINE_CONTEXT(th);
-            gvl_yield(th->vm, th);
-            rb_thread_set_current(th);
+            gvl_yield(rb_ractor_gvl(th->ractor), th);
+            rb_ractor_thread_switch(th->ractor, th);
             thread_debug("rb_thread_schedule/switch done\n");
         }
     }
@@ -1441,9 +1563,10 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
     if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
         th->blocking_region_buffer = region;
         th->status = THREAD_STOPPED;
+        rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
         thread_debug("enter blocking region (%p)\n", (void *)th);
         RB_GC_SAVE_MACHINE_CONTEXT(th);
-        gvl_release(th->vm);
+        gvl_release(rb_ractor_gvl(th->ractor));
         return TRUE;
     }
     else {
@@ -1459,10 +1582,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
     /* entry to ubf_list impossible at this point, so unregister is safe: */
     unregister_ubf_list(th);

-    gvl_acquire(th->vm, th);
-    rb_thread_set_current(th);
+    gvl_acquire(rb_ractor_gvl(th->ractor), th);
+    rb_ractor_thread_switch(th->ractor, th);
+
     thread_debug("leave blocking region (%p)\n", (void *)th);
     th->blocking_region_buffer = 0;
+    rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
     if (th->status == THREAD_STOPPED) {
         th->status = region->prev_status;
     }
@@ -1484,7 +1609,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
             ubf = ubf_select;
             data2 = th;
         }
-        else if (ubf && vm_living_thread_num(th->vm) == 1) {
+        else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1) {
             if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
                 th->vm->ubf_async_safe = 1;
             }
@@ -1631,7 +1756,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)

     wfd.fd = fd;
     wfd.th = rb_ec_thread_ptr(ec);
-    list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+
+    RB_VM_LOCK_ENTER();
+    {
+        list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
+    }
+    RB_VM_LOCK_LEAVE();

     EC_PUSH_TAG(ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -1646,7 +1776,11 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
      * must be deleted before jump
      * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
      */
-    list_del(&wfd.wfd_node);
+    RB_VM_LOCK_ENTER();
+    {
+        list_del(&wfd.wfd_node);
+    }
+    RB_VM_LOCK_LEAVE();

     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -1700,7 +1834,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
          * because this thread is not Ruby's thread.
          * What should we do?
          */
-
+        bp();
         fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
         exit(EXIT_FAILURE);
     }
@@ -2233,18 +2367,25 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
         int timer_interrupt;
        int pending_interrupt;
        int trap_interrupt;
+        int terminate_interrupt;

        timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
        pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
        postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
        trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
+        terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
+
+        if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
+            RB_VM_LOCK_ENTER();
+            RB_VM_LOCK_LEAVE();
+        }

        if (postponed_job_interrupt) {
            rb_postponed_job_flush(th->vm);
        }

        /* signal handling */
-        if (trap_interrupt && (th == th->vm->main_thread)) {
+        if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
            enum rb_thread_status prev_status = th->status;
            int sigwait_fd = rb_sigwait_fd_get(th);

@@ -2273,7 +2414,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
            else if (err == eKillSignal        /* Thread#kill received   */  ||
                     err == eTerminateSignal   /* Terminate thread       */  ||
                     err == INT2FIX(TAG_FATAL) /* Thread.exit etc.       */ ) {
-                rb_threadptr_to_kill(th);
+                terminate_interrupt = 1;
            }
            else {
                if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
@@ -2288,7 +2429,11 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
            }
        }

-        if (timer_interrupt) {
+        if (terminate_interrupt) {
+            rb_threadptr_to_kill(th);
+        }
+
+        if (timer_interrupt) {
            uint32_t limits_us = TIME_QUANTUM_USEC;

            if (th->priority > 0)
@@ -2356,7 +2501,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig)
     argv[0] = rb_eSignal;
     argv[1] = INT2FIX(sig);
-    rb_threadptr_raise(th->vm->main_thread, 2, argv);
+    rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
 }

 void
@@ -2366,7 +2511,9 @@ rb_threadptr_signal_exit(rb_thread_t *th)
     argv[0] = rb_eSystemExit;
     argv[1] = rb_str_new2("exit");
-    rb_threadptr_raise(th->vm->main_thread, 2, argv);
+
+    // TODO: check signal raise deliverly
+    rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
 }

 int
@@ -2395,19 +2542,24 @@ rb_notify_fd_close(int fd, struct list_head *busy)
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;

-    list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
-        if (wfd->fd == fd) {
-            rb_thread_t *th = wfd->th;
-            VALUE err;
+    RB_VM_LOCK_ENTER();
+    {
+        list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
+            if (wfd->fd == fd) {
+                rb_thread_t *th = wfd->th;
+                VALUE err;

-            list_del(&wfd->wfd_node);
-            list_add(busy, &wfd->wfd_node);
+                list_del(&wfd->wfd_node);
+                list_add(busy, &wfd->wfd_node);

-            err = th->vm->special_exceptions[ruby_error_stream_closed];
-            rb_threadptr_pending_interrupt_enque(th, err);
-            rb_threadptr_interrupt(th);
-        }
+                err = th->vm->special_exceptions[ruby_error_stream_closed];
+                rb_threadptr_pending_interrupt_enque(th, err);
+                rb_threadptr_interrupt(th);
+            }
+        }
     }
+    RB_VM_LOCK_LEAVE();
+
     return !list_empty(busy);
 }

@@ -2479,7 +2631,7 @@ rb_thread_kill(VALUE thread)
     if (th->to_kill || th->status == THREAD_KILLED) {
        return thread;
     }
-    if (th == th->vm->main_thread) {
+    if (th == th->vm->ractor.main_thread) {
        rb_exit(EXIT_SUCCESS);
     }

@@ -2658,21 +2810,8 @@ thread_stop(VALUE _)
 VALUE
 rb_thread_list(void)
 {
-    VALUE ary = rb_ary_new();
-    rb_vm_t *vm = GET_THREAD()->vm;
-    rb_thread_t *th = 0;
-
-    list_for_each(&vm->living_threads, th, vmlt_node) {
-        switch (th->status) {
-          case THREAD_RUNNABLE:
-          case THREAD_STOPPED:
-          case THREAD_STOPPED_FOREVER:
-            rb_ary_push(ary, th->self);
-          default:
-            break;
-        }
-    }
-    return ary;
+    // TODO
+    return rb_ractor_thread_list(GET_RACTOR());
 }

 /*
@@ -2725,7 +2864,7 @@ thread_s_current(VALUE klass)
 VALUE
 rb_thread_main(void)
 {
-    return GET_THREAD()->vm->main_thread->self;
+    return GET_RACTOR()->threads.main->self;
 }

 /*
@@ -3521,7 +3660,8 @@ thread_keys_i(ID key, VALUE value, void *ary)
 int
 rb_thread_alone(void)
 {
-    return vm_living_thread_num(GET_VM()) == 1;
+    // TODO
+    return rb_ractor_living_thread_num(GET_RACTOR()) == 1;
 }

 /*
@@ -4248,7 +4388,13 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)

     wfd.th = GET_THREAD();
     wfd.fd = fd;
-    list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+
+    RB_VM_LOCK_ENTER();
+    {
+        list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+    }
+    RB_VM_LOCK_LEAVE();
+
     EC_PUSH_TAG(wfd.th->ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
@@ -4401,7 +4547,12 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     args.wfd.fd = fd;
     args.wfd.th = GET_THREAD();

-    list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
+    RB_VM_LOCK_ENTER();
+    {
+        list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
+    }
+    RB_VM_LOCK_LEAVE();
+
     r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
     if (r == -1)
        errno = args.as.error;

@@ -4438,14 +4589,12 @@ rb_threadptr_check_signal(rb_thread_t *mth)
 }

 static void
-timer_thread_function(void)
+timer_thread_function(rb_execution_context_t *ec)
 {
-    volatile rb_execution_context_t *ec;
-
-    /* for time slice */
-    ec = ACCESS_ONCE(rb_execution_context_t *,
-                     ruby_current_execution_context_ptr);
-    if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
+    // strictly speaking, accessing gvl->owner is not thread-safe
+    if (ec) {
+        RUBY_VM_SET_TIMER_INTERRUPT(ec);
+    }
 }

 static void
@@ -4516,11 +4665,13 @@ check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
     ubf_wakeup_all_threads();
     ruby_sigchld_handler(vm);
     if (rb_signal_buff_size()) {
-        if (th == vm->main_thread)
+        if (th == vm->ractor.main_thread) {
            /* no need to lock + wakeup if already in main thread */
            RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
-        else
-            threadptr_trap_interrupt(vm->main_thread);
+        }
+        else {
+            threadptr_trap_interrupt(vm->ractor.main_thread);
+        }
        ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
     }
     return ret;
@@ -4592,16 +4743,28 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
 {
     rb_thread_t *i = 0;
     rb_vm_t *vm = th->vm;
-    vm->main_thread = th;
+    rb_ractor_t *r = th->ractor;
+    vm->ractor.main_ractor = r;
+    vm->ractor.main_thread = th;
+    r->threads.main = th;
+    r->status_ = ractor_created;

-    gvl_atfork(th->vm);
+    gvl_atfork(rb_ractor_gvl(th->ractor));
     ubf_list_atfork();

-    list_for_each(&vm->living_threads, i, vmlt_node) {
-        atfork(i, th);
+    // OK. Only this thread accesses:
+    list_for_each(&vm->ractor.set, r, vmlr_node) {
+        list_for_each(&r->threads.set, i, lt_node) {
+            atfork(i, th);
+        }
     }
     rb_vm_living_threads_init(vm);
-    rb_vm_living_threads_insert(vm, th);
+
+    // threads
+    vm->ractor.cnt = 0;
+    rb_ractor_living_threads_init(th->ractor);
+    rb_ractor_living_threads_insert(th->ractor, th);
+
     /* may be held by MJIT threads in parent */
     rb_native_mutex_initialize(&vm->waitpid_lock);
@@ -4611,9 +4774,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
     rb_native_mutex_initialize(&th->interrupt_lock);

     vm->fork_gen++;
-
-    vm->sleeper = 0;
+    rb_ractor_sleeper_threads_clear(th->ractor);
     rb_clear_coverages();
+
+    VM_ASSERT(vm->ractor.cnt == 1);
 }

 static void
@@ -4730,11 +4894,11 @@ static VALUE
 thgroup_list(VALUE group)
 {
     VALUE ary = rb_ary_new();
-    rb_vm_t *vm = GET_THREAD()->vm;
     rb_thread_t *th = 0;
+    rb_ractor_t *r = GET_RACTOR();

-    list_for_each(&vm->living_threads, th, vmlt_node) {
-        if (th->thgroup == group) {
+    list_for_each(&r->threads.set, th, lt_node) {
+        if (th->thgroup == group) {
            rb_ary_push(ary, th->self);
        }
     }
@@ -5364,7 +5528,7 @@ Init_Thread(void)
     rb_define_method(cThGroup, "add", thgroup_add, 1);

     {
-        th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
+        th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
        rb_define_const(cThGroup, "Default", th->thgroup);
     }

@@ -5376,8 +5540,8 @@ Init_Thread(void)
     /* main thread setting */
     {
        /* acquire global vm lock */
-        gvl_init(th->vm);
-        gvl_acquire(th->vm, th);
+        rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
+        gvl_acquire(gvl, th);
        rb_native_mutex_initialize(&th->vm->waitpid_lock);
        rb_native_mutex_initialize(&th->vm->workqueue_lock);
        rb_native_mutex_initialize(&th->interrupt_lock);
@@ -5390,9 +5554,6 @@ Init_Thread(void)

     rb_thread_create_timer_thread();

-    /* suppress warnings on cygwin, mingw and mswin.*/
-    (void)native_mutex_trylock;
-
     Init_thread_sync();
 }

@@ -5405,67 +5566,73 @@ ruby_native_thread_p(void)
 }

 static void
-debug_deadlock_check(rb_vm_t *vm, VALUE msg)
+debug_deadlock_check(rb_ractor_t *r, VALUE msg)
 {
     rb_thread_t *th = 0;
     VALUE sep = rb_str_new_cstr("\n ");

     rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
-                vm_living_thread_num(vm), vm->sleeper, (void *)GET_THREAD(), (void *)vm->main_thread);
-    list_for_each(&vm->living_threads, th, vmlt_node) {
-        rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
-                    "native:%"PRI_THREAD_ID" int:%u",
-                    th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
-        if (th->locking_mutex) {
-            rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
-            rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
-                        (void *)mutex->th, rb_mutex_num_waiting(mutex));
-        }
-        {
-            rb_thread_list_t *list = th->join_list;
-            while (list) {
-                rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
-                list = list->next;
-            }
-        }
-        rb_str_catf(msg, "\n ");
-        rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
-        rb_str_catf(msg, "\n");
+                rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
+                (void *)GET_THREAD(), (void *)r->threads.main);
+
+    list_for_each(&r->threads.set, th, lt_node) {
+        rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
+                    "native:%"PRI_THREAD_ID" int:%u",
+                    th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+
+        if (th->locking_mutex) {
+            rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+            rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
+                        (void *)mutex->th, rb_mutex_num_waiting(mutex));
+        }
+
+        {
+            rb_thread_list_t *list = th->join_list;
+            while (list) {
+                rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
+                list = list->next;
+            }
+        }
+        rb_str_catf(msg, "\n ");
+        rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
+        rb_str_catf(msg, "\n");
     }
 }

 static void
-rb_check_deadlock(rb_vm_t *vm)
+rb_check_deadlock(rb_ractor_t *r)
 {
     int found = 0;
-    rb_thread_t *th = 0;
+    rb_thread_t *th = NULL;
+    int sleeper_num = rb_ractor_sleeper_thread_num(r);
+    int ltnum = rb_ractor_living_thread_num(r);

-    if (vm_living_thread_num(vm) > vm->sleeper) return;
-    if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
+    if (ltnum > sleeper_num) return;
+    if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
     if (patrol_thread && patrol_thread != GET_THREAD()) return;

-    list_for_each(&vm->living_threads, th, vmlt_node) {
-        if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
-            found = 1;
-        }
-        else if (th->locking_mutex) {
-            rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+    list_for_each(&r->threads.set, th, lt_node) {
+        if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
+            found = 1;
+        }
+        else if (th->locking_mutex) {
+            rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

-            if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
-                found = 1;
-            }
-        }
-        if (found)
-            break;
+            if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
+                found = 1;
+            }
+        }
+        if (found)
+            break;
     }

     if (!found) {
        VALUE argv[2];
        argv[0] = rb_eFatal;
        argv[1] = rb_str_new2("No live threads left. Deadlock?");
-        debug_deadlock_check(vm, argv[1]);
-        vm->sleeper--;
-        rb_threadptr_raise(vm->main_thread, 2, argv);
+        debug_deadlock_check(r, argv[1]);
+        rb_ractor_sleeper_threads_dec(GET_RACTOR());
+        rb_threadptr_raise(r->threads.main, 2, argv);
     }
 }
-- 
cgit v1.2.3
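
For context, the Ruby-level interface that this thread.c plumbing backs is
described in doc/ractor.md. A minimal sketch of that API follows; the method
names (`Ractor.new`, `send`, `Ractor.receive`, `Ractor.yield`, `take`) are
taken from that document, and behavior is experimental and may change:

    # Each Ractor runs its block on a thread with its own GVL (the
    # rb_ractor_gvl()-based locking introduced above), so the two
    # computations below can run in parallel on separate cores.
    r1 = Ractor.new { (1..1_000_000).sum }
    r2 = Ractor.new { (1..1_000_000).sum }
    p r1.take + r2.take    # take blocks until the block's return value is available

    # Ractors do not share mutable state; they communicate by message
    # passing, which is how thread-safety issues are avoided.
    echo = Ractor.new do
      loop { Ractor.yield Ractor.receive }
    end
    echo.send "hello"
    p echo.take            #=> "hello"

Blocks passed to `Ractor.new` are isolated (they cannot capture outer local
variables), which corresponds to the `rb_proc_isolate_bang()` call in
`thread_create_core()` above.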