diff options
author | ko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2012-07-19 14:19:40 +0000 |
---|---|---|
committer | ko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2012-07-19 14:19:40 +0000 |
commit | f4a8db647ae66621f5d37402f5a11a3d57c69bb0 (patch) | |
tree | 2325b2b809b9d94ef2ccc367f5f84d9672e9f00d /thread.c | |
parent | 422e8d5adc3cf2d67b53cf9050c750eba7db3673 (diff) | |
download | ruby-f4a8db647ae66621f5d37402f5a11a3d57c69bb0.tar.gz |
* thread.c (rb_thread_s_control_interrupt,
rb_thread_s_check_interrupt): added for
Thread.control_intgerrupt and Thread.check_interrupt.
See details on rdoc.
I'll make an ticket for this feature.
* test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt.
* thread.c (rb_threadptr_raise): make a new exception object
even if argc is 0.
* thread.c (rb_thread_kill): kill thread immediately if target thread
is current thread.
* vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added.
CHECK_INTS while/after blocking operation.
* vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr.
* cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS().
* eval.c (ruby_cleanup): ditto.
* insns.def: ditto.
* process.c (rb_waitpid): ditto.
* vm_eval.c (vm_call0): ditto.
* vm_insnhelper.c (vm_call_method): ditto.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36470 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 471 |
1 files changed, 385 insertions, 86 deletions
@@ -265,7 +265,7 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, struct rb_unblock_callback *old) { check_ints: - RUBY_VM_CHECK_INTS(); /* check signal or so */ + RUBY_VM_CHECK_INTS(th); /* check signal or so */ native_mutex_lock(&th->interrupt_lock); if (th->interrupt_flag) { native_mutex_unlock(&th->interrupt_lock); @@ -545,7 +545,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s static VALUE thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) { - rb_thread_t *th; + rb_thread_t *th, *current_th = GET_THREAD(); int err; if (OBJ_FROZEN(GET_THREAD()->thgroup)) { @@ -559,12 +559,12 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) th->first_proc = fn ? Qfalse : rb_block_proc(); th->first_args = args; /* GC: shouldn't put before above line */ - th->priority = GET_THREAD()->priority; - th->thgroup = GET_THREAD()->thgroup; + th->priority = current_th->priority; + th->thgroup = current_th->thgroup; th->async_errinfo_queue = rb_ary_new(); th->async_errinfo_queue_checked = 0; - th->async_errinfo_mask_stack = rb_ary_new(); + th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack); native_mutex_initialize(&th->interrupt_lock); if (GET_VM()->event_hooks != NULL) @@ -859,7 +859,7 @@ sleep_forever(rb_thread_t *th, int deadlockable) if (deadlockable) { th->vm->sleeper--; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } while (th->status == status); th->status = prev_status; } @@ -896,7 +896,7 @@ sleep_timeval(rb_thread_t *th, struct timeval tv) th->status = THREAD_STOPPED; do { native_sleep(th, &tv); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); getclockofday(&tvn); if (to.tv_sec < tvn.tv_sec) break; if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; @@ -968,9 +968,9 @@ rb_thread_wait_for(struct timeval time) void rb_thread_polling(void) { - RUBY_VM_CHECK_INTS(); if (!rb_thread_alone()) { rb_thread_t *th = GET_THREAD(); + RUBY_VM_CHECK_INTS_BLOCKING(th); sleep_for_polling(th); } } @@ -985,7 +985,7 @@ rb_thread_polling(void) void rb_thread_check_ints(void) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); } /* @@ -1013,7 +1013,7 @@ rb_thread_sleep(int sec) rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); } -static void rb_threadptr_execute_interrupts_common(rb_thread_t *); +static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking); static void rb_thread_schedule_limits(unsigned long limits_us) @@ -1040,7 +1040,7 @@ rb_thread_schedule(void) rb_thread_schedule_limits(0); if (UNLIKELY(GET_THREAD()->interrupt_flag)) { - rb_threadptr_execute_interrupts_common(GET_THREAD()); + rb_threadptr_execute_interrupts_common(GET_THREAD(), 0); } } @@ -1076,7 +1076,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) rb_thread_t *th = GET_THREAD(); blocking_region_end(th, region); xfree(region); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; } @@ -1181,7 +1181,7 @@ rb_thread_call_without_gvl2(void *(*func)(void *data, int *skip_checkints), void }, ubf, data2); if (!skip_checkints) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } errno = saved_errno; @@ -1237,7 +1237,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) JUMP_TAG(state); } /* TODO: check func() */ - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; @@ -1350,12 +1350,348 @@ thread_s_pass(VALUE klass) return Qnil; } +/*****************************************************/ + /* + * rb_threadptr_async_errinfo_* - manage async errors queue + * + * Async events such as an exception throwed by Thread#raise, + * Thread#kill and thread termination (after main thread termination) + * will be queued to th->async_errinfo_queue. + * - clear: clear the queue. + * - enque: enque err object into queue. + * - deque: deque err object from queue. + * - active_p: return 1 if the queue should be checked. * + * All rb_threadptr_async_errinfo_* functions are called by + * a GVL acquired thread, of course. + * Note that all "rb_" prefix APIs need GVL to call. */ +void +rb_threadptr_async_errinfo_clear(rb_thread_t *th) +{ + rb_ary_clear(th->async_errinfo_queue); +} + +void +rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) +{ + rb_ary_push(th->async_errinfo_queue, v); + th->async_errinfo_queue_checked = 0; +} + +enum interrupt_timing { + INTERRUPT_NONE, + INTERRUPT_IMMEDIATE, + INTERRUPT_ON_BLOCKING, + INTERRUPT_NEVER +}; + +static enum interrupt_timing +rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err) +{ + VALUE mask; + long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack); + VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack); + VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ + long ancestors_len = RARRAY_LEN(ancestors); + VALUE *ancestors_ptr = RARRAY_PTR(ancestors); + int i, j; + + for (i=0; i<mask_stack_len; i++) { + mask = mask_stack[mask_stack_len-(i+1)]; + + for (j=0; j<ancestors_len; j++) { + VALUE klass = ancestors_ptr[j]; + VALUE sym; + + /* TODO: remove rb_intern() */ + if ((sym = rb_hash_aref(mask, klass)) != Qnil) { + if (sym == ID2SYM(rb_intern("immediate"))) { + return INTERRUPT_IMMEDIATE; + } + else if (sym == ID2SYM(rb_intern("on_blocking"))) { + return INTERRUPT_ON_BLOCKING; + } + else if (sym == ID2SYM(rb_intern("never"))) { + return INTERRUPT_NEVER; + } + else { + rb_raise(rb_eThreadError, "unknown mask signature"); + } + } + } + /* try to next mask */ + } + return INTERRUPT_NONE; +} + +static int +rb_threadptr_async_errinfo_empty_p(rb_thread_t *th) +{ + return RARRAY_LEN(th->async_errinfo_queue) == 0; +} + +static VALUE +rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing) +{ +#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */ + int i; + + for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) { + VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i]; + + enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err)); + + switch (mask_timing) { + case INTERRUPT_ON_BLOCKING: + if (timing != INTERRUPT_ON_BLOCKING) { + break; + } + /* fall through */ + case INTERRUPT_NONE: /* default: IMMEDIATE */ + case INTERRUPT_IMMEDIATE: + rb_ary_delete_at(th->async_errinfo_queue, i); + return err; + case INTERRUPT_NEVER: + break; + } + } + + th->async_errinfo_queue_checked = 1; + return Qundef; +#else + VALUE err = rb_ary_shift(th->async_errinfo_queue); + if (rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 1; + } + return err; +#endif +} + +int +rb_threadptr_async_errinfo_active_p(rb_thread_t *th) +{ + if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) { + return 0; + } + else { + return 1; + } +} + +static VALUE +rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th)) +{ + VALUE r = Qnil; + int state; + + rb_ary_push(th->async_errinfo_mask_stack, mask); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + TH_PUSH_TAG(th); + if ((state = EXEC_TAG()) == 0) { + r = func(th); + } + TH_POP_TAG(); + + rb_ary_pop(th->async_errinfo_mask_stack); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + if (state) { + JUMP_TAG(state); + } + + return r; +} + +/* + * call-seq: + * Thread.control_interrupt(hash) { ... } -> result of the block + * + * Thread.control_interrupt controls interrupt timing. + * + * _interrupt_ means asynchronous event and corresponding procedure + * by Thread#raise, Thread#kill, signal trap (not supported yet) + * and main thread termination (if main thread terminates, then all + * other thread will be killed). + * + * _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol + * is one of them: + * - :immediate Invoke interrupt immediately. + * - :on_blocking Invoke interrupt while _BlockingOperation_. + * - :never Never invoke interrupt. + * + * _BlockingOperation_ means that the operation will block the calling thread, + * such as read and write. On CRuby implementation, _BlockingOperation_ is + * operation executed without GVL. + * + * Masked interrupts are delayed until they are enabled. + * This method is similar to sigprocmask(3). + * + * TODO (DOC): control_interrupt is stacked. + * TODO (DOC): check ancestors. + * TODO (DOC): to prevent all interrupt, {Object => :never} works. + * + * NOTE: Asynchronous interrupts are difficult to use. + * If you need to communicate between threads, + * please consider to use another way such as Queue. + * Or use them with deep understanding about this method. + * + * + * # example: Guard from Thread#raise + * th = Thread.new do + * Thead.control_interrupt(RuntimeError => :never) { + * begin + * # Thread#raise doesn't interrupt here. + * # You can write resource allocation code safely. + * Thread.control_interrupt(RuntimeError => :immediate) { + * # ... + * # It is possible to be interrupted by Thread#raise. + * } + * ensure + * # Thread#raise doesn't interrupt here. + * # You can write resource dealocation code safely. + * end + * } + * end + * Thread.pass + * # ... + * th.raise "stop" + * + * # example: Guard from TimeoutError + * require 'timeout' + * Thread.control_interrupt(TimeoutError => :never) { + * timeout(10){ + * # TimeoutError doesn't occur here + * Thread.control_interrupt(TimeoutError => :on_blocking) { + * # possible to be killed by TimeoutError + * # while blocking operation + * } + * # TimeoutError doesn't occur here + * } + * } + * + * # example: Stack control settings + * Thread.control_interrupt(FooError => :never) { + * Thread.control_interrupt(BarError => :never) { + * # FooError and BarError are prohibited. + * } + * } + * + * # example: check ancestors + * Thread.control_interrupt(Exception => :never) { + * # all exceptions inherited from Exception are prohibited. + * } + * + */ + +static VALUE +control_interrupt_func(rb_thread_t *th) +{ + return rb_yield(Qnil); +} + +static VALUE +rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg) +{ + if (!rb_block_given_p()) { + rb_raise(rb_eArgError, "block is needed."); + } + + return rb_threadptr_interrupt_mask(GET_THREAD(), + rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"), + control_interrupt_func); +} + +/* + * call-seq: + * Thread.check_interrupt() -> nil + * + * Check queued interrupts. + * + * If there are queued interrupts, process respective procedures. + * + * This method can be defined as the following Ruby code: + * + * def Thread.check_interrupt + * Thread.control_interrupt(Object => :immediate) { + * Thread.pass + * } + * end + * + * Examples: + * + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * Thread.check_interrupt + * ... + * end + * } + * } + * ... + * th.raise # stop thread + * + * NOTE: This example can be described by the another code. + * You need to keep to avoid asynchronous interrupts. + * + * flag = true + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * break if flag == false + * ... + * end + * } + * } + * ... + * flag = false # stop thread + */ + +static VALUE +check_interrupt_func(rb_thread_t *th) +{ + RUBY_VM_CHECK_INTS(th); + return Qnil; +} + +static VALUE +rb_thread_s_check_interrupt(VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + + if (!rb_threadptr_async_errinfo_empty_p(th)) { + VALUE mask = rb_hash_new(); + rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate"))); + rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func); + } + + return Qnil; +} + +static void +rb_threadptr_to_kill(rb_thread_t *th) +{ + rb_threadptr_async_errinfo_clear(th); + th->status = THREAD_TO_KILL; + th->errinfo = INT2FIX(TAG_FATAL); + TH_JUMP_TAG(th, TAG_FATAL); +} + static void -rb_threadptr_execute_interrupts_common(rb_thread_t *th) +rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing) { rb_atomic_t interrupt; @@ -1378,15 +1714,15 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th) /* exception from another thread */ if (rb_threadptr_async_errinfo_active_p(th)) { - VALUE err = rb_threadptr_async_errinfo_deque(th); + VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); - if (err == eKillSignal /* Thread#kill receieved */ || - err == eTerminateSignal /* Terminate thread */ ) { - rb_threadptr_async_errinfo_clear(th); - th->status = THREAD_TO_KILL; - th->errinfo = INT2FIX(TAG_FATAL); - TH_JUMP_TAG(th, TAG_FATAL); + if (err == Qundef) { + /* no error */ + } + else if (err == eKillSignal /* Thread#kill receieved */ || + err == eTerminateSignal /* Terminate thread */ ) { + rb_threadptr_to_kill(th); } else { rb_exc_raise(err); @@ -1417,9 +1753,9 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th) } void -rb_threadptr_execute_interrupts(rb_thread_t *th) +rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { - rb_threadptr_execute_interrupts_common(th); + rb_threadptr_execute_interrupts_common(th, blocking_timing); } void @@ -1427,59 +1763,7 @@ rb_thread_execute_interrupts(VALUE thval) { rb_thread_t *th; GetThreadPtr(thval, th); - rb_threadptr_execute_interrupts_common(th); -} - -/*****************************************************/ - -/* - * rb_threadptr_async_errinfo_* - manage async errors queue - * - * Async events such as an exception throwed by Thread#raise, - * Thread#kill and thread termination (after main thread termination) - * will be queued to th->async_errinfo_queue. - * - clear: clear the queue. - * - enque: enque err object into queue. - * - deque: deque err object from queue. - * - active_p: return 1 if the queue should be checked. - * - * All rb_threadptr_async_errinfo_* functions are called by - * a GVL acquired thread, of course. - * Note that all "rb_" prefix APIs need GVL to call. - */ - -void -rb_threadptr_async_errinfo_clear(rb_thread_t *th) -{ - rb_ary_clear(th->async_errinfo_queue); -} - -void -rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) -{ - rb_ary_push(th->async_errinfo_queue, v); - th->async_errinfo_queue_checked = 0; -} - -VALUE -rb_threadptr_async_errinfo_deque(rb_thread_t *th) -{ - VALUE err = rb_ary_shift(th->async_errinfo_queue); - if (RARRAY_LEN(th->async_errinfo_queue) == 0) { - th->async_errinfo_queue_checked = 1; - } - return err; -} - -int -rb_threadptr_async_errinfo_active_p(rb_thread_t *th) -{ - if (th->async_errinfo_queue_checked) { - return 0; - } - else { - return RARRAY_LEN(th->async_errinfo_queue) > 0; - } + rb_threadptr_execute_interrupts_common(th, 1); } static void @@ -1497,7 +1781,12 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) return Qnil; } - exc = rb_make_exception(argc, argv); + if (argc == 0) { + exc = rb_exc_new(rb_eRuntimeError, 0, 0); + } + else { + exc = rb_make_exception(argc, argv); + } rb_threadptr_async_errinfo_enque(th, exc); rb_threadptr_interrupt(th); return Qnil; @@ -1642,9 +1931,15 @@ rb_thread_kill(VALUE thread) thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); - rb_threadptr_async_errinfo_enque(th, eKillSignal); - th->status = THREAD_TO_KILL; - rb_threadptr_interrupt(th); + if (th == GET_THREAD()) { + /* kill myself immediately */ + rb_threadptr_to_kill(th); + } + else { + rb_threadptr_async_errinfo_enque(th, eKillSignal); + th->status = THREAD_TO_KILL; + rb_threadptr_interrupt(th); + } return thread; } @@ -1683,7 +1978,8 @@ rb_thread_s_kill(VALUE obj, VALUE th) static VALUE rb_thread_exit(void) { - return rb_thread_kill(GET_THREAD()->self); + rb_thread_t *th = GET_THREAD(); + return rb_thread_kill(th->self); } @@ -2704,7 +3000,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, if (result < 0) lerrno = errno; }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = lerrno; @@ -2910,6 +3206,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) double limit = 0; struct timespec ts; struct timespec *timeout = NULL; + rb_thread_t *th = GET_THREAD(); if (tv) { ts.tv_sec = tv->tv_sec; @@ -2927,9 +3224,9 @@ retry: BLOCKING_REGION({ result = ppoll(&fds, 1, timeout, NULL); if (result < 0) lerrno = errno; - }, ubf_select, GET_THREAD()); + }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); if (result < 0) { errno = lerrno; @@ -3673,7 +3970,7 @@ rb_mutex_lock(VALUE self) if (mutex->th == th) mutex_locked(th, self); if (interrupted) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } } } @@ -4755,6 +5052,8 @@ Init_Thread(void) rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); #endif + rb_define_singleton_method(rb_cThread, "control_interrupt", rb_thread_s_control_interrupt, 1); + rb_define_singleton_method(rb_cThread, "check_interrupt", rb_thread_s_check_interrupt, 1); rb_define_method(rb_cThread, "initialize", thread_initialize, -2); rb_define_method(rb_cThread, "raise", thread_raise_m, -1); |