aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornormal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2018-08-13 21:34:20 +0000
committernormal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2018-08-13 21:34:20 +0000
commit48b6bd74e2febde095ac85d818e94c0e58677647 (patch)
tree73348a2cf6a66b1f1d6620b93109b7f7dfb0ca03
parent4d2e0fffb08f0418fa6995be2e15aad7ee11b048 (diff)
downloadruby-48b6bd74e2febde095ac85d818e94c0e58677647.tar.gz
thread_pthread.c: eliminate timer thread by restructuring GVL
This reverts commit 194a6a2c68e9c8a3536b24db18ceac87535a6051 (r64203). Race conditions which caused the original reversion will be fixed in the subsequent commit. [ruby-core:88360] [Misc #14937] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64352 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
-rw-r--r--internal.h3
-rw-r--r--process.c140
-rw-r--r--signal.c7
-rw-r--r--test/ruby/test_io.rb9
-rw-r--r--test/ruby/test_process.rb2
-rw-r--r--test/ruby/test_thread.rb5
-rw-r--r--thread.c397
-rw-r--r--thread_pthread.c710
-rw-r--r--thread_pthread.h20
-rw-r--r--thread_win32.c29
-rw-r--r--vm_core.h8
11 files changed, 746 insertions, 584 deletions
diff --git a/internal.h b/internal.h
index 072c5ce158..fbac11fe9e 100644
--- a/internal.h
+++ b/internal.h
@@ -77,6 +77,9 @@ extern "C" {
# define __has_extension __has_feature
#endif
+/* Prevent compiler from reordering access */
+#define ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
+
#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 201112L)
# define STATIC_ASSERT(name, expr) _Static_assert(expr, #name ": " #expr)
#elif GCC_VERSION_SINCE(4, 6, 0) || __has_extension(c_static_assert)
diff --git a/process.c b/process.c
index dd65ad36cf..5c099acb89 100644
--- a/process.c
+++ b/process.c
@@ -924,6 +924,7 @@ struct waitpid_state {
int status;
int options;
int errnum;
+ int sigwait_fd;
};
void rb_native_mutex_lock(rb_nativethread_lock_t *);
@@ -932,13 +933,65 @@ void rb_native_cond_signal(rb_nativethread_cond_t *);
void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *);
rb_nativethread_cond_t *rb_sleep_cond_get(const rb_execution_context_t *);
void rb_sleep_cond_put(rb_nativethread_cond_t *);
+int rb_sigwait_fd_get(const rb_thread_t *);
+void rb_sigwait_sleep(const rb_thread_t *, int fd, const struct timespec *);
+void rb_sigwait_fd_put(const rb_thread_t *, int fd);
+
+static int
+sigwait_fd_migrate_signaled_p(struct waitpid_state *w)
+{
+ int signaled = FALSE;
+ rb_thread_t *th = w->ec ? rb_ec_thread_ptr(w->ec) : 0;
+
+ if (th) rb_native_mutex_lock(&th->interrupt_lock);
+
+ if (w->cond) {
+ rb_native_cond_signal(w->cond);
+ signaled = TRUE;
+ }
+
+ if (th) rb_native_mutex_unlock(&th->interrupt_lock);
+
+ return signaled;
+}
+
+/*
+ * When a thread is done using sigwait_fd and there are other threads
+ * sleeping on waitpid, we must kick one of the threads out of
+ * rb_native_cond_wait so it can switch to rb_sigwait_sleep
+ */
+static void
+sigwait_fd_migrate_sleeper(rb_vm_t *vm)
+{
+ struct waitpid_state *w = 0;
+
+ list_for_each(&vm->waiting_pids, w, wnode) {
+ if (sigwait_fd_migrate_signaled_p(w)) return;
+ }
+ list_for_each(&vm->waiting_grps, w, wnode) {
+ if (sigwait_fd_migrate_signaled_p(w)) return;
+ }
+}
+
+void
+rb_sigwait_fd_migrate(rb_vm_t *vm)
+{
+ rb_native_mutex_lock(&vm->waitpid_lock);
+ sigwait_fd_migrate_sleeper(vm);
+ rb_native_mutex_unlock(&vm->waitpid_lock);
+}
static void
waitpid_notify(struct waitpid_state *w, rb_pid_t ret)
{
w->ret = ret;
list_del_init(&w->wnode);
- rb_native_cond_signal(w->cond);
+ if (w->cond) {
+ rb_native_cond_signal(w->cond);
+ }
+ else {
+ /* w is owned by this thread */
+ }
}
#ifdef _WIN32 /* for spawnvp result from mjit.c */
@@ -950,7 +1003,7 @@ waitpid_notify(struct waitpid_state *w, rb_pid_t ret)
#endif
extern volatile unsigned int ruby_nocldwait; /* signal.c */
-/* called by timer thread */
+/* called by timer thread or thread which acquired sigwait_fd */
static void
waitpid_each(struct list_head *head)
{
@@ -1004,6 +1057,17 @@ waitpid_state_init(struct waitpid_state *w, rb_pid_t pid, int options)
w->options = options;
}
+static const struct timespec *
+sigwait_sleep_time(void)
+{
+ if (SIGCHLD_LOSSY) {
+ static const struct timespec busy_wait = { 0, 100000000 };
+
+ return &busy_wait;
+ }
+ return 0;
+}
+
/*
* must be called with vm->waitpid_lock held, this is not interruptible
*/
@@ -1022,13 +1086,31 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
if (w.ret == -1) w.errnum = errno;
}
else {
- w.cond = cond;
w.ec = 0;
+ w.sigwait_fd = -1;
list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode);
do {
- rb_native_cond_wait(w.cond, &vm->waitpid_lock);
+ if (w.sigwait_fd < 0)
+ w.sigwait_fd = rb_sigwait_fd_get(0);
+
+ if (w.sigwait_fd >= 0) {
+ w.cond = 0;
+ rb_native_mutex_unlock(&vm->waitpid_lock);
+ rb_sigwait_sleep(0, w.sigwait_fd, sigwait_sleep_time());
+ rb_native_mutex_lock(&vm->waitpid_lock);
+ }
+ else {
+ w.cond = cond;
+ rb_native_cond_wait(w.cond, &vm->waitpid_lock);
+ }
} while (!w.ret);
list_del(&w.wnode);
+
+ /* we're done, maybe other waitpid callers are not: */
+ if (w.sigwait_fd >= 0) {
+ rb_sigwait_fd_put(0, w.sigwait_fd);
+ sigwait_fd_migrate_sleeper(vm);
+ }
}
if (status) {
*status = w.status;
@@ -1043,7 +1125,10 @@ waitpid_wake(void *x)
struct waitpid_state *w = x;
/* th->interrupt_lock is already held by rb_threadptr_interrupt_common */
- rb_native_cond_signal(w->cond);
+ if (w->cond)
+ rb_native_cond_signal(w->cond);
+ else
+ rb_thread_wakeup_timer_thread(0); /* kick sigwait_fd */
}
static void *
@@ -1058,11 +1143,40 @@ waitpid_nogvl(void *x)
* by the time we enter this. And we may also be interrupted.
*/
if (!w->ret && !RUBY_VM_INTERRUPTED_ANY(w->ec)) {
- if (SIGCHLD_LOSSY) {
- rb_thread_wakeup_timer_thread();
+ if (w->sigwait_fd < 0)
+ w->sigwait_fd = rb_sigwait_fd_get(th);
+
+ if (w->sigwait_fd >= 0) {
+ rb_nativethread_cond_t *cond = w->cond;
+
+ w->cond = 0;
+ rb_native_mutex_unlock(&th->interrupt_lock);
+ rb_sigwait_sleep(th, w->sigwait_fd, sigwait_sleep_time());
+ rb_native_mutex_lock(&th->interrupt_lock);
+ w->cond = cond;
}
- rb_native_cond_wait(w->cond, &th->interrupt_lock);
+ else {
+ if (!w->cond)
+ w->cond = rb_sleep_cond_get(w->ec);
+
+ /* another thread calling rb_sigwait_sleep will process
+ * signals for us */
+ if (SIGCHLD_LOSSY) {
+ rb_thread_wakeup_timer_thread(0);
+ }
+ rb_native_cond_wait(w->cond, &th->interrupt_lock);
+ }
+ }
+
+ /*
+ * we must release th->native_thread_data.sleep_cond when
+ * re-acquiring GVL:
+ */
+ if (w->cond) {
+ rb_sleep_cond_put(w->cond);
+ w->cond = 0;
}
+
rb_native_mutex_unlock(&th->interrupt_lock);
return 0;
@@ -1092,8 +1206,15 @@ waitpid_cleanup(VALUE x)
list_del(&w->wnode);
rb_native_mutex_unlock(&vm->waitpid_lock);
}
- rb_sleep_cond_put(w->cond);
+ /* we may have never released and re-acquired GVL */
+ if (w->cond)
+ rb_sleep_cond_put(w->cond);
+
+ if (w->sigwait_fd >= 0) {
+ rb_sigwait_fd_put(rb_ec_thread_ptr(w->ec), w->sigwait_fd);
+ rb_sigwait_fd_migrate(rb_ec_vm_ptr(w->ec));
+ }
return Qfalse;
}
@@ -1120,6 +1241,7 @@ waitpid_wait(struct waitpid_state *w)
}
else {
w->cond = rb_sleep_cond_get(w->ec);
+ w->sigwait_fd = -1;
/* order matters, favor specified PIDs rather than -1 or 0 */
list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode);
}
diff --git a/signal.c b/signal.c
index 6393273adf..65b45cf6bd 100644
--- a/signal.c
+++ b/signal.c
@@ -709,9 +709,6 @@ signal_enque(int sig)
static rb_atomic_t sigchld_hit;
-/* Prevent compiler from reordering access */
-#define ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
-
static RETSIGTYPE
sighandler(int sig)
{
@@ -730,7 +727,7 @@ sighandler(int sig)
else {
signal_enque(sig);
}
- rb_thread_wakeup_timer_thread();
+ rb_thread_wakeup_timer_thread(sig);
#if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
ruby_signal(sig, sighandler);
#endif
@@ -764,7 +761,6 @@ rb_enable_interrupt(void)
#ifdef HAVE_PTHREAD_SIGMASK
sigset_t mask;
sigemptyset(&mask);
- sigaddset(&mask, RUBY_SIGCHLD); /* timer-thread handles this */
pthread_sigmask(SIG_SETMASK, &mask, NULL);
#endif
}
@@ -1077,7 +1073,6 @@ rb_trap_exit(void)
void ruby_waitpid_all(rb_vm_t *); /* process.c */
-/* only runs in the timer-thread */
void
ruby_sigchld_handler(rb_vm_t *vm)
{
diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb
index ea1dfc758c..cf8c9651b8 100644
--- a/test/ruby/test_io.rb
+++ b/test/ruby/test_io.rb
@@ -3564,7 +3564,8 @@ __END__
end
def test_race_gets_and_close
- assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}")
+ opt = { signal: :ABRT, timeout: 200 }
+ assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}", opt)
bug13076 = '[ruby-core:78845] [Bug #13076]'
begin;
10.times do |i|
@@ -3586,9 +3587,9 @@ __END__
w.close
r.close
end
- assert_nothing_raised(IOError, bug13076) {
- t.each(&:join)
- }
+ t.each do |th|
+ assert_same(th, th.join(2), bug13076)
+ end
end
end;
end
diff --git a/test/ruby/test_process.rb b/test/ruby/test_process.rb
index 759230f834..a0b08dd110 100644
--- a/test/ruby/test_process.rb
+++ b/test/ruby/test_process.rb
@@ -1767,7 +1767,7 @@ class TestProcess < Test::Unit::TestCase
puts Dir.entries("/proc/self/task") - %W[. ..]
end
bug4920 = '[ruby-dev:43873]'
- assert_equal(2, data.size, bug4920)
+ assert_include(1..2, data.size, bug4920)
assert_not_include(data.map(&:to_i), pid)
end
else # darwin
diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb
index 03fd1e3075..56a7bfb23b 100644
--- a/test/ruby/test_thread.rb
+++ b/test/ruby/test_thread.rb
@@ -952,15 +952,16 @@ _eom
def test_thread_timer_and_interrupt
bug5757 = '[ruby-dev:44985]'
pid = nil
- cmd = 'Signal.trap(:INT, "DEFAULT"); r,=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; r.read'
+ cmd = 'Signal.trap(:INT, "DEFAULT"); pipe=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; pipe[0].read'
opt = {}
opt[:new_pgroup] = true if /mswin|mingw/ =~ RUBY_PLATFORM
s, t, _err = EnvUtil.invoke_ruby(['-e', cmd], "", true, true, opt) do |in_p, out_p, err_p, cpid|
+ assert IO.select([out_p], nil, nil, 10), 'subprocess not ready'
out_p.gets
pid = cpid
t0 = Time.now.to_f
Process.kill(:SIGINT, pid)
- Process.wait(pid)
+ Timeout.timeout(10) { Process.wait(pid) }
t1 = Time.now.to_f
[$?, t1 - t0, err_p.read]
end
diff --git a/thread.c b/thread.c
index a7d48a464f..9c7fcee05f 100644
--- a/thread.c
+++ b/thread.c
@@ -106,8 +106,13 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail);
static void timespec_add(struct timespec *, const struct timespec *);
static void timespec_sub(struct timespec *, const struct timespec *);
+static int timespec_cmp(const struct timespec *a, const struct timespec *b);
static int timespec_update_expire(struct timespec *, const struct timespec *);
static void getclockofday(struct timespec *);
+NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
+static int consume_communication_pipe(int fd);
+static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
+void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
#define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1)
@@ -348,7 +353,14 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
VALUE *register_stack_start));
-static void timer_thread_function(void *);
+static void timer_thread_function(void);
+void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
+
+static void
+ubf_sigwait(void *ignore)
+{
+ rb_thread_wakeup_timer_thread(0);
+}
#if defined(_WIN32)
#include "thread_win32.c"
@@ -373,6 +385,15 @@ static void timer_thread_function(void *);
#error "unsupported thread type"
#endif
+/*
+ * TODO: somebody with win32 knowledge should be able to get rid of
+ * timer-thread by busy-waiting on signals. And it should be possible
+ * to make the GVL in thread_pthread.c be platform-independent.
+ */
+#ifndef BUSY_WAIT_SIGNALS
+# define BUSY_WAIT_SIGNALS (0)
+#endif
+
#if THREAD_DEBUG
static int debug_mutex_initialized = 1;
static rb_nativethread_lock_t debug_mutex;
@@ -412,7 +433,6 @@ rb_vm_gvl_destroy(rb_vm_t *vm)
{
gvl_release(vm);
gvl_destroy(vm);
- rb_native_mutex_destroy(&vm->thread_destruct_lock);
if (0) {
/* may be held by running threads */
rb_native_mutex_destroy(&vm->waitpid_lock);
@@ -773,10 +793,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
rb_fiber_close(th->ec->fiber_ptr);
}
- rb_native_mutex_lock(&th->vm->thread_destruct_lock);
- /* make sure vm->running_thread never point me after this point.*/
- th->vm->running_thread = NULL;
- rb_native_mutex_unlock(&th->vm->thread_destruct_lock);
thread_cleanup_func(th, FALSE);
gvl_release(th->vm);
@@ -2163,6 +2179,14 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
/* signal handling */
if (trap_interrupt && (th == th->vm->main_thread)) {
enum rb_thread_status prev_status = th->status;
+ int sigwait_fd = rb_sigwait_fd_get(th);
+
+ if (sigwait_fd >= 0) {
+ (void)consume_communication_pipe(sigwait_fd);
+ ruby_sigchld_handler(th->vm);
+ rb_sigwait_fd_put(th, sigwait_fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
th->status = THREAD_RUNNABLE;
while ((sig = rb_get_next_signal()) != 0) {
rb_signal_exec(th, sig);
@@ -3840,86 +3864,95 @@ wait_retryable(int *result, int errnum, struct timespec *timeout,
return FALSE;
}
-#define restore_fdset(fds1, fds2) \
- ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0)
-
struct select_set {
- rb_fdset_t read;
- rb_fdset_t write;
- rb_fdset_t except;
+ int max;
+ int sigwait_fd;
+ rb_thread_t *th;
+ rb_fdset_t *rset;
+ rb_fdset_t *wset;
+ rb_fdset_t *eset;
+ rb_fdset_t orig_rset;
+ rb_fdset_t orig_wset;
+ rb_fdset_t orig_eset;
+ struct timeval *timeout;
};
-static size_t
-select_set_memsize(const void *p)
+static VALUE
+select_set_free(VALUE p)
{
- return sizeof(struct select_set);
+ struct select_set *set = (struct select_set *)p;
+
+ if (set->sigwait_fd >= 0) {
+ rb_sigwait_fd_put(set->th, set->sigwait_fd);
+ rb_sigwait_fd_migrate(set->th->vm);
+ }
+
+ rb_fd_term(&set->orig_rset);
+ rb_fd_term(&set->orig_wset);
+ rb_fd_term(&set->orig_eset);
+
+ return Qfalse;
}
-static void
-select_set_free(void *p)
+static const struct timespec *
+sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig,
+ int *drained_p)
{
- struct select_set *orig = p;
+ static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 };
- rb_fd_term(&orig->read);
- rb_fd_term(&orig->write);
- rb_fd_term(&orig->except);
- xfree(orig);
-}
+ if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
+ *drained_p = check_signals_nogvl(th, sigwait_fd);
+ if (!orig || timespec_cmp(orig, &quantum) > 0)
+ return &quantum;
+ }
-static const rb_data_type_t select_set_type = {
- "select_set",
- {NULL, select_set_free, select_set_memsize,},
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
-};
+ return orig;
+}
-static int
-do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds,
- rb_fdset_t *const exceptfds, struct timeval *timeout)
+static VALUE
+do_select(VALUE p)
{
+ struct select_set *set = (struct select_set *)p;
int MAYBE_UNUSED(result);
int lerrno;
struct timespec ts, end, *tsp;
- rb_thread_t *th = GET_THREAD();
- VALUE o;
- struct select_set *orig;
-
- o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig);
+ const struct timespec *to;
+ struct timeval tv;
- timeout_prepare(&tsp, &ts, &end, timeout);
+ timeout_prepare(&tsp, &ts, &end, set->timeout);
+#define restore_fdset(dst, src) \
+ ((dst) ? rb_fd_dup(dst, src) : (void)0)
#define do_select_update() \
- (restore_fdset(readfds, &orig->read), \
- restore_fdset(writefds, &orig->write), \
- restore_fdset(exceptfds, &orig->except), \
+ (restore_fdset(set->rset, &set->orig_rset), \
+ restore_fdset(set->wset, &set->orig_wset), \
+ restore_fdset(set->eset, &set->orig_eset), \
TRUE)
-#define fd_init_copy(f) \
- (f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f)
- fd_init_copy(read);
- fd_init_copy(write);
- fd_init_copy(except);
-#undef fd_init_copy
-
do {
+ int drained;
lerrno = 0;
- BLOCKING_REGION(th, {
- result = native_fd_select(n, readfds, writefds, exceptfds,
- timeval_for(timeout, tsp), th);
+ BLOCKING_REGION(set->th, {
+ to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained);
+ result = native_fd_select(set->max, set->rset, set->wset, set->eset,
+ timeval_for(&tv, to), set->th);
if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
+ }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE);
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */
- } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
+ if (set->sigwait_fd >= 0) {
+ if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset))
+ result--;
+ (void)check_signals_nogvl(set->th, set->sigwait_fd);
+ }
- /* didn't raise, perform cleanup ourselves */
- select_set_free(orig);
- rb_gc_force_recycle(o);
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
+ } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
if (result < 0) {
errno = lerrno;
}
- return result;
+ return (VALUE)result;
}
static void
@@ -3955,11 +3988,42 @@ rb_thread_fd_writable(int fd)
return TRUE;
}
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
+}
+
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
{
- if (!read && !write && !except) {
+ struct select_set set;
+
+ set.th = GET_THREAD();
+ set.max = max;
+ set.sigwait_fd = rb_sigwait_fd_get(set.th);
+ set.rset = read;
+ set.wset = write;
+ set.eset = except;
+ set.timeout = timeout;
+
+ if (set.sigwait_fd >= 0) {
+ if (set.rset)
+ rb_fd_set(set.sigwait_fd, set.rset);
+ else
+ set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
+ if (set.sigwait_fd > set.max) {
+ set.max = set.sigwait_fd + 1;
+ }
+ }
+ if (!set.rset && !set.wset && !set.eset) {
if (!timeout) {
rb_thread_sleep_forever();
return 0;
@@ -3968,16 +4032,23 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
- if (read) {
- rb_fd_resize(max - 1, read);
- }
- if (write) {
- rb_fd_resize(max - 1, write);
- }
- if (except) {
- rb_fd_resize(max - 1, except);
- }
- return do_select(max, read, write, except, timeout);
+#define fd_init_copy(f) do { \
+ if (set.f) { \
+ rb_fd_resize(set.max - 1, set.f); \
+ if (&set.orig_##f != set.f) { /* sigwait_fd */ \
+ rb_fd_init_copy(&set.orig_##f, set.f); \
+ } \
+ } \
+ else { \
+ rb_fd_no_init(&set.orig_##f); \
+ } \
+ } while (0)
+ fd_init_copy(rset);
+ fd_init_copy(wset);
+ fd_init_copy(eset);
+#undef fd_init_copy
+
+ return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
#ifdef USE_POLL
@@ -3991,68 +4062,64 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
# define POLLERR_SET (0)
#endif
-#ifndef HAVE_PPOLL
-/* TODO: don't ignore sigmask */
-static int
-ruby_ppoll(struct pollfd *fds, nfds_t nfds,
- const struct timespec *ts, const sigset_t *sigmask)
-{
- int timeout_ms;
-
- if (ts) {
- int tmp, tmp2;
-
- if (ts->tv_sec > INT_MAX/1000)
- timeout_ms = INT_MAX;
- else {
- tmp = (int)(ts->tv_sec * 1000);
- /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
- tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
- if (INT_MAX - tmp < tmp2)
- timeout_ms = INT_MAX;
- else
- timeout_ms = (int)(tmp + tmp2);
- }
- }
- else
- timeout_ms = -1;
-
- return poll(fds, nfds, timeout_ms);
-}
-# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
-#endif
-
/*
* returns a mask of events
*/
int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- struct pollfd fds;
+ struct pollfd fds[2];
int result = 0, lerrno;
struct timespec ts, end, *tsp;
+ const struct timespec *to;
+ int drained;
rb_thread_t *th = GET_THREAD();
+ nfds_t nfds;
+ rb_unblock_function_t *ubf;
timeout_prepare(&tsp, &ts, &end, timeout);
- fds.fd = fd;
- fds.events = (short)events;
-
+ fds[0].fd = fd;
+ fds[0].events = (short)events;
do {
- fds.revents = 0;
+ fds[0].revents = 0;
+ fds[1].fd = rb_sigwait_fd_get(th);
+
+ if (fds[1].fd >= 0) {
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
+ nfds = 2;
+ ubf = ubf_sigwait;
+ }
+ else {
+ nfds = 1;
+ ubf = ubf_select;
+ }
+
lerrno = 0;
BLOCKING_REGION(th, {
- result = ppoll(&fds, 1, tsp, NULL);
+ to = sigwait_timeout(th, fds[1].fd, tsp, &drained);
+ result = ppoll(fds, nfds, to, NULL);
if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
+ }, ubf, th, FALSE);
+ if (fds[1].fd >= 0) {
+ if (result > 0 && fds[1].revents) {
+ result--;
+ fds[1].revents = 0;
+ }
+ (void)check_signals_nogvl(th, fds[1].fd);
+ rb_sigwait_fd_put(th, fds[1].fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
} while (wait_retryable(&result, lerrno, tsp, &end));
+
if (result < 0) {
errno = lerrno;
return -1;
}
- if (fds.revents & POLLNVAL) {
+ if (fds[0].revents & POLLNVAL) {
errno = EBADF;
return -1;
}
@@ -4062,32 +4129,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
* Therefore we need to fix it up.
*/
result = 0;
- if (fds.revents & POLLIN_SET)
+ if (fds[0].revents & POLLIN_SET)
result |= RB_WAITFD_IN;
- if (fds.revents & POLLOUT_SET)
+ if (fds[0].revents & POLLOUT_SET)
result |= RB_WAITFD_OUT;
- if (fds.revents & POLLEX_SET)
+ if (fds[0].revents & POLLEX_SET)
result |= RB_WAITFD_PRI;
/* all requested events are ready if there is an error */
- if (fds.revents & POLLERR_SET)
+ if (fds[0].revents & POLLERR_SET)
result |= events;
return result;
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- if (fd < 0) {
- return 0;
- }
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
struct select_args {
union {
int fd;
@@ -4168,10 +4223,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p)
}
#endif
-
-/* signal.c */
-void ruby_sigchld_handler(rb_vm_t *);
-
/*
*
*/
@@ -4187,36 +4238,81 @@ rb_threadptr_check_signal(rb_thread_t *mth)
}
static void
-timer_thread_function(void *arg)
+timer_thread_function(void)
{
- rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
+ volatile rb_execution_context_t *ec;
- /*
- * Tricky: thread_destruct_lock doesn't close a race against
- * vm->running_thread switch. however it guarantees th->running_thread
- * point to valid pointer or NULL.
- */
- rb_native_mutex_lock(&vm->thread_destruct_lock);
/* for time slice */
- if (vm->running_thread) {
- RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec);
- }
- rb_native_mutex_unlock(&vm->thread_destruct_lock);
-
- /* check signal */
- ruby_sigchld_handler(vm);
- rb_threadptr_check_signal(vm->main_thread);
+ ec = ACCESS_ONCE(rb_execution_context_t *,
+ ruby_current_execution_context_ptr);
+ if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
+}
-#if 0
- /* prove profiler */
- if (vm->prove_profile.enable) {
- rb_thread_t *th = vm->running_thread;
+static void
+async_bug_fd(const char *mesg, int errno_arg, int fd)
+{
+ char buff[64];
+ size_t n = strlcpy(buff, mesg, sizeof(buff));
+ if (n < sizeof(buff)-3) {
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
+ }
+ rb_async_bug_errno(buff, errno_arg);
+}
- if (vm->during_gc) {
- /* GC prove profiling */
+/* VM-dependent API is not available for this function */
+static int
+consume_communication_pipe(int fd)
+{
+#define CCP_READ_BUFF_SIZE 1024
+ /* buffer can be shared because no one refers to them. */
+ static char buff[CCP_READ_BUFF_SIZE];
+ ssize_t result;
+ int ret = FALSE; /* for rb_sigwait_sleep */
+
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+ if (result > 0) {
+ ret = TRUE;
+ if (result < (ssize_t)sizeof(buff)) {
+ return ret;
+ }
}
- }
+ else if (result == 0) {
+ return ret;
+ }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
#endif
+ return ret;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
+ }
+ }
+}
+
+static int
+check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
+{
+ rb_vm_t *vm = GET_VM(); /* th may be 0 */
+ int ret = consume_communication_pipe(sigwait_fd);
+ ubf_wakeup_all_threads();
+ ruby_sigchld_handler(vm);
+ if (rb_signal_buff_size()) {
+ if (th == vm->main_thread)
+ /* no need to lock + wakeup if already in main thread */
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ else
+ threadptr_trap_interrupt(vm->main_thread);
+ ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
+ }
+ return ret;
}
void
@@ -5046,7 +5142,6 @@ Init_Thread(void)
/* acquire global vm lock */
gvl_init(th->vm);
gvl_acquire(th->vm, th);
- rb_native_mutex_initialize(&th->vm->thread_destruct_lock);
rb_native_mutex_initialize(&th->vm->waitpid_lock);
rb_native_mutex_initialize(&th->interrupt_lock);
diff --git a/thread_pthread.c b/thread_pthread.c
index 29805ef2df..545cc2fa3b 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -45,27 +45,21 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
-static void rb_thread_wakeup_timer_thread_low(void);
static void clear_thread_cache_altstack(void);
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
+static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
+ const struct timespec *);
+static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
+ const struct timespec *,
+ int *drained_p);
-#define TIMER_THREAD_MASK (1)
-#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK)
-#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK)
+#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
-#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
- defined(F_SETFL) && defined(O_NONBLOCK) && \
- defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
-/* The timer thread sleeps while only one Ruby thread is running. */
-# define TIMER_IMPL TIMER_THREAD_SLEEPY
-#else
-# define TIMER_IMPL TIMER_THREAD_BUSY
-#endif
-
-static struct {
- pthread_t id;
- int created;
-} timer_thread;
-#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
+/* for testing, and in case we come across a platform w/o pipes: */
+#define BUSY_WAIT_SIGNALS (0)
+#define THREAD_INVALID ((const rb_thread_t *)-1)
+static const rb_thread_t *sigwait_th;
#ifdef HAVE_SCHED_YIELD
#define native_thread_yield() (void)sched_yield()
@@ -82,49 +76,96 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
static const void *const condattr_monotonic = NULL;
#endif
+/* 100ms. 10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_USEC (100 * 1000)
+
+static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
+ struct timespec rel);
+
static void
-gvl_acquire_common(rb_vm_t *vm)
+gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
{
if (vm->gvl.acquired) {
-
- if (!vm->gvl.waiting++) {
- /*
- * Wake up timer thread iff timer thread is slept.
- * When timer thread is polling mode, we don't want to
- * make confusing timer thread interval time.
- */
- rb_thread_wakeup_timer_thread_low();
- }
-
- while (vm->gvl.acquired) {
- rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
- }
-
- --vm->gvl.waiting;
-
- if (vm->gvl.need_yield) {
- vm->gvl.need_yield = 0;
+ native_thread_data_t *nd = &th->native_thread_data;
+
+ VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq");
+
+ list_add_tail(&vm->gvl.waitq, &nd->ubf_list);
+ do {
+ if (!vm->gvl.timer) {
+ static struct timespec ts;
+ static int err = ETIMEDOUT;
+
+ /*
+ * become designated timer thread to kick vm->gvl.acquired
+ * periodically. Continue on old timeout if it expired:
+ */
+ if (err == ETIMEDOUT) {
+ ts.tv_sec = 0;
+ ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
+ ts = native_cond_timeout(&nd->cond.gvlq, ts);
+ }
+ vm->gvl.timer = th;
+ err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts);
+ vm->gvl.timer = 0;
+ ubf_wakeup_all_threads();
+
+ /*
+ * Timeslice. We can't touch thread_destruct_lock here,
+ * as the process may fork while this thread is contending
+ * for GVL:
+ */
+ if (vm->gvl.acquired) timer_thread_function();
+ }
+ else {
+ rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
+ }
+ } while (vm->gvl.acquired);
+
+ list_del_init(&nd->ubf_list);
+
+ if (vm->gvl.need_yield) {
+ vm->gvl.need_yield = 0;
rb_native_cond_signal(&vm->gvl.switch_cond);
- }
+ }
}
+ vm->gvl.acquired = th;
+ /*
+ * Designate the next gvl.timer thread, favor the last thread in
+ * the waitq since it will be in waitq longest
+ */
+ if (!vm->gvl.timer) {
+ native_thread_data_t *last;
- vm->gvl.acquired = 1;
+ last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+ if (last) {
+ rb_native_cond_signal(&last->cond.gvlq);
+ }
+ else if (!ubf_threads_empty()) {
+ rb_thread_wakeup_timer_thread(0);
+ }
+ }
}
static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
rb_native_mutex_lock(&vm->gvl.lock);
- gvl_acquire_common(vm);
+ gvl_acquire_common(vm, th);
rb_native_mutex_unlock(&vm->gvl.lock);
}
-static void
+static native_thread_data_t *
gvl_release_common(rb_vm_t *vm)
{
+ native_thread_data_t *next;
vm->gvl.acquired = 0;
- if (vm->gvl.waiting > 0)
- rb_native_cond_signal(&vm->gvl.cond);
+ next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+ if (next) rb_native_cond_signal(&next->cond.gvlq);
+
+ return next;
}
static void
@@ -138,34 +179,38 @@ gvl_release(rb_vm_t *vm)
static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
- rb_native_mutex_lock(&vm->gvl.lock);
+ native_thread_data_t *next;
- gvl_release_common(vm);
+ rb_native_mutex_lock(&vm->gvl.lock);
+ next = gvl_release_common(vm);
/* An another thread is processing GVL yield. */
if (UNLIKELY(vm->gvl.wait_yield)) {
- while (vm->gvl.wait_yield)
+ while (vm->gvl.wait_yield)
rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
- goto acquire;
}
-
- if (vm->gvl.waiting > 0) {
- /* Wait until another thread task take GVL. */
- vm->gvl.need_yield = 1;
- vm->gvl.wait_yield = 1;
- while (vm->gvl.need_yield)
+ else if (next) {
+ /* Wait until another thread task takes GVL. */
+ vm->gvl.need_yield = 1;
+ vm->gvl.wait_yield = 1;
+ while (vm->gvl.need_yield)
rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
- vm->gvl.wait_yield = 0;
+ vm->gvl.wait_yield = 0;
+ rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
}
else {
- rb_native_mutex_unlock(&vm->gvl.lock);
- sched_yield();
+ rb_native_mutex_unlock(&vm->gvl.lock);
+ /*
+ * GVL was not contended when we released, so we have no potential
+ * contenders for reacquisition. Perhaps they are stuck in blocking
+ * region w/o GVL, too, so we kick them:
+ */
+ ubf_wakeup_all_threads();
+ native_thread_yield();
rb_native_mutex_lock(&vm->gvl.lock);
+ rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
}
-
- rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
- acquire:
- gvl_acquire_common(vm);
+ gvl_acquire_common(vm, th);
rb_native_mutex_unlock(&vm->gvl.lock);
}
@@ -173,11 +218,11 @@ static void
gvl_init(rb_vm_t *vm)
{
rb_native_mutex_initialize(&vm->gvl.lock);
- rb_native_cond_initialize(&vm->gvl.cond);
rb_native_cond_initialize(&vm->gvl.switch_cond);
rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
+ list_head_init(&vm->gvl.waitq);
vm->gvl.acquired = 0;
- vm->gvl.waiting = 0;
+ vm->gvl.timer = 0;
vm->gvl.need_yield = 0;
vm->gvl.wait_yield = 0;
}
@@ -185,10 +230,16 @@ gvl_init(rb_vm_t *vm)
static void
gvl_destroy(rb_vm_t *vm)
{
- rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
- rb_native_cond_destroy(&vm->gvl.switch_cond);
- rb_native_cond_destroy(&vm->gvl.cond);
- rb_native_mutex_destroy(&vm->gvl.lock);
+ /*
+ * only called once at VM shutdown (not atfork), another thread
+ * may still grab vm->gvl.lock when calling gvl_release at
+ * the end of thread_start_func_2
+ */
+ if (0) {
+ rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
+ rb_native_cond_destroy(&vm->gvl.switch_cond);
+ rb_native_mutex_destroy(&vm->gvl.lock);
+ }
clear_thread_cache_altstack();
}
@@ -433,7 +484,9 @@ native_thread_init(rb_thread_t *th)
#ifdef USE_UBF_LIST
list_node_init(&nd->ubf_list);
#endif
- rb_native_cond_initialize(&nd->sleep_cond);
+ rb_native_cond_initialize(&nd->cond.gvlq);
+ if (&nd->cond.gvlq != &nd->cond.intr)
+ rb_native_cond_initialize(&nd->cond.intr);
ruby_thread_set_native(th);
}
@@ -444,7 +497,11 @@ native_thread_init(rb_thread_t *th)
static void
native_thread_destroy(rb_thread_t *th)
{
- rb_native_cond_destroy(&th->native_thread_data.sleep_cond);
+ native_thread_data_t *nd = &th->native_thread_data;
+
+ rb_native_cond_destroy(&nd->cond.gvlq);
+ if (&nd->cond.gvlq != &nd->cond.intr)
+ rb_native_cond_destroy(&nd->cond.intr);
/*
* prevent false positive from ruby_thread_has_gvl_p if that
@@ -1012,17 +1069,6 @@ native_thread_create(rb_thread_t *th)
return err;
}
-#if (TIMER_IMPL & TIMER_THREAD_MASK)
-static void
-native_thread_join(pthread_t th)
-{
- int err = pthread_join(th, 0);
- if (err) {
- rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
- }
-}
-#endif /* TIMER_THREAD_MASK */
-
#if USE_NATIVE_THREAD_PRIORITY
static void
@@ -1064,15 +1110,15 @@ ubf_pthread_cond_signal(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th);
- rb_native_cond_signal(&th->native_thread_data.sleep_cond);
+ rb_native_cond_signal(&th->native_thread_data.cond.intr);
}
static void
-native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
+native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel)
{
struct timespec timeout;
rb_nativethread_lock_t *lock = &th->interrupt_lock;
- rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;
+ rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr;
if (timeout_rel) {
/* Solaris cond_timedwait() return EINVAL if an argument is greater than
@@ -1164,17 +1210,30 @@ static void
ubf_select(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_vm_t *vm = th->vm;
+
register_ubf_list(th);
/*
* ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
- * exit from ubf function.
- * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
- * if running on timer thread because it may make endless wakeups.
+ * exit from ubf function. We must designate a timer-thread to perform
+ * this operation.
*/
- if (!pthread_equal(pthread_self(), timer_thread.id))
- rb_thread_wakeup_timer_thread();
+ rb_native_mutex_lock(&vm->gvl.lock);
+ if (!vm->gvl.timer) {
+ native_thread_data_t *last;
+
+ last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+ if (last) {
+ rb_native_cond_signal(&last->cond.gvlq);
+ }
+ else {
+ rb_thread_wakeup_timer_thread(0);
+ }
+ }
+ rb_native_mutex_unlock(&vm->gvl.lock);
+
ubf_wakeup_thread(th);
}
@@ -1211,39 +1270,16 @@ static int ubf_threads_empty(void) { return 1; }
#define TT_DEBUG 0
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
-/* 100ms. 10ms is too small for user level thread scheduling
- * on recent Linux (tested on 2.6.35)
- */
-#define TIME_QUANTUM_USEC (100 * 1000)
-
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
static struct {
- /*
- * Read end of each pipe is closed inside timer thread for shutdown
- * Write ends are closed by a normal Ruby thread during shutdown
- */
+ /* pipes are closed in forked children when owner_process does not match */
int normal[2];
- int low[2];
/* volatile for signal handler use: */
volatile rb_pid_t owner_process;
} timer_thread_pipe = {
{-1, -1},
- {-1, -1}, /* low priority */
};
-NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static void
-async_bug_fd(const char *mesg, int errno_arg, int fd)
-{
- char buff[64];
- size_t n = strlcpy(buff, mesg, sizeof(buff));
- if (n < sizeof(buff)-3) {
- ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
- }
- rb_async_bug_errno(buff, errno_arg);
-}
-
/* only use signal-safe system calls here */
static void
rb_thread_wakeup_timer_thread_fd(int fd)
@@ -1275,49 +1311,33 @@ rb_thread_wakeup_timer_thread_fd(int fd)
}
void
-rb_thread_wakeup_timer_thread(void)
+rb_thread_wakeup_timer_thread(int sig)
{
/* must be safe inside sighandler, so no mutex */
if (timer_thread_pipe.owner_process == getpid()) {
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
- }
-}
-
-static void
-rb_thread_wakeup_timer_thread_low(void)
-{
- if (timer_thread_pipe.owner_process == getpid()) {
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
- }
-}
-
-/* VM-dependent API is not available for this function */
-static void
-consume_communication_pipe(int fd)
-{
-#define CCP_READ_BUFF_SIZE 1024
- /* buffer can be shared because no one refers to them. */
- static char buff[CCP_READ_BUFF_SIZE];
- ssize_t result;
-
- while (1) {
- result = read(fd, buff, sizeof(buff));
- if (result == 0) {
- return;
- }
- else if (result < 0) {
- int e = errno;
- switch (e) {
- case EINTR:
- continue; /* retry */
- case EAGAIN:
-#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
- return;
- default:
- async_bug_fd("consume_communication_pipe: read", e, fd);
- }
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
+
+ /*
+ * system_working check is required because vm and main_thread are
+ * freed during shutdown
+ */
+ if (sig && system_working) {
+ volatile rb_execution_context_t *ec;
+ rb_vm_t *vm = GET_VM();
+ rb_thread_t *mth;
+
+ /*
+ * FIXME: root VM and main_thread should be static and not
+ * on heap for maximum safety (and startup/shutdown speed)
+ */
+ if (!vm) return;
+ mth = vm->main_thread;
+ if (!mth || !system_working) return;
+
+ /* this relies on GC for grace period before cont_free */
+ ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
+
+ if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
}
}
}
@@ -1350,6 +1370,7 @@ set_nonblock(int fd)
rb_sys_fail(0);
}
+/* communication pipe with timer thread and signal handler */
static int
setup_communication_pipe_internal(int pipes[2])
{
@@ -1374,108 +1395,6 @@ setup_communication_pipe_internal(int pipes[2])
return 0;
}
-/* communication pipe with timer thread and signal handler */
-static int
-setup_communication_pipe(void)
-{
- rb_pid_t owner = timer_thread_pipe.owner_process;
-
- if (owner && owner != getpid()) {
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(normal[1]);
- CLOSE_INVALIDATE(low[0]);
- CLOSE_INVALIDATE(low[1]);
- }
-
- if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
- return errno;
- }
- if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
- return errno;
- }
-
- return 0;
-}
-
-/**
- * Let the timer thread sleep a while.
- *
- * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running.
- * @pre the calling context is in the timer thread.
- */
-static inline void
-timer_thread_sleep(rb_vm_t *vm)
-{
- int result;
- int need_polling;
- struct pollfd pollfds[2];
-
- pollfds[0].fd = timer_thread_pipe.normal[0];
- pollfds[0].events = POLLIN;
- pollfds[1].fd = timer_thread_pipe.low[0];
- pollfds[1].events = POLLIN;
-
- need_polling = !ubf_threads_empty();
-
- if (SIGCHLD_LOSSY && !need_polling) {
- rb_native_mutex_lock(&vm->waitpid_lock);
- if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) {
- need_polling = 1;
- }
- rb_native_mutex_unlock(&vm->waitpid_lock);
- }
-
- if (vm->gvl.waiting > 0 || need_polling) {
- /* polling (TIME_QUANTUM_USEC usec) */
- result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
- }
- else {
- /* wait (infinite) */
- result = poll(pollfds, numberof(pollfds), -1);
- }
-
- if (result == 0) {
- /* maybe timeout */
- }
- else if (result > 0) {
- consume_communication_pipe(timer_thread_pipe.normal[0]);
- consume_communication_pipe(timer_thread_pipe.low[0]);
- }
- else { /* result < 0 */
- int e = errno;
- switch (e) {
- case EBADF:
- case EINVAL:
- case ENOMEM: /* from Linux man */
- case EFAULT: /* from FreeBSD man */
- rb_async_bug_errno("thread_timer: select", e);
- default:
- /* ignore */;
- }
- }
-}
-#endif /* TIMER_THREAD_SLEEPY */
-
-#if TIMER_IMPL == TIMER_THREAD_BUSY
-# define PER_NANO 1000000000
-void rb_thread_wakeup_timer_thread(void) {}
-static void rb_thread_wakeup_timer_thread_low(void) {}
-
-static rb_nativethread_lock_t timer_thread_lock;
-static rb_nativethread_cond_t timer_thread_cond;
-
-static inline void
-timer_thread_sleep(rb_vm_t *unused)
-{
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
- ts = native_cond_timeout(&timer_thread_cond, ts);
-
- native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
-}
-#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */
-
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
#endif
@@ -1526,137 +1445,26 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
return name;
}
-static void *
-thread_timer(void *p)
-{
- rb_vm_t *vm = p;
-#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */
- {
- sigset_t mask;
- sigemptyset(&mask);
- pthread_sigmask(SIG_SETMASK, &mask, NULL);
- }
-#endif
-
- if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
-
-#ifdef SET_CURRENT_THREAD_NAME
- SET_CURRENT_THREAD_NAME("ruby-timer-thr");
-#endif
-
-#if TIMER_IMPL == TIMER_THREAD_BUSY
- rb_native_mutex_initialize(&timer_thread_lock);
- rb_native_cond_initialize(&timer_thread_cond);
- rb_native_mutex_lock(&timer_thread_lock);
-#endif
- while (system_working > 0) {
-
- /* timer function */
- ubf_wakeup_all_threads();
- timer_thread_function(0);
-
- if (TT_DEBUG) WRITE_CONST(2, "tick\n");
-
- /* wait */
- timer_thread_sleep(vm);
- }
-#if TIMER_IMPL == TIMER_THREAD_BUSY
- rb_native_mutex_unlock(&timer_thread_lock);
- rb_native_cond_destroy(&timer_thread_cond);
- rb_native_mutex_destroy(&timer_thread_lock);
-#endif
-
- if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
- return NULL;
-}
-
-#if (TIMER_IMPL & TIMER_THREAD_MASK)
static void
rb_thread_create_timer_thread(void)
{
- if (!timer_thread.created) {
- size_t stack_size = 0;
- int err;
- pthread_attr_t attr;
- rb_vm_t *vm = GET_VM();
+ /* we only create the pipe, and lazy-spawn */
+ rb_pid_t current = getpid();
+ rb_pid_t owner = timer_thread_pipe.owner_process;
- err = pthread_attr_init(&attr);
- if (err != 0) {
- rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
- strerror(err));
- return;
- }
-# ifdef PTHREAD_STACK_MIN
- {
- size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
- const size_t min_size = (4096 * 4);
- /* Allocate the machine stack for the timer thread
- * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes
- * machine stack overflow only with PTHREAD_STACK_MIN.
- */
- enum {
- needs_more_stack =
-#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__
- 1
-#else
- THREAD_DEBUG != 0
-#endif
- };
- stack_size = stack_min;
- if (stack_size < min_size) stack_size = min_size;
- if (needs_more_stack) {
- stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min;
- }
- err = pthread_attr_setstacksize(&attr, stack_size);
- if (err != 0) {
- rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s",
- stack_size, strerror(err));
- }
- }
-# endif
+ if (owner && owner != current) {
+ CLOSE_INVALIDATE(normal[0]);
+ CLOSE_INVALIDATE(normal[1]);
+ }
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
- err = setup_communication_pipe();
- if (err) return;
-#endif /* TIMER_THREAD_SLEEPY */
+ if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
- /* create timer thread */
- if (timer_thread.created) {
- rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
- }
- err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
- pthread_attr_destroy(&attr);
-
- if (err == EINVAL) {
- /*
- * Even if we are careful with our own stack use in thread_timer(),
- * any third-party libraries (eg libkqueue) which rely on __thread
- * storage can cause small stack sizes to fail. So lets hope the
- * default stack size is enough for them:
- */
- stack_size = 0;
- err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
- }
- if (err != 0) {
- rb_warn("pthread_create failed for timer: %s, scheduling broken",
- strerror(err));
- if (stack_size) {
- rb_warn("timer thread stack size: %"PRIuSIZE, stack_size);
- }
- else {
- rb_warn("timer thread stack size: system default");
- }
- VM_ASSERT(err == 0);
- return;
- }
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
- /* validate pipe on this process */
- timer_thread_pipe.owner_process = getpid();
-#endif /* TIMER_THREAD_SLEEPY */
- timer_thread.created = 1;
+ if (owner != current) {
+ /* validate pipe on this process */
+ sigwait_th = THREAD_INVALID;
+ timer_thread_pipe.owner_process = current;
}
}
-#endif /* TIMER_IMPL & TIMER_THREAD_MASK */
static int
native_stop_timer_thread(void)
@@ -1665,24 +1473,6 @@ native_stop_timer_thread(void)
stopped = --system_working <= 0;
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
- if (stopped) {
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
- /* kick timer thread out of sleep */
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
-#endif
-
- /* timer thread will stop looping when system_working <= 0: */
- native_thread_join(timer_thread.id);
-
- /*
- * don't care if timer_thread_pipe may fill up at this point.
- * If we restart timer thread, signals will be processed, if
- * we don't, it's because we're in a different child
- */
-
- if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
- timer_thread.created = 0;
- }
return stopped;
}
@@ -1739,20 +1529,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
int
rb_reserved_fd_p(int fd)
{
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
if ((fd == timer_thread_pipe.normal[0] ||
- fd == timer_thread_pipe.normal[1] ||
- fd == timer_thread_pipe.low[0] ||
- fd == timer_thread_pipe.low[1]) &&
+ fd == timer_thread_pipe.normal[1]) &&
timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
return 1;
}
else {
return 0;
}
-#else
- return 0;
-#endif
}
rb_nativethread_id_t
@@ -1803,7 +1587,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec)
{
rb_thread_t *th = rb_ec_thread_ptr(ec);
- return &th->native_thread_data.sleep_cond;
+ return &th->native_thread_data.cond.intr;
}
void
@@ -1813,4 +1597,126 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
}
#endif /* USE_NATIVE_SLEEP_COND */
+int
+rb_sigwait_fd_get(const rb_thread_t *th)
+{
+ if (timer_thread_pipe.owner_process == getpid() &&
+ timer_thread_pipe.normal[0] >= 0) {
+ if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
+ return timer_thread_pipe.normal[0];
+ }
+ }
+ return -1; /* avoid thundering herd */
+}
+
+void
+rb_sigwait_fd_put(const rb_thread_t *th, int fd)
+{
+ const rb_thread_t *old;
+
+ VM_ASSERT(timer_thread_pipe.normal[0] == fd);
+ old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
+ if (old != th) assert(old == th);
+}
+
+#ifndef HAVE_PPOLL
+/* TODO: don't ignore sigmask */
+static int
+ruby_ppoll(struct pollfd *fds, nfds_t nfds,
+ const struct timespec *ts, const sigset_t *sigmask)
+{
+ int timeout_ms;
+
+ if (ts) {
+ int tmp, tmp2;
+
+ if (ts->tv_sec > INT_MAX/1000)
+ timeout_ms = INT_MAX;
+ else {
+ tmp = (int)(ts->tv_sec * 1000);
+ /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
+ tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
+ if (INT_MAX - tmp < tmp2)
+ timeout_ms = INT_MAX;
+ else
+ timeout_ms = (int)(tmp + tmp2);
+ }
+ }
+ else
+ timeout_ms = -1;
+
+ return poll(fds, nfds, timeout_ms);
+}
+# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
+#endif
+
+void
+rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
+{
+ struct pollfd pfd;
+
+ pfd.fd = sigwait_fd;
+ pfd.events = POLLIN;
+
+ if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
+ (void)ppoll(&pfd, 1, ts, 0);
+ check_signals_nogvl(th, sigwait_fd);
+ }
+ else {
+ struct timespec end, diff;
+ const struct timespec *to;
+ int n = 0;
+
+ if (ts) {
+ getclockofday(&end);
+ timespec_add(&end, ts);
+ diff = *ts;
+ ts = &diff;
+ }
+ /*
+ * tricky: this needs to return on spurious wakeup (no auto-retry).
+ * But we also need to distinguish between periodic quantum
+ * wakeups, so we care about the result of consume_communication_pipe
+ */
+ for (;;) {
+ to = sigwait_timeout(th, sigwait_fd, ts, &n);
+ if (n) return;
+ n = ppoll(&pfd, 1, to, 0);
+ if (check_signals_nogvl(th, sigwait_fd))
+ return;
+ if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
+ return;
+ if (ts && timespec_update_expire(&diff, &end))
+ return;
+ }
+ }
+}
+
+static void
+native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
+{
+ int sigwait_fd = rb_sigwait_fd_get(th);
+
+ if (sigwait_fd >= 0) {
+ rb_native_mutex_lock(&th->interrupt_lock);
+ th->unblock.func = ubf_sigwait;
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ GVL_UNLOCK_BEGIN(th);
+
+ if (!RUBY_VM_INTERRUPTED(th->ec)) {
+ rb_sigwait_sleep(th, sigwait_fd, timeout_rel);
+ }
+ else {
+ check_signals_nogvl(th, sigwait_fd);
+ }
+ unblock_function_clear(th);
+ GVL_UNLOCK_END(th);
+ rb_sigwait_fd_put(th, sigwait_fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
+ else {
+ native_cond_sleep(th, timeout_rel);
+ }
+}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */
diff --git a/thread_pthread.h b/thread_pthread.h
index 0566193eb5..60e0fe0ea3 100644
--- a/thread_pthread.h
+++ b/thread_pthread.h
@@ -22,7 +22,19 @@ typedef pthread_cond_t rb_nativethread_cond_t;
typedef struct native_thread_data_struct {
struct list_node ubf_list;
- rb_nativethread_cond_t sleep_cond;
+#if defined(__GLIBC__) || defined(__FreeBSD__)
+ union
+#else
+ /*
+ * assume the platform condvars are badly implemented and have a
+ * "memory" of which mutex they're associated with
+ */
+ struct
+#endif
+ {
+ rb_nativethread_cond_t intr; /* th->interrupt_lock */
+ rb_nativethread_cond_t gvlq; /* vm->gvl.lock */
+ } cond;
} native_thread_data_t;
#undef except
@@ -32,12 +44,12 @@ typedef struct native_thread_data_struct {
typedef struct rb_global_vm_lock_struct {
/* fast path */
- unsigned long acquired;
+ const struct rb_thread_struct *acquired;
rb_nativethread_lock_t lock;
/* slow path */
- volatile unsigned long waiting;
- rb_nativethread_cond_t cond;
+ struct list_head waitq;
+ const struct rb_thread_struct *timer;
/* yield */
rb_nativethread_cond_t switch_cond;
diff --git a/thread_win32.c b/thread_win32.c
index 2d5eac1ff4..6db1f25fa7 100644
--- a/thread_win32.c
+++ b/thread_win32.c
@@ -20,6 +20,8 @@
#define native_thread_yield() Sleep(0)
#define unregister_ubf_list(th)
+#define ubf_wakeup_all_threads() do {} while (0)
+#define ubf_threads_empty() (1)
static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES;
@@ -680,18 +682,21 @@ static struct {
static unsigned long __stdcall
timer_thread_func(void *dummy)
{
+ rb_vm_t *vm = GET_VM();
thread_debug("timer_thread\n");
rb_w32_set_thread_description(GetCurrentThread(), L"ruby-timer-thread");
while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) ==
WAIT_TIMEOUT) {
- timer_thread_function(dummy);
+ timer_thread_function();
+ ruby_sigchld_handler(vm); /* probably no-op */
+ rb_threadptr_check_signal(vm->main_thread);
}
thread_debug("timer killed\n");
return 0;
}
void
-rb_thread_wakeup_timer_thread(void)
+rb_thread_wakeup_timer_thread(int sig)
{
/* do nothing */
}
@@ -768,6 +773,26 @@ rb_reserved_fd_p(int fd)
return 0;
}
+int
+rb_sigwait_fd_get(rb_thread_t *th)
+{
+ return -1; /* TODO */
+}
+
+NORETURN(void rb_sigwait_fd_put(rb_thread_t *, int));
+void
+rb_sigwait_fd_put(rb_thread_t *th, int fd)
+{
+ rb_bug("not implemented, should not be called");
+}
+
+NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const struct timespec *));
+void
+rb_sigwait_sleep(const rb_thread_t *th, int fd, const struct timespec *ts)
+{
+ rb_bug("not implemented, should not be called");
+}
+
rb_nativethread_id_t
rb_nativethread_self(void)
{
diff --git a/vm_core.h b/vm_core.h
index 88b3ccbdcb..4fb6e07619 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -564,10 +564,12 @@ typedef struct rb_vm_struct {
VALUE self;
rb_global_vm_lock_t gvl;
- rb_nativethread_lock_t thread_destruct_lock;
struct rb_thread_struct *main_thread;
- struct rb_thread_struct *running_thread;
+
+ /* persists across uncontended GVL release/acquire for time slice */
+ const struct rb_thread_struct *running_thread;
+
#ifdef USE_SIGALTSTACK
void *main_altstack;
#endif
@@ -1583,7 +1585,7 @@ void rb_vm_pop_frame(rb_execution_context_t *ec);
void rb_thread_start_timer_thread(void);
void rb_thread_stop_timer_thread(void);
void rb_thread_reset_timer_thread(void);
-void rb_thread_wakeup_timer_thread(void);
+void rb_thread_wakeup_timer_thread(int);
static inline void
rb_vm_living_threads_init(rb_vm_t *vm)