diff options
-rw-r--r-- | ChangeLog | 33 | ||||
-rw-r--r-- | cont.c | 2 | ||||
-rw-r--r-- | eval.c | 2 | ||||
-rw-r--r-- | insns.def | 12 | ||||
-rw-r--r-- | process.c | 2 | ||||
-rw-r--r-- | test/ruby/test_thread.rb | 66 | ||||
-rw-r--r-- | thread.c | 471 | ||||
-rw-r--r-- | vm_core.h | 14 | ||||
-rw-r--r-- | vm_eval.c | 4 | ||||
-rw-r--r-- | vm_insnhelper.c | 2 |
10 files changed, 504 insertions, 104 deletions
@@ -1,3 +1,36 @@ +Thu Jul 19 15:08:40 2012 Koichi Sasada <ko1@atdot.net> + + * thread.c (rb_thread_s_control_interrupt, + rb_thread_s_check_interrupt): added for + Thread.control_intgerrupt and Thread.check_interrupt. + See details on rdoc. + I'll make an ticket for this feature. + + * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. + + * thread.c (rb_threadptr_raise): make a new exception object + even if argc is 0. + + * thread.c (rb_thread_kill): kill thread immediately if target thread + is current thread. + + * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. + CHECK_INTS while/after blocking operation. + + * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. + + * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). + + * eval.c (ruby_cleanup): ditto. + + * insns.def: ditto. + + * process.c (rb_waitpid): ditto. + + * vm_eval.c (vm_call0): ditto. + + * vm_insnhelper.c (vm_call_method): ditto. + Thu Jul 19 22:46:48 2012 Tanaka Akira <akr@fsij.org> * test/ruby/test_io.rb: remove temporally files early. @@ -1328,7 +1328,7 @@ fiber_switch(VALUE fibval, int argc, VALUE *argv, int is_resume) rb_bug("rb_fiber_resume: unreachable"); } #endif - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return value; } @@ -160,7 +160,7 @@ ruby_cleanup(volatile int ex) rb_threadptr_check_signal(th); PUSH_TAG(); if ((state = EXEC_TAG()) == 0) { - SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(); }); + SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(th); }); } POP_TAG(); @@ -1086,7 +1086,7 @@ leave } } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (UNLIKELY(VM_FRAME_TYPE_FINISH_P(GET_CFP()))) { #if OPT_CALL_THREADED_CODE @@ -1117,7 +1117,7 @@ throw (VALUE throwobj) (VALUE val) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); val = vm_throw(th, GET_CFP(), throw_state, throwobj); THROW_EXCEPTION(val); /* unreachable */ @@ -1138,7 +1138,7 @@ jump () () { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } @@ -1154,7 +1154,7 @@ branchif () { if (RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1171,7 +1171,7 @@ branchunless () { if (!RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1220,7 +1220,7 @@ onceinlinecache } else if (ic->ic_value.value == Qundef) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); rb_thread_schedule(); goto retry; } @@ -663,7 +663,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags) RUBY_UBF_PROCESS, 0); if (result < 0) { if (errno == EINTR) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(GET_THREAD()); goto retry; } return (rb_pid_t)-1; diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb index 946d522cd9..ed22cf9b44 100644 --- a/test/ruby/test_thread.rb +++ b/test/ruby/test_thread.rb @@ -615,6 +615,72 @@ class TestThread < Test::Unit::TestCase end assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083) end + + def make_control_interrupt_test_thread1 flag + r = [] + q = Queue.new + th = Thread.new{ + begin + Thread.control_interrupt(RuntimeError => flag){ + q << :go + begin + sleep 0.5 + rescue + r << :c1 + end + } + sleep 0.5 + rescue + r << :c2 + end + } + q.pop # wait + th.raise + begin + th.join + rescue + r << :c3 + end + r + end + + def test_control_interrupt + [[:never, :c2], + [:immediate, :c1], + [:on_blocking, :c1]].each{|(flag, c)| + assert_equal([flag, c], [flag] + make_control_interrupt_test_thread1(flag)) + } + # TODO: complex cases are needed. + end + + def test_check_interrupt + q = Queue.new + Thread.control_interrupt(RuntimeError => :never){ + th = Thread.new{ + q.push :e + begin + begin + sleep 0.5 + rescue => e + q.push :ng1 + end + begin + Thread.check_interrupt + rescue => e + q.push :ok + end + rescue => e + q.push :ng2 + ensure + q.push :ng3 + end + } + q.pop + th.raise + th.join + assert_equal(:ok, q.pop) + } + end end class TestThreadGroup < Test::Unit::TestCase @@ -265,7 +265,7 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, struct rb_unblock_callback *old) { check_ints: - RUBY_VM_CHECK_INTS(); /* check signal or so */ + RUBY_VM_CHECK_INTS(th); /* check signal or so */ native_mutex_lock(&th->interrupt_lock); if (th->interrupt_flag) { native_mutex_unlock(&th->interrupt_lock); @@ -545,7 +545,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s static VALUE thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) { - rb_thread_t *th; + rb_thread_t *th, *current_th = GET_THREAD(); int err; if (OBJ_FROZEN(GET_THREAD()->thgroup)) { @@ -559,12 +559,12 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) th->first_proc = fn ? Qfalse : rb_block_proc(); th->first_args = args; /* GC: shouldn't put before above line */ - th->priority = GET_THREAD()->priority; - th->thgroup = GET_THREAD()->thgroup; + th->priority = current_th->priority; + th->thgroup = current_th->thgroup; th->async_errinfo_queue = rb_ary_new(); th->async_errinfo_queue_checked = 0; - th->async_errinfo_mask_stack = rb_ary_new(); + th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack); native_mutex_initialize(&th->interrupt_lock); if (GET_VM()->event_hooks != NULL) @@ -859,7 +859,7 @@ sleep_forever(rb_thread_t *th, int deadlockable) if (deadlockable) { th->vm->sleeper--; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } while (th->status == status); th->status = prev_status; } @@ -896,7 +896,7 @@ sleep_timeval(rb_thread_t *th, struct timeval tv) th->status = THREAD_STOPPED; do { native_sleep(th, &tv); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); getclockofday(&tvn); if (to.tv_sec < tvn.tv_sec) break; if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; @@ -968,9 +968,9 @@ rb_thread_wait_for(struct timeval time) void rb_thread_polling(void) { - RUBY_VM_CHECK_INTS(); if (!rb_thread_alone()) { rb_thread_t *th = GET_THREAD(); + RUBY_VM_CHECK_INTS_BLOCKING(th); sleep_for_polling(th); } } @@ -985,7 +985,7 @@ rb_thread_polling(void) void rb_thread_check_ints(void) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); } /* @@ -1013,7 +1013,7 @@ rb_thread_sleep(int sec) rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); } -static void rb_threadptr_execute_interrupts_common(rb_thread_t *); +static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking); static void rb_thread_schedule_limits(unsigned long limits_us) @@ -1040,7 +1040,7 @@ rb_thread_schedule(void) rb_thread_schedule_limits(0); if (UNLIKELY(GET_THREAD()->interrupt_flag)) { - rb_threadptr_execute_interrupts_common(GET_THREAD()); + rb_threadptr_execute_interrupts_common(GET_THREAD(), 0); } } @@ -1076,7 +1076,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) rb_thread_t *th = GET_THREAD(); blocking_region_end(th, region); xfree(region); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; } @@ -1181,7 +1181,7 @@ rb_thread_call_without_gvl2(void *(*func)(void *data, int *skip_checkints), void }, ubf, data2); if (!skip_checkints) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } errno = saved_errno; @@ -1237,7 +1237,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) JUMP_TAG(state); } /* TODO: check func() */ - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; @@ -1350,12 +1350,348 @@ thread_s_pass(VALUE klass) return Qnil; } +/*****************************************************/ + /* + * rb_threadptr_async_errinfo_* - manage async errors queue + * + * Async events such as an exception throwed by Thread#raise, + * Thread#kill and thread termination (after main thread termination) + * will be queued to th->async_errinfo_queue. + * - clear: clear the queue. + * - enque: enque err object into queue. + * - deque: deque err object from queue. + * - active_p: return 1 if the queue should be checked. * + * All rb_threadptr_async_errinfo_* functions are called by + * a GVL acquired thread, of course. + * Note that all "rb_" prefix APIs need GVL to call. */ +void +rb_threadptr_async_errinfo_clear(rb_thread_t *th) +{ + rb_ary_clear(th->async_errinfo_queue); +} + +void +rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) +{ + rb_ary_push(th->async_errinfo_queue, v); + th->async_errinfo_queue_checked = 0; +} + +enum interrupt_timing { + INTERRUPT_NONE, + INTERRUPT_IMMEDIATE, + INTERRUPT_ON_BLOCKING, + INTERRUPT_NEVER +}; + +static enum interrupt_timing +rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err) +{ + VALUE mask; + long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack); + VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack); + VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ + long ancestors_len = RARRAY_LEN(ancestors); + VALUE *ancestors_ptr = RARRAY_PTR(ancestors); + int i, j; + + for (i=0; i<mask_stack_len; i++) { + mask = mask_stack[mask_stack_len-(i+1)]; + + for (j=0; j<ancestors_len; j++) { + VALUE klass = ancestors_ptr[j]; + VALUE sym; + + /* TODO: remove rb_intern() */ + if ((sym = rb_hash_aref(mask, klass)) != Qnil) { + if (sym == ID2SYM(rb_intern("immediate"))) { + return INTERRUPT_IMMEDIATE; + } + else if (sym == ID2SYM(rb_intern("on_blocking"))) { + return INTERRUPT_ON_BLOCKING; + } + else if (sym == ID2SYM(rb_intern("never"))) { + return INTERRUPT_NEVER; + } + else { + rb_raise(rb_eThreadError, "unknown mask signature"); + } + } + } + /* try to next mask */ + } + return INTERRUPT_NONE; +} + +static int +rb_threadptr_async_errinfo_empty_p(rb_thread_t *th) +{ + return RARRAY_LEN(th->async_errinfo_queue) == 0; +} + +static VALUE +rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing) +{ +#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */ + int i; + + for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) { + VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i]; + + enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err)); + + switch (mask_timing) { + case INTERRUPT_ON_BLOCKING: + if (timing != INTERRUPT_ON_BLOCKING) { + break; + } + /* fall through */ + case INTERRUPT_NONE: /* default: IMMEDIATE */ + case INTERRUPT_IMMEDIATE: + rb_ary_delete_at(th->async_errinfo_queue, i); + return err; + case INTERRUPT_NEVER: + break; + } + } + + th->async_errinfo_queue_checked = 1; + return Qundef; +#else + VALUE err = rb_ary_shift(th->async_errinfo_queue); + if (rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 1; + } + return err; +#endif +} + +int +rb_threadptr_async_errinfo_active_p(rb_thread_t *th) +{ + if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) { + return 0; + } + else { + return 1; + } +} + +static VALUE +rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th)) +{ + VALUE r = Qnil; + int state; + + rb_ary_push(th->async_errinfo_mask_stack, mask); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + TH_PUSH_TAG(th); + if ((state = EXEC_TAG()) == 0) { + r = func(th); + } + TH_POP_TAG(); + + rb_ary_pop(th->async_errinfo_mask_stack); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + if (state) { + JUMP_TAG(state); + } + + return r; +} + +/* + * call-seq: + * Thread.control_interrupt(hash) { ... } -> result of the block + * + * Thread.control_interrupt controls interrupt timing. + * + * _interrupt_ means asynchronous event and corresponding procedure + * by Thread#raise, Thread#kill, signal trap (not supported yet) + * and main thread termination (if main thread terminates, then all + * other thread will be killed). + * + * _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol + * is one of them: + * - :immediate Invoke interrupt immediately. + * - :on_blocking Invoke interrupt while _BlockingOperation_. + * - :never Never invoke interrupt. + * + * _BlockingOperation_ means that the operation will block the calling thread, + * such as read and write. On CRuby implementation, _BlockingOperation_ is + * operation executed without GVL. + * + * Masked interrupts are delayed until they are enabled. + * This method is similar to sigprocmask(3). + * + * TODO (DOC): control_interrupt is stacked. + * TODO (DOC): check ancestors. + * TODO (DOC): to prevent all interrupt, {Object => :never} works. + * + * NOTE: Asynchronous interrupts are difficult to use. + * If you need to communicate between threads, + * please consider to use another way such as Queue. + * Or use them with deep understanding about this method. + * + * + * # example: Guard from Thread#raise + * th = Thread.new do + * Thead.control_interrupt(RuntimeError => :never) { + * begin + * # Thread#raise doesn't interrupt here. + * # You can write resource allocation code safely. + * Thread.control_interrupt(RuntimeError => :immediate) { + * # ... + * # It is possible to be interrupted by Thread#raise. + * } + * ensure + * # Thread#raise doesn't interrupt here. + * # You can write resource dealocation code safely. + * end + * } + * end + * Thread.pass + * # ... + * th.raise "stop" + * + * # example: Guard from TimeoutError + * require 'timeout' + * Thread.control_interrupt(TimeoutError => :never) { + * timeout(10){ + * # TimeoutError doesn't occur here + * Thread.control_interrupt(TimeoutError => :on_blocking) { + * # possible to be killed by TimeoutError + * # while blocking operation + * } + * # TimeoutError doesn't occur here + * } + * } + * + * # example: Stack control settings + * Thread.control_interrupt(FooError => :never) { + * Thread.control_interrupt(BarError => :never) { + * # FooError and BarError are prohibited. + * } + * } + * + * # example: check ancestors + * Thread.control_interrupt(Exception => :never) { + * # all exceptions inherited from Exception are prohibited. + * } + * + */ + +static VALUE +control_interrupt_func(rb_thread_t *th) +{ + return rb_yield(Qnil); +} + +static VALUE +rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg) +{ + if (!rb_block_given_p()) { + rb_raise(rb_eArgError, "block is needed."); + } + + return rb_threadptr_interrupt_mask(GET_THREAD(), + rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"), + control_interrupt_func); +} + +/* + * call-seq: + * Thread.check_interrupt() -> nil + * + * Check queued interrupts. + * + * If there are queued interrupts, process respective procedures. + * + * This method can be defined as the following Ruby code: + * + * def Thread.check_interrupt + * Thread.control_interrupt(Object => :immediate) { + * Thread.pass + * } + * end + * + * Examples: + * + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * Thread.check_interrupt + * ... + * end + * } + * } + * ... + * th.raise # stop thread + * + * NOTE: This example can be described by the another code. + * You need to keep to avoid asynchronous interrupts. + * + * flag = true + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * break if flag == false + * ... + * end + * } + * } + * ... + * flag = false # stop thread + */ + +static VALUE +check_interrupt_func(rb_thread_t *th) +{ + RUBY_VM_CHECK_INTS(th); + return Qnil; +} + +static VALUE +rb_thread_s_check_interrupt(VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + + if (!rb_threadptr_async_errinfo_empty_p(th)) { + VALUE mask = rb_hash_new(); + rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate"))); + rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func); + } + + return Qnil; +} + +static void +rb_threadptr_to_kill(rb_thread_t *th) +{ + rb_threadptr_async_errinfo_clear(th); + th->status = THREAD_TO_KILL; + th->errinfo = INT2FIX(TAG_FATAL); + TH_JUMP_TAG(th, TAG_FATAL); +} + static void -rb_threadptr_execute_interrupts_common(rb_thread_t *th) +rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing) { rb_atomic_t interrupt; @@ -1378,15 +1714,15 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th) /* exception from another thread */ if (rb_threadptr_async_errinfo_active_p(th)) { - VALUE err = rb_threadptr_async_errinfo_deque(th); + VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); - if (err == eKillSignal /* Thread#kill receieved */ || - err == eTerminateSignal /* Terminate thread */ ) { - rb_threadptr_async_errinfo_clear(th); - th->status = THREAD_TO_KILL; - th->errinfo = INT2FIX(TAG_FATAL); - TH_JUMP_TAG(th, TAG_FATAL); + if (err == Qundef) { + /* no error */ + } + else if (err == eKillSignal /* Thread#kill receieved */ || + err == eTerminateSignal /* Terminate thread */ ) { + rb_threadptr_to_kill(th); } else { rb_exc_raise(err); @@ -1417,9 +1753,9 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th) } void -rb_threadptr_execute_interrupts(rb_thread_t *th) +rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { - rb_threadptr_execute_interrupts_common(th); + rb_threadptr_execute_interrupts_common(th, blocking_timing); } void @@ -1427,59 +1763,7 @@ rb_thread_execute_interrupts(VALUE thval) { rb_thread_t *th; GetThreadPtr(thval, th); - rb_threadptr_execute_interrupts_common(th); -} - -/*****************************************************/ - -/* - * rb_threadptr_async_errinfo_* - manage async errors queue - * - * Async events such as an exception throwed by Thread#raise, - * Thread#kill and thread termination (after main thread termination) - * will be queued to th->async_errinfo_queue. - * - clear: clear the queue. - * - enque: enque err object into queue. - * - deque: deque err object from queue. - * - active_p: return 1 if the queue should be checked. - * - * All rb_threadptr_async_errinfo_* functions are called by - * a GVL acquired thread, of course. - * Note that all "rb_" prefix APIs need GVL to call. - */ - -void -rb_threadptr_async_errinfo_clear(rb_thread_t *th) -{ - rb_ary_clear(th->async_errinfo_queue); -} - -void -rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) -{ - rb_ary_push(th->async_errinfo_queue, v); - th->async_errinfo_queue_checked = 0; -} - -VALUE -rb_threadptr_async_errinfo_deque(rb_thread_t *th) -{ - VALUE err = rb_ary_shift(th->async_errinfo_queue); - if (RARRAY_LEN(th->async_errinfo_queue) == 0) { - th->async_errinfo_queue_checked = 1; - } - return err; -} - -int -rb_threadptr_async_errinfo_active_p(rb_thread_t *th) -{ - if (th->async_errinfo_queue_checked) { - return 0; - } - else { - return RARRAY_LEN(th->async_errinfo_queue) > 0; - } + rb_threadptr_execute_interrupts_common(th, 1); } static void @@ -1497,7 +1781,12 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) return Qnil; } - exc = rb_make_exception(argc, argv); + if (argc == 0) { + exc = rb_exc_new(rb_eRuntimeError, 0, 0); + } + else { + exc = rb_make_exception(argc, argv); + } rb_threadptr_async_errinfo_enque(th, exc); rb_threadptr_interrupt(th); return Qnil; @@ -1642,9 +1931,15 @@ rb_thread_kill(VALUE thread) thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); - rb_threadptr_async_errinfo_enque(th, eKillSignal); - th->status = THREAD_TO_KILL; - rb_threadptr_interrupt(th); + if (th == GET_THREAD()) { + /* kill myself immediately */ + rb_threadptr_to_kill(th); + } + else { + rb_threadptr_async_errinfo_enque(th, eKillSignal); + th->status = THREAD_TO_KILL; + rb_threadptr_interrupt(th); + } return thread; } @@ -1683,7 +1978,8 @@ rb_thread_s_kill(VALUE obj, VALUE th) static VALUE rb_thread_exit(void) { - return rb_thread_kill(GET_THREAD()->self); + rb_thread_t *th = GET_THREAD(); + return rb_thread_kill(th->self); } @@ -2704,7 +3000,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, if (result < 0) lerrno = errno; }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = lerrno; @@ -2910,6 +3206,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) double limit = 0; struct timespec ts; struct timespec *timeout = NULL; + rb_thread_t *th = GET_THREAD(); if (tv) { ts.tv_sec = tv->tv_sec; @@ -2927,9 +3224,9 @@ retry: BLOCKING_REGION({ result = ppoll(&fds, 1, timeout, NULL); if (result < 0) lerrno = errno; - }, ubf_select, GET_THREAD()); + }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); if (result < 0) { errno = lerrno; @@ -3673,7 +3970,7 @@ rb_mutex_lock(VALUE self) if (mutex->th == th) mutex_locked(th, self); if (interrupted) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } } } @@ -4755,6 +5052,8 @@ Init_Thread(void) rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); #endif + rb_define_singleton_method(rb_cThread, "control_interrupt", rb_thread_s_control_interrupt, 1); + rb_define_singleton_method(rb_cThread, "check_interrupt", rb_thread_s_check_interrupt, 1); rb_define_method(rb_cThread, "initialize", thread_initialize, -2); rb_define_method(rb_cThread, "raise", thread_raise_m, -1); @@ -769,25 +769,27 @@ void rb_signal_exec(rb_thread_t *th, int sig); void rb_threadptr_check_signal(rb_thread_t *mth); void rb_threadptr_signal_raise(rb_thread_t *th, int sig); void rb_threadptr_signal_exit(rb_thread_t *th); -void rb_threadptr_execute_interrupts(rb_thread_t *); +void rb_threadptr_execute_interrupts(rb_thread_t *, int); void rb_threadptr_interrupt(rb_thread_t *th); void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th); void rb_threadptr_async_errinfo_clear(rb_thread_t *th); void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v); -VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th); int rb_threadptr_async_errinfo_active_p(rb_thread_t *th); void rb_thread_lock_unlock(rb_thread_lock_t *); void rb_thread_lock_destroy(rb_thread_lock_t *); -#define RUBY_VM_CHECK_INTS_TH(th) do { \ +#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \ if (UNLIKELY((th)->interrupt_flag)) { \ - rb_threadptr_execute_interrupts(th); \ + rb_threadptr_execute_interrupts(th, 1); \ } \ } while (0) -#define RUBY_VM_CHECK_INTS() \ - RUBY_VM_CHECK_INTS_TH(GET_THREAD()) +#define RUBY_VM_CHECK_INTS(th) do { \ + if (UNLIKELY((th)->interrupt_flag)) { \ + rb_threadptr_execute_interrupts(th, 0); \ + } \ +} while (0) /* tracer */ void @@ -103,7 +103,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv, if (!klass || !(me = rb_method_entry(klass, id))) { return method_missing(recv, id, argc, argv, NOEX_SUPER); } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (!(def = me->def)) return Qnil; goto again; } @@ -138,7 +138,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv, rb_bug("vm_call0: unsupported method type (%d)", def->type); val = Qundef; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return val; } diff --git a/vm_insnhelper.c b/vm_insnhelper.c index e4a45e669d..1b23ee6e18 100644 --- a/vm_insnhelper.c +++ b/vm_insnhelper.c @@ -706,7 +706,7 @@ vm_call_method(rb_thread_t *th, rb_control_frame_t *cfp, } } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return val; } |