aboutsummaryrefslogtreecommitdiffstats
path: root/thread.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-21 09:54:08 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-21 11:48:44 +1200
commit70f08f1eed1df4579fef047d28fc3c807183fcfa (patch)
tree75b19e3db5bcb6367f000764bf7f5096fc28e0ed /thread.c
parent596173155a15b6d4a7b04bdaf9218b3e756a0683 (diff)
downloadruby-70f08f1eed1df4579fef047d28fc3c807183fcfa.tar.gz
Make `Thread#join` non-blocking.
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c249
1 files changed, 137 insertions, 112 deletions
diff --git a/thread.c b/thread.c
index 53bfbe8562..35a35f23f9 100644
--- a/thread.c
+++ b/thread.c
@@ -544,6 +544,32 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
}
}
+static void
+rb_threadptr_join_list_wakeup(rb_thread_t *thread)
+{
+ struct rb_waiting_list *join_list = thread->join_list;
+
+ while (join_list) {
+ rb_thread_t *target_thread = join_list->thread;
+
+ if (target_thread->scheduler != Qnil) {
+ rb_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
+ } else {
+ rb_threadptr_interrupt(target_thread);
+
+ switch (target_thread->status) {
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ target_thread->status = THREAD_RUNNABLE;
+ default:
+ break;
+ }
+ }
+
+ join_list = join_list->next;
+ }
+}
+
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
@@ -758,7 +784,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
STACK_GROW_DIR_DETECTION;
enum ruby_tag_type state;
- rb_thread_list_t *join_list;
VALUE errinfo = Qnil;
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_thread_t *ractor_main_th = th->ractor->threads.main;
@@ -860,20 +885,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_interrupt(ractor_main_th);
}
- /* wake up joining threads */
- join_list = th->join_list;
- while (join_list) {
- rb_threadptr_interrupt(join_list->th);
- switch (join_list->th->status) {
- case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
- join_list->th->status = THREAD_RUNNABLE;
- default: break;
- }
- join_list = join_list->next;
- }
-
- rb_threadptr_unlock_all_locking_mutexes(th);
- rb_check_deadlock(th->ractor);
+ rb_threadptr_join_list_wakeup(th);
+ rb_threadptr_unlock_all_locking_mutexes(th);
+ rb_check_deadlock(th->ractor);
rb_fiber_close(th->ec->fiber_ptr);
}
@@ -1105,129 +1119,152 @@ rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
struct join_arg {
- rb_thread_t *target, *waiting;
- rb_hrtime_t *limit;
+ struct rb_waiting_list *waiting_list;
+ rb_thread_t *target;
+ VALUE timeout;
};
static VALUE
remove_from_join_list(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
+ rb_thread_t *target_thread = p->target;
- if (target_th->status != THREAD_KILLED) {
- rb_thread_list_t **p = &target_th->join_list;
+ if (target_thread->status != THREAD_KILLED) {
+ struct rb_waiting_list **join_list = &target_thread->join_list;
- while (*p) {
- if ((*p)->th == th) {
- *p = (*p)->next;
- break;
- }
- p = &(*p)->next;
- }
+ while (*join_list) {
+ if (*join_list == p->waiting_list) {
+ *join_list = (*join_list)->next;
+ break;
+ }
+
+ join_list = &(*join_list)->next;
+ }
}
return Qnil;
}
+static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
+
static VALUE
thread_join_sleep(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
- rb_hrtime_t end = 0;
+ rb_thread_t *target_th = p->target, *th = p->waiting_list->thread;
+ rb_hrtime_t end = 0, rel = 0, *limit = 0;
- if (p->limit) {
- end = rb_hrtime_add(*p->limit, rb_hrtime_now());
+ /*
+ * This supports INFINITY and negative values, so we can't use
+ * rb_time_interval right now...
+ */
+ if (p->timeout == Qnil) {
+ /* unlimited */
+ }
+ else if (FIXNUM_P(p->timeout)) {
+ rel = rb_sec2hrtime(NUM2TIMET(p->timeout));
+ limit = &rel;
+ }
+ else {
+ limit = double2hrtime(&rel, rb_num2dbl(p->timeout));
+ }
+
+ if (limit) {
+ end = rb_hrtime_add(*limit, rb_hrtime_now());
}
while (target_th->status != THREAD_KILLED) {
- if (!p->limit) {
- th->status = THREAD_STOPPED_FOREVER;
+ if (th->scheduler != Qnil) {
+ rb_scheduler_block(th->scheduler, target_th->self, p->timeout);
+ } else if (!limit) {
+ th->status = THREAD_STOPPED_FOREVER;
rb_ractor_sleeper_threads_inc(th->ractor);
- rb_check_deadlock(th->ractor);
- native_sleep(th, 0);
+ rb_check_deadlock(th->ractor);
+ native_sleep(th, 0);
rb_ractor_sleeper_threads_dec(th->ractor);
- }
- else {
- if (hrtime_update_expire(p->limit, end)) {
- thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
- thread_id_str(target_th));
- return Qfalse;
- }
- th->status = THREAD_STOPPED;
- native_sleep(th, p->limit);
- }
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
- th->status = THREAD_RUNNABLE;
- thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ }
+ else {
+ if (hrtime_update_expire(limit, end)) {
+ thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
+ thread_id_str(target_th));
+ return Qfalse;
+ }
+ th->status = THREAD_STOPPED;
+ native_sleep(th, limit);
+ }
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ th->status = THREAD_RUNNABLE;
+ thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
}
return Qtrue;
}
static VALUE
-thread_join(rb_thread_t *target_th, rb_hrtime_t *rel)
+thread_join(rb_thread_t *target_th, VALUE timeout)
{
- rb_thread_t *th = GET_THREAD();
- struct join_arg arg;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = ec->thread_ptr;
+ rb_fiber_t *fiber = ec->fiber_ptr;
if (th == target_th) {
- rb_raise(rb_eThreadError, "Target thread must not be current thread");
+ rb_raise(rb_eThreadError, "Target thread must not be current thread");
}
+
if (th->ractor->threads.main == target_th) {
- rb_raise(rb_eThreadError, "Target thread must not be main thread");
+ rb_raise(rb_eThreadError, "Target thread must not be main thread");
}
- arg.target = target_th;
- arg.waiting = th;
- arg.limit = rel;
-
thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
if (target_th->status != THREAD_KILLED) {
- rb_thread_list_t list;
- list.next = target_th->join_list;
- list.th = th;
- target_th->join_list = &list;
- if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
- remove_from_join_list, (VALUE)&arg)) {
- return Qnil;
- }
+ struct rb_waiting_list waiting_list;
+ waiting_list.next = target_th->join_list;
+ waiting_list.thread = th;
+ waiting_list.fiber = fiber;
+ target_th->join_list = &waiting_list;
+
+ struct join_arg arg;
+ arg.waiting_list = &waiting_list;
+ arg.target = target_th;
+ arg.timeout = timeout;
+
+ if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
+ return Qnil;
+ }
}
thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
if (target_th->ec->errinfo != Qnil) {
- VALUE err = target_th->ec->errinfo;
-
- if (FIXNUM_P(err)) {
- switch (err) {
- case INT2FIX(TAG_FATAL):
- thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
-
- /* OK. killed. */
- break;
- default:
- rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
- }
- }
- else if (THROW_DATA_P(target_th->ec->errinfo)) {
- rb_bug("thread_join: THROW_DATA should not reach here.");
- }
- else {
- /* normal exception */
- rb_exc_raise(err);
- }
+ VALUE err = target_th->ec->errinfo;
+
+ if (FIXNUM_P(err)) {
+ switch (err) {
+ case INT2FIX(TAG_FATAL):
+ thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
+
+ /* OK. killed. */
+ break;
+ default:
+ rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
+ }
+ }
+ else if (THROW_DATA_P(target_th->ec->errinfo)) {
+ rb_bug("thread_join: THROW_DATA should not reach here.");
+ }
+ else {
+ /* normal exception */
+ rb_exc_raise(err);
+ }
}
return target_th->self;
}
-static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
-
/*
* call-seq:
* thr.join -> thr
@@ -1270,25 +1307,13 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
- VALUE limit;
- rb_hrtime_t rel, *to = 0;
-
- /*
- * This supports INFINITY and negative values, so we can't use
- * rb_time_interval right now...
- */
- if (!rb_check_arity(argc, 0, 1) || NIL_P(argv[0])) {
- /* unlimited */
- }
- else if (FIXNUM_P(limit = argv[0])) {
- rel = rb_sec2hrtime(NUM2TIMET(limit));
- to = &rel;
- }
- else {
- to = double2hrtime(&rel, rb_num2dbl(limit));
+ VALUE timeout = Qnil;
+
+ if (rb_check_arity(argc, 0, 1)) {
+ timeout = argv[0];
}
- return thread_join(rb_thread_ptr(self), to);
+ return thread_join(rb_thread_ptr(self), timeout);
}
/*
@@ -1309,7 +1334,7 @@ static VALUE
thread_value(VALUE self)
{
rb_thread_t *th = rb_thread_ptr(self);
- thread_join(th, 0);
+ thread_join(th, Qnil);
return th->value;
}
@@ -1486,7 +1511,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
{
VALUE scheduler = rb_thread_current_scheduler();
if (scheduler != Qnil) {
- rb_scheduler_block(scheduler, blocker);
+ rb_scheduler_block(scheduler, blocker, Qnil);
} else {
thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
@@ -5559,9 +5584,9 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
}
{
- rb_thread_list_t *list = th->join_list;
+ struct rb_waiting_list *list = th->join_list;
while (list) {
- rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
+ rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread);
list = list->next;
}
}