author	normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>	2018-08-06 05:22:00 +0000
committer	normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>	2018-08-06 05:22:00 +0000
commit	1bc1570a7a493cd8227a946de7bfe5a296ed7bcf (patch)
tree	1d6074813a7c515b81e61e451422e559541ba79d /thread_pthread.c
parent	5f125fdf77a5f2158b96c4f27f382063d6b789a1 (diff)
thread_pthread.c: restore timer-thread for now :<
[ruby-core:88306]

Revert "process.c: ensure th->interrupt lock is held when migrating"
  This reverts commit 5ca416bdf6b6785cb20f139c2c514eda005fe42f (r64201)

Revert "process.c (rb_waitpid): reduce sigwait_fd bouncing"
  This reverts commit 217bdd776fbeea3bfd0b9324eefbfcec3b1ccb3e (r64200).

Revert "test/ruby/test_thread.rb (test_thread_timer_and_interrupt): add timeouts"
  This reverts commit 9f395f11202fc3c7edbd76f5aa6ce1f8a1e752a9 (r64199).

Revert "thread_pthread.c (native_sleep): reduce ppoll sleeps"
  This reverts commit b3aa256c4d43d3d7e9975ec18eb127f45f623c9b (r64193).

Revert "thread.c (consume_communication_pipe): do not retry after short read"
  This reverts commit 291a82f748de56e65fac10edefc51ec7a54a82d4 (r64185).

Revert "test/ruby/test_io.rb (test_race_gets_and_close): timeout each thread"
  This reverts commit 3dbd8d1f66537f968f0461ed8547460b3b1241b3 (r64184).

Revert "thread_pthread.c (gvl_acquire_common): persist timeout across calls"
  This reverts commit 8c2ae6e3ed072b06fc3cbc34fa8a14b2acbb49d5 (r64165).

Revert "test/ruby/test_io.rb (test_race_gets_and_close): use SIGABRT on timeout"
  This reverts commit 931cda4db8afd6b544a8d85a6815765a9c417213 (r64135).

Revert "thread_pthread.c (gvl_yield): do ubf wakeups when uncontended"
  This reverts commit 508f00314f46c08b6e9b0141c01355d24954260c (r64133).

Revert "thread_pthread.h (native_thread_data): split condvars on some platforms"
  This reverts commit a038bf238bd9a24bf1e1622f618a27db261fc91b (r64124).

Revert "process.c (waitpid_nogvl): prevent conflicting use of sleep_cond"
  This reverts commit 7018acc946882f21d519af7c42ccf84b22a46b27 (r64117).

Revert "thread_pthread.c (rb_sigwait_sleep): th may be 0 from MJIT"
  This reverts commit 56491afc7916fb24f5c4dc2c632fb93fa7063992 (r64116).

Revert "thread*.c: waiting on sigwait_fd performs periodic ubf wakeups"
  This reverts commit ab47a57a46e70634d049e4da20a5441c7a14cdec (r64115).

Revert "thread_pthread.c (gvl_destroy): make no-op on GVL bits"
  This reverts commit 95cae748171f4754b97f4ba54da2ae62a8d484fd (r64114).

Revert "thread_pthread.c (rb_sigwait_sleep): fix uninitialized poll set in UBF case"
  This reverts commit 4514362948fdb914c6138b12d961d92e9c0fee6c (r64113).

Revert "thread_pthread.c (rb_sigwait_sleep): re-fix [Bug #5343] harder"
  This reverts commit 26b8a70bb309c7a367b9134045508b5b5a580a77 (r64111).

Revert "thread.c: move ppoll wrapper into thread_pthread.c"
  This reverts commit 3dc7727d22fecbc355597edda25d2a245bf55ba1 (r64110).

Revert "thread.c: move ppoll wrapper before thread_pthread.c"
  This reverts commit 2fa1e2e3c3c5c4b3ce84730dee4bcbe9d81b8e35 (r64109).

Revert "thread_pthread.c (ubf_select): refix [Bug #5343]"
  This reverts commit 4c1ab82f0623eca91a95d2a44053be22bbce48ad (r64108).

Revert "thread_win32.c: suppress warnings by -Wsuggest-attribute"
  This reverts commit 6a9b63e39075c53870933fbac5c1065f7d22047c (r64159).

Revert "thread_pthread: remove timer-thread by restructuring GVL"
  This reverts commit 708bfd21156828526fe72de2cedecfaca6647dc1 (r64107).

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64203 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
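
For orientation, here is a minimal, self-contained sketch of the design this revert restores: a dedicated native timer thread that periodically requests a timeslice check from running threads. All names below (tick_pending, worker_fn) are illustrative; this is a toy model of the mechanism, not Ruby's implementation.

/*
 * Toy model of the restored design: a dedicated timer thread fires a
 * periodic "tick" that workers poll at safe points. Illustration only.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>

#define TIME_QUANTUM_USEC (100 * 1000) /* 100ms, as in the patch */

static atomic_int system_working = 1;
static atomic_int tick_pending = 0;

static void *timer_thread_fn(void *arg)
{
    struct timespec ts = { 0, TIME_QUANTUM_USEC * 1000L };
    (void)arg;
    while (atomic_load(&system_working)) {
        atomic_store(&tick_pending, 1); /* request a timeslice check */
        nanosleep(&ts, NULL);
    }
    return NULL;
}

static void *worker_fn(void *arg)
{
    long ticks = 0;
    (void)arg;
    for (long i = 0; i < 500000000L; i++) {
        if (atomic_exchange(&tick_pending, 0)) {
            ticks++; /* a real VM would check interrupts / switch threads here */
        }
    }
    printf("worker observed %ld ticks\n", ticks);
    return NULL;
}

int main(void)
{
    pthread_t timer, worker;
    pthread_create(&timer, NULL, timer_thread_fn, NULL);
    pthread_create(&worker, NULL, worker_fn, NULL);
    pthread_join(worker, NULL);
    atomic_store(&system_working, 0);
    pthread_join(timer, NULL);
    return 0;
}
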
Diffstat (limited to 'thread_pthread.c')
-rw-r--r--	thread_pthread.c	710
1 file changed, 402 insertions(+), 308 deletions(-)
diff --git a/thread_pthread.c b/thread_pthread.c
index 545cc2fa3b..29805ef2df 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -45,21 +45,27 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
+static void rb_thread_wakeup_timer_thread_low(void);
static void clear_thread_cache_altstack(void);
-static void ubf_wakeup_all_threads(void);
-static int ubf_threads_empty(void);
-static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
- const struct timespec *);
-static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
- const struct timespec *,
- int *drained_p);
-#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
+#define TIMER_THREAD_MASK (1)
+#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK)
+#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK)
-/* for testing, and in case we come across a platform w/o pipes: */
-#define BUSY_WAIT_SIGNALS (0)
-#define THREAD_INVALID ((const rb_thread_t *)-1)
-static const rb_thread_t *sigwait_th;
+#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
+ defined(F_SETFL) && defined(O_NONBLOCK) && \
+ defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
+/* The timer thread sleeps while only one Ruby thread is running. */
+# define TIMER_IMPL TIMER_THREAD_SLEEPY
+#else
+# define TIMER_IMPL TIMER_THREAD_BUSY
+#endif
+
+static struct {
+ pthread_t id;
+ int created;
+} timer_thread;
+#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
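
A note on the flag encoding introduced above: TIMER_THREAD_SLEEPY and TIMER_THREAD_BUSY both carry bit 0 (TIMER_THREAD_MASK), so `TIMER_IMPL & TIMER_THREAD_MASK` is nonzero for either thread-based implementation and guards their shared code. A tiny standalone check (the macro values are copied from the patch; the program itself is illustrative):

#include <stdio.h>

#define TIMER_THREAD_MASK   (1)                   /* bit 0: a timer thread exists */
#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK) /* == 3: sleeps on a pipe */
#define TIMER_THREAD_BUSY   (4|TIMER_THREAD_MASK) /* == 5: wakes every quantum */

int main(void)
{
    /* Both implementations satisfy the guard used around
     * native_thread_join() and rb_thread_create_timer_thread(). */
    printf("sleepy & mask = %d\n", TIMER_THREAD_SLEEPY & TIMER_THREAD_MASK); /* 1 */
    printf("busy   & mask = %d\n", TIMER_THREAD_BUSY & TIMER_THREAD_MASK);   /* 1 */
    return 0;
}
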
#ifdef HAVE_SCHED_YIELD
#define native_thread_yield() (void)sched_yield()
@@ -76,96 +82,49 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
static const void *const condattr_monotonic = NULL;
#endif
-/* 100ms. 10ms is too small for user level thread scheduling
- * on recent Linux (tested on 2.6.35)
- */
-#define TIME_QUANTUM_USEC (100 * 1000)
-
-static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
- struct timespec rel);
-
static void
-gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire_common(rb_vm_t *vm)
{
if (vm->gvl.acquired) {
- native_thread_data_t *nd = &th->native_thread_data;
-
- VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq");
-
- list_add_tail(&vm->gvl.waitq, &nd->ubf_list);
- do {
- if (!vm->gvl.timer) {
- static struct timespec ts;
- static int err = ETIMEDOUT;
-
- /*
- * become designated timer thread to kick vm->gvl.acquired
- * periodically. Continue on old timeout if it expired:
- */
- if (err == ETIMEDOUT) {
- ts.tv_sec = 0;
- ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
- ts = native_cond_timeout(&nd->cond.gvlq, ts);
- }
- vm->gvl.timer = th;
- err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts);
- vm->gvl.timer = 0;
- ubf_wakeup_all_threads();
-
- /*
- * Timeslice. We can't touch thread_destruct_lock here,
- * as the process may fork while this thread is contending
- * for GVL:
- */
- if (vm->gvl.acquired) timer_thread_function();
- }
- else {
- rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
- }
- } while (vm->gvl.acquired);
-
- list_del_init(&nd->ubf_list);
-
- if (vm->gvl.need_yield) {
- vm->gvl.need_yield = 0;
+
+ if (!vm->gvl.waiting++) {
+ /*
+             * Wake up the timer thread only when it is sleeping.
+             * When the timer thread is in polling mode, we don't want
+             * to disturb its polling interval.
+ */
+ rb_thread_wakeup_timer_thread_low();
+ }
+
+ while (vm->gvl.acquired) {
+ rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
+ }
+
+ --vm->gvl.waiting;
+
+ if (vm->gvl.need_yield) {
+ vm->gvl.need_yield = 0;
rb_native_cond_signal(&vm->gvl.switch_cond);
- }
+ }
}
- vm->gvl.acquired = th;
- /*
- * Designate the next gvl.timer thread, favor the last thread in
- * the waitq since it will be in waitq longest
- */
- if (!vm->gvl.timer) {
- native_thread_data_t *last;
- last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
- if (last) {
- rb_native_cond_signal(&last->cond.gvlq);
- }
- else if (!ubf_threads_empty()) {
- rb_thread_wakeup_timer_thread(0);
- }
- }
+ vm->gvl.acquired = 1;
}
static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
rb_native_mutex_lock(&vm->gvl.lock);
- gvl_acquire_common(vm, th);
+ gvl_acquire_common(vm);
rb_native_mutex_unlock(&vm->gvl.lock);
}
-static native_thread_data_t *
+static void
gvl_release_common(rb_vm_t *vm)
{
- native_thread_data_t *next;
vm->gvl.acquired = 0;
- next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list);
- if (next) rb_native_cond_signal(&next->cond.gvlq);
-
- return next;
+ if (vm->gvl.waiting > 0)
+ rb_native_cond_signal(&vm->gvl.cond);
}
static void
@@ -179,38 +138,34 @@ gvl_release(rb_vm_t *vm)
static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
- native_thread_data_t *next;
-
rb_native_mutex_lock(&vm->gvl.lock);
- next = gvl_release_common(vm);
+
+ gvl_release_common(vm);
    /* Another thread is processing a GVL yield. */
if (UNLIKELY(vm->gvl.wait_yield)) {
- while (vm->gvl.wait_yield)
+ while (vm->gvl.wait_yield)
rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
+ goto acquire;
}
- else if (next) {
- /* Wait until another thread task takes GVL. */
- vm->gvl.need_yield = 1;
- vm->gvl.wait_yield = 1;
- while (vm->gvl.need_yield)
+
+ if (vm->gvl.waiting > 0) {
+        /* Wait until another thread takes the GVL. */
+ vm->gvl.need_yield = 1;
+ vm->gvl.wait_yield = 1;
+ while (vm->gvl.need_yield)
rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
- vm->gvl.wait_yield = 0;
- rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+ vm->gvl.wait_yield = 0;
}
else {
- rb_native_mutex_unlock(&vm->gvl.lock);
- /*
- * GVL was not contended when we released, so we have no potential
- * contenders for reacquisition. Perhaps they are stuck in blocking
- * region w/o GVL, too, so we kick them:
- */
- ubf_wakeup_all_threads();
- native_thread_yield();
+ rb_native_mutex_unlock(&vm->gvl.lock);
+ sched_yield();
rb_native_mutex_lock(&vm->gvl.lock);
- rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
}
- gvl_acquire_common(vm, th);
+
+ rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+ acquire:
+ gvl_acquire_common(vm);
rb_native_mutex_unlock(&vm->gvl.lock);
}
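
The hunks above reduce the GVL to a classic mutex/condvar handoff: an acquired flag, a waiting counter, and a signal on release. A minimal compilable model of just that core follows; the field names mirror the patch (gvl.acquired, gvl.waiting, gvl.cond, gvl.lock), but the need_yield/wait_yield handshake and the timer-thread wakeup on the first waiter are omitted, so treat it as a sketch rather than the actual implementation.

#include <pthread.h>

struct gvl {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int acquired;   /* 1 while some thread holds the GVL */
    int waiting;    /* number of threads blocked in gvl_acquire() */
};

static void gvl_acquire(struct gvl *g)
{
    pthread_mutex_lock(&g->lock);
    if (g->acquired) {
        g->waiting++;
        while (g->acquired)
            pthread_cond_wait(&g->cond, &g->lock);
        g->waiting--;
    }
    g->acquired = 1;
    pthread_mutex_unlock(&g->lock);
}

static void gvl_release(struct gvl *g)
{
    pthread_mutex_lock(&g->lock);
    g->acquired = 0;
    if (g->waiting > 0)
        pthread_cond_signal(&g->cond); /* hand off to one waiter */
    pthread_mutex_unlock(&g->lock);
}

int main(void)
{
    struct gvl g = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 };
    gvl_acquire(&g);
    gvl_release(&g);
    return 0;
}
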
@@ -218,11 +173,11 @@ static void
gvl_init(rb_vm_t *vm)
{
rb_native_mutex_initialize(&vm->gvl.lock);
+ rb_native_cond_initialize(&vm->gvl.cond);
rb_native_cond_initialize(&vm->gvl.switch_cond);
rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
- list_head_init(&vm->gvl.waitq);
vm->gvl.acquired = 0;
- vm->gvl.timer = 0;
+ vm->gvl.waiting = 0;
vm->gvl.need_yield = 0;
vm->gvl.wait_yield = 0;
}
@@ -230,16 +185,10 @@ gvl_init(rb_vm_t *vm)
static void
gvl_destroy(rb_vm_t *vm)
{
- /*
- * only called once at VM shutdown (not atfork), another thread
- * may still grab vm->gvl.lock when calling gvl_release at
- * the end of thread_start_func_2
- */
- if (0) {
- rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
- rb_native_cond_destroy(&vm->gvl.switch_cond);
- rb_native_mutex_destroy(&vm->gvl.lock);
- }
+ rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
+ rb_native_cond_destroy(&vm->gvl.switch_cond);
+ rb_native_cond_destroy(&vm->gvl.cond);
+ rb_native_mutex_destroy(&vm->gvl.lock);
clear_thread_cache_altstack();
}
@@ -484,9 +433,7 @@ native_thread_init(rb_thread_t *th)
#ifdef USE_UBF_LIST
list_node_init(&nd->ubf_list);
#endif
- rb_native_cond_initialize(&nd->cond.gvlq);
- if (&nd->cond.gvlq != &nd->cond.intr)
- rb_native_cond_initialize(&nd->cond.intr);
+ rb_native_cond_initialize(&nd->sleep_cond);
ruby_thread_set_native(th);
}
@@ -497,11 +444,7 @@ native_thread_init(rb_thread_t *th)
static void
native_thread_destroy(rb_thread_t *th)
{
- native_thread_data_t *nd = &th->native_thread_data;
-
- rb_native_cond_destroy(&nd->cond.gvlq);
- if (&nd->cond.gvlq != &nd->cond.intr)
- rb_native_cond_destroy(&nd->cond.intr);
+ rb_native_cond_destroy(&th->native_thread_data.sleep_cond);
/*
* prevent false positive from ruby_thread_has_gvl_p if that
@@ -1069,6 +1012,17 @@ native_thread_create(rb_thread_t *th)
return err;
}
+#if (TIMER_IMPL & TIMER_THREAD_MASK)
+static void
+native_thread_join(pthread_t th)
+{
+ int err = pthread_join(th, 0);
+ if (err) {
+ rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
+ }
+}
+#endif /* TIMER_THREAD_MASK */
+
#if USE_NATIVE_THREAD_PRIORITY
static void
@@ -1110,15 +1064,15 @@ ubf_pthread_cond_signal(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th);
- rb_native_cond_signal(&th->native_thread_data.cond.intr);
+ rb_native_cond_signal(&th->native_thread_data.sleep_cond);
}
static void
-native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel)
+native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
{
struct timespec timeout;
rb_nativethread_lock_t *lock = &th->interrupt_lock;
- rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr;
+ rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;
if (timeout_rel) {
/* Solaris cond_timedwait() return EINVAL if an argument is greater than
@@ -1210,30 +1164,17 @@ static void
ubf_select(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
- rb_vm_t *vm = th->vm;
-
register_ubf_list(th);
/*
* ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
- * exit from ubf function. We must designate a timer-thread to perform
- * this operation.
+     * exits from the ubf function.
+     * On the other hand, we shouldn't call rb_thread_wakeup_timer_thread()
+     * when running on the timer thread, because it may cause endless wakeups.
*/
- rb_native_mutex_lock(&vm->gvl.lock);
- if (!vm->gvl.timer) {
- native_thread_data_t *last;
-
- last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
- if (last) {
- rb_native_cond_signal(&last->cond.gvlq);
- }
- else {
- rb_thread_wakeup_timer_thread(0);
- }
- }
- rb_native_mutex_unlock(&vm->gvl.lock);
-
+ if (!pthread_equal(pthread_self(), timer_thread.id))
+ rb_thread_wakeup_timer_thread();
ubf_wakeup_thread(th);
}
@@ -1270,16 +1211,39 @@ static int ubf_threads_empty(void) { return 1; }
#define TT_DEBUG 0
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
+/* 100ms. 10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_USEC (100 * 1000)
+
+#if TIMER_IMPL == TIMER_THREAD_SLEEPY
static struct {
- /* pipes are closed in forked children when owner_process does not match */
+ /*
+     * The read end of each pipe is closed inside the timer thread at shutdown;
+     * the write ends are closed by a normal Ruby thread during shutdown.
+ */
int normal[2];
+ int low[2];
/* volatile for signal handler use: */
volatile rb_pid_t owner_process;
} timer_thread_pipe = {
{-1, -1},
+ {-1, -1}, /* low priority */
};
+NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
+static void
+async_bug_fd(const char *mesg, int errno_arg, int fd)
+{
+ char buff[64];
+ size_t n = strlcpy(buff, mesg, sizeof(buff));
+ if (n < sizeof(buff)-3) {
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
+ }
+ rb_async_bug_errno(buff, errno_arg);
+}
+
/* only use signal-safe system calls here */
static void
rb_thread_wakeup_timer_thread_fd(int fd)
@@ -1311,33 +1275,49 @@ rb_thread_wakeup_timer_thread_fd(int fd)
}
void
-rb_thread_wakeup_timer_thread(int sig)
+rb_thread_wakeup_timer_thread(void)
{
/* must be safe inside sighandler, so no mutex */
if (timer_thread_pipe.owner_process == getpid()) {
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
-
- /*
- * system_working check is required because vm and main_thread are
- * freed during shutdown
- */
- if (sig && system_working) {
- volatile rb_execution_context_t *ec;
- rb_vm_t *vm = GET_VM();
- rb_thread_t *mth;
-
- /*
- * FIXME: root VM and main_thread should be static and not
- * on heap for maximum safety (and startup/shutdown speed)
- */
- if (!vm) return;
- mth = vm->main_thread;
- if (!mth || !system_working) return;
-
- /* this relies on GC for grace period before cont_free */
- ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
-
- if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
+ }
+}
+
+static void
+rb_thread_wakeup_timer_thread_low(void)
+{
+ if (timer_thread_pipe.owner_process == getpid()) {
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
+ }
+}
+
+/* VM-dependent API is not available for this function */
+static void
+consume_communication_pipe(int fd)
+{
+#define CCP_READ_BUFF_SIZE 1024
+ /* buffer can be shared because no one refers to them. */
+ static char buff[CCP_READ_BUFF_SIZE];
+ ssize_t result;
+
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+ if (result == 0) {
+ return;
+ }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
}
}
}
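
consume_communication_pipe() above is the reader half of the classic self-pipe pattern: the writer (rb_thread_wakeup_timer_thread_fd(), shown later in this diff) stays async-signal-safe by doing nothing but write(2), and the reader drains the pipe, treating EAGAIN as "empty" and EINTR as "retry". A standalone sketch of the pattern, with illustrative names (wakeup_fd, wait_fd) that are not from the patch:

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

static int wait_fd = -1, wakeup_fd = -1;

static void wakeup(void)        /* safe to call from a signal handler */
{
    const char buf = '!';
    (void)write(wakeup_fd, &buf, 1); /* ignore EAGAIN: pipe full means a wakeup is pending */
}

static void drain(int fd)
{
    char buf[1024];
    for (;;) {
        ssize_t r = read(fd, buf, sizeof(buf));
        if (r == 0) return;              /* writer closed */
        if (r < 0) {
            if (errno == EINTR) continue; /* retry */
            if (errno == EAGAIN) return;  /* pipe drained */
            perror("drain read");
            return;
        }
    }
}

int main(void)
{
    int fds[2];
    struct pollfd pfd;
    if (pipe(fds)) return 1;
    wait_fd = fds[0];
    wakeup_fd = fds[1];
    fcntl(wait_fd, F_SETFL, O_NONBLOCK);   /* like set_nonblock() above */
    fcntl(wakeup_fd, F_SETFL, O_NONBLOCK);

    wakeup();
    pfd.fd = wait_fd;
    pfd.events = POLLIN;
    if (poll(&pfd, 1, -1) > 0)
        drain(wait_fd);
    puts("woken and drained");
    return 0;
}
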
@@ -1370,7 +1350,6 @@ set_nonblock(int fd)
rb_sys_fail(0);
}
-/* communication pipe with timer thread and signal handler */
static int
setup_communication_pipe_internal(int pipes[2])
{
@@ -1395,6 +1374,108 @@ setup_communication_pipe_internal(int pipes[2])
return 0;
}
+/* communication pipe with timer thread and signal handler */
+static int
+setup_communication_pipe(void)
+{
+ rb_pid_t owner = timer_thread_pipe.owner_process;
+
+ if (owner && owner != getpid()) {
+ CLOSE_INVALIDATE(normal[0]);
+ CLOSE_INVALIDATE(normal[1]);
+ CLOSE_INVALIDATE(low[0]);
+ CLOSE_INVALIDATE(low[1]);
+ }
+
+ if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
+ return errno;
+ }
+ if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
+ return errno;
+ }
+
+ return 0;
+}
+
+/**
+ * Let the timer thread sleep a while.
+ *
+ * When only one Ruby thread is running, the timer thread sleeps until it is woken up by rb_thread_wakeup_timer_thread().
+ * @pre the calling context is in the timer thread.
+ */
+static inline void
+timer_thread_sleep(rb_vm_t *vm)
+{
+ int result;
+ int need_polling;
+ struct pollfd pollfds[2];
+
+ pollfds[0].fd = timer_thread_pipe.normal[0];
+ pollfds[0].events = POLLIN;
+ pollfds[1].fd = timer_thread_pipe.low[0];
+ pollfds[1].events = POLLIN;
+
+ need_polling = !ubf_threads_empty();
+
+ if (SIGCHLD_LOSSY && !need_polling) {
+ rb_native_mutex_lock(&vm->waitpid_lock);
+ if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) {
+ need_polling = 1;
+ }
+ rb_native_mutex_unlock(&vm->waitpid_lock);
+ }
+
+ if (vm->gvl.waiting > 0 || need_polling) {
+ /* polling (TIME_QUANTUM_USEC usec) */
+ result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
+ }
+ else {
+ /* wait (infinite) */
+ result = poll(pollfds, numberof(pollfds), -1);
+ }
+
+ if (result == 0) {
+ /* maybe timeout */
+ }
+ else if (result > 0) {
+ consume_communication_pipe(timer_thread_pipe.normal[0]);
+ consume_communication_pipe(timer_thread_pipe.low[0]);
+ }
+ else { /* result < 0 */
+ int e = errno;
+ switch (e) {
+ case EBADF:
+ case EINVAL:
+ case ENOMEM: /* from Linux man */
+ case EFAULT: /* from FreeBSD man */
+ rb_async_bug_errno("thread_timer: select", e);
+ default:
+ /* ignore */;
+ }
+ }
+}
+#endif /* TIMER_THREAD_SLEEPY */
+
+#if TIMER_IMPL == TIMER_THREAD_BUSY
+# define PER_NANO 1000000000
+void rb_thread_wakeup_timer_thread(void) {}
+static void rb_thread_wakeup_timer_thread_low(void) {}
+
+static rb_nativethread_lock_t timer_thread_lock;
+static rb_nativethread_cond_t timer_thread_cond;
+
+static inline void
+timer_thread_sleep(rb_vm_t *unused)
+{
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
+ ts = native_cond_timeout(&timer_thread_cond, ts);
+
+ native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
+}
+#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */
+
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
#endif
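
The TIMER_THREAD_BUSY branch above sleeps on a condvar with an absolute deadline computed by native_cond_timeout(). A hedged sketch of that timed sleep, assuming CLOCK_MONOTONIC and pthread_condattr_setclock() are available (which is what the condattr_monotonic setup near the top of the file selects):

#include <pthread.h>
#include <time.h>

#define TIME_QUANTUM_USEC (100 * 1000)
#define PER_NANO 1000000000L

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond;

static void timed_nap(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    ts.tv_nsec += TIME_QUANTUM_USEC * 1000L;
    if (ts.tv_nsec >= PER_NANO) {   /* normalize the carry, as native_cond_timeout does */
        ts.tv_sec++;
        ts.tv_nsec -= PER_NANO;
    }
    pthread_mutex_lock(&lock);
    pthread_cond_timedwait(&cond, &lock, &ts); /* ETIMEDOUT is the normal case */
    pthread_mutex_unlock(&lock);
}

int main(void)
{
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&cond, &attr);
    timed_nap();                     /* returns after roughly 100ms */
    return 0;
}
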
@@ -1445,26 +1526,137 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
return name;
}
+static void *
+thread_timer(void *p)
+{
+ rb_vm_t *vm = p;
+#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */
+ {
+ sigset_t mask;
+ sigemptyset(&mask);
+ pthread_sigmask(SIG_SETMASK, &mask, NULL);
+ }
+#endif
+
+ if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
+
+#ifdef SET_CURRENT_THREAD_NAME
+ SET_CURRENT_THREAD_NAME("ruby-timer-thr");
+#endif
+
+#if TIMER_IMPL == TIMER_THREAD_BUSY
+ rb_native_mutex_initialize(&timer_thread_lock);
+ rb_native_cond_initialize(&timer_thread_cond);
+ rb_native_mutex_lock(&timer_thread_lock);
+#endif
+ while (system_working > 0) {
+
+ /* timer function */
+ ubf_wakeup_all_threads();
+ timer_thread_function(0);
+
+ if (TT_DEBUG) WRITE_CONST(2, "tick\n");
+
+ /* wait */
+ timer_thread_sleep(vm);
+ }
+#if TIMER_IMPL == TIMER_THREAD_BUSY
+ rb_native_mutex_unlock(&timer_thread_lock);
+ rb_native_cond_destroy(&timer_thread_cond);
+ rb_native_mutex_destroy(&timer_thread_lock);
+#endif
+
+ if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
+ return NULL;
+}
+
+#if (TIMER_IMPL & TIMER_THREAD_MASK)
static void
rb_thread_create_timer_thread(void)
{
- /* we only create the pipe, and lazy-spawn */
- rb_pid_t current = getpid();
- rb_pid_t owner = timer_thread_pipe.owner_process;
+ if (!timer_thread.created) {
+ size_t stack_size = 0;
+ int err;
+ pthread_attr_t attr;
+ rb_vm_t *vm = GET_VM();
- if (owner && owner != current) {
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(normal[1]);
- }
+ err = pthread_attr_init(&attr);
+ if (err != 0) {
+ rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
+ strerror(err));
+ return;
+ }
+# ifdef PTHREAD_STACK_MIN
+ {
+ size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
+ const size_t min_size = (4096 * 4);
+ /* Allocate the machine stack for the timer thread
+ * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes
+ * machine stack overflow only with PTHREAD_STACK_MIN.
+ */
+ enum {
+ needs_more_stack =
+#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__
+ 1
+#else
+ THREAD_DEBUG != 0
+#endif
+ };
+ stack_size = stack_min;
+ if (stack_size < min_size) stack_size = min_size;
+ if (needs_more_stack) {
+ stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min;
+ }
+ err = pthread_attr_setstacksize(&attr, stack_size);
+ if (err != 0) {
+ rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s",
+ stack_size, strerror(err));
+ }
+ }
+# endif
- if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
+#if TIMER_IMPL == TIMER_THREAD_SLEEPY
+ err = setup_communication_pipe();
+ if (err) return;
+#endif /* TIMER_THREAD_SLEEPY */
- if (owner != current) {
- /* validate pipe on this process */
- sigwait_th = THREAD_INVALID;
- timer_thread_pipe.owner_process = current;
+ /* create timer thread */
+ if (timer_thread.created) {
+ rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
+ }
+ err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
+ pthread_attr_destroy(&attr);
+
+ if (err == EINVAL) {
+ /*
+ * Even if we are careful with our own stack use in thread_timer(),
+     * any third-party libraries (e.g. libkqueue) which rely on __thread
+     * storage can cause small stack sizes to fail. So let's hope the
+ * default stack size is enough for them:
+ */
+ stack_size = 0;
+ err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
+ }
+ if (err != 0) {
+ rb_warn("pthread_create failed for timer: %s, scheduling broken",
+ strerror(err));
+ if (stack_size) {
+ rb_warn("timer thread stack size: %"PRIuSIZE, stack_size);
+ }
+ else {
+ rb_warn("timer thread stack size: system default");
+ }
+ VM_ASSERT(err == 0);
+ return;
+ }
+#if TIMER_IMPL == TIMER_THREAD_SLEEPY
+ /* validate pipe on this process */
+ timer_thread_pipe.owner_process = getpid();
+#endif /* TIMER_THREAD_SLEEPY */
+ timer_thread.created = 1;
}
}
+#endif /* TIMER_IMPL & TIMER_THREAD_MASK */
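
rb_thread_create_timer_thread() above uses a create-with-fallback pattern: try a small explicit stack via pthread_attr_setstacksize(), and if pthread_create() rejects it with EINVAL, retry with the system default. A standalone sketch of that pattern (thread_fn is a placeholder, not from the patch):

#include <errno.h>
#include <limits.h>   /* PTHREAD_STACK_MIN on some platforms */
#include <pthread.h>
#include <stdio.h>
#include <string.h>

static void *thread_fn(void *arg) { (void)arg; return NULL; }

int main(void)
{
    pthread_t tid;
    pthread_attr_t attr;
    size_t stack_size = 4096 * 4;  /* at least 16KB, matching the patch's minimum */
    int err;

#ifdef PTHREAD_STACK_MIN
    if (stack_size < (size_t)PTHREAD_STACK_MIN)
        stack_size = PTHREAD_STACK_MIN;
#endif
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, stack_size);

    err = pthread_create(&tid, &attr, thread_fn, NULL);
    pthread_attr_destroy(&attr);
    if (err == EINVAL)  /* stack size rejected: retry with the default */
        err = pthread_create(&tid, NULL, thread_fn, NULL);
    if (err) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        return 1;
    }
    pthread_join(tid, NULL);
    return 0;
}
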
static int
native_stop_timer_thread(void)
@@ -1473,6 +1665,24 @@ native_stop_timer_thread(void)
stopped = --system_working <= 0;
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
+ if (stopped) {
+#if TIMER_IMPL == TIMER_THREAD_SLEEPY
+ /* kick timer thread out of sleep */
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
+#endif
+
+ /* timer thread will stop looping when system_working <= 0: */
+ native_thread_join(timer_thread.id);
+
+ /*
+     * We don't care if timer_thread_pipe fills up at this point.
+     * If we restart the timer thread, signals will be processed;
+     * if we don't, it's because we're in a different child.
+ */
+
+ if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
+ timer_thread.created = 0;
+ }
return stopped;
}
@@ -1529,14 +1739,20 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
int
rb_reserved_fd_p(int fd)
{
+#if TIMER_IMPL == TIMER_THREAD_SLEEPY
if ((fd == timer_thread_pipe.normal[0] ||
- fd == timer_thread_pipe.normal[1]) &&
+ fd == timer_thread_pipe.normal[1] ||
+ fd == timer_thread_pipe.low[0] ||
+ fd == timer_thread_pipe.low[1]) &&
timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
return 1;
}
else {
return 0;
}
+#else
+ return 0;
+#endif
}
rb_nativethread_id_t
@@ -1587,7 +1803,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec)
{
rb_thread_t *th = rb_ec_thread_ptr(ec);
- return &th->native_thread_data.cond.intr;
+ return &th->native_thread_data.sleep_cond;
}
void
@@ -1597,126 +1813,4 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
}
#endif /* USE_NATIVE_SLEEP_COND */
-int
-rb_sigwait_fd_get(const rb_thread_t *th)
-{
- if (timer_thread_pipe.owner_process == getpid() &&
- timer_thread_pipe.normal[0] >= 0) {
- if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
- return timer_thread_pipe.normal[0];
- }
- }
- return -1; /* avoid thundering herd */
-}
-
-void
-rb_sigwait_fd_put(const rb_thread_t *th, int fd)
-{
- const rb_thread_t *old;
-
- VM_ASSERT(timer_thread_pipe.normal[0] == fd);
- old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
- if (old != th) assert(old == th);
-}
-
-#ifndef HAVE_PPOLL
-/* TODO: don't ignore sigmask */
-static int
-ruby_ppoll(struct pollfd *fds, nfds_t nfds,
- const struct timespec *ts, const sigset_t *sigmask)
-{
- int timeout_ms;
-
- if (ts) {
- int tmp, tmp2;
-
- if (ts->tv_sec > INT_MAX/1000)
- timeout_ms = INT_MAX;
- else {
- tmp = (int)(ts->tv_sec * 1000);
- /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
- tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
- if (INT_MAX - tmp < tmp2)
- timeout_ms = INT_MAX;
- else
- timeout_ms = (int)(tmp + tmp2);
- }
- }
- else
- timeout_ms = -1;
-
- return poll(fds, nfds, timeout_ms);
-}
-# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
-#endif
-
-void
-rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
-{
- struct pollfd pfd;
-
- pfd.fd = sigwait_fd;
- pfd.events = POLLIN;
-
- if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
- (void)ppoll(&pfd, 1, ts, 0);
- check_signals_nogvl(th, sigwait_fd);
- }
- else {
- struct timespec end, diff;
- const struct timespec *to;
- int n = 0;
-
- if (ts) {
- getclockofday(&end);
- timespec_add(&end, ts);
- diff = *ts;
- ts = &diff;
- }
- /*
- * tricky: this needs to return on spurious wakeup (no auto-retry).
- * But we also need to distinguish between periodic quantum
- * wakeups, so we care about the result of consume_communication_pipe
- */
- for (;;) {
- to = sigwait_timeout(th, sigwait_fd, ts, &n);
- if (n) return;
- n = ppoll(&pfd, 1, to, 0);
- if (check_signals_nogvl(th, sigwait_fd))
- return;
- if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
- return;
- if (ts && timespec_update_expire(&diff, &end))
- return;
- }
- }
-}
-
-static void
-native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
-{
- int sigwait_fd = rb_sigwait_fd_get(th);
-
- if (sigwait_fd >= 0) {
- rb_native_mutex_lock(&th->interrupt_lock);
- th->unblock.func = ubf_sigwait;
- rb_native_mutex_unlock(&th->interrupt_lock);
-
- GVL_UNLOCK_BEGIN(th);
-
- if (!RUBY_VM_INTERRUPTED(th->ec)) {
- rb_sigwait_sleep(th, sigwait_fd, timeout_rel);
- }
- else {
- check_signals_nogvl(th, sigwait_fd);
- }
- unblock_function_clear(th);
- GVL_UNLOCK_END(th);
- rb_sigwait_fd_put(th, sigwait_fd);
- rb_sigwait_fd_migrate(th->vm);
- }
- else {
- native_cond_sleep(th, timeout_rel);
- }
-}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */