From 708bfd21156828526fe72de2cedecfaca6647dc1 Mon Sep 17 00:00:00 2001
From: normal
Date: Sun, 29 Jul 2018 20:47:33 +0000
Subject: thread_pthread: remove timer-thread by restructuring GVL

To reduce resource use and CI failures, remove the timer-thread.
Single-threaded Ruby processes (including forked children) will never
see extra thread overhead.  This prevents glibc and jemalloc from
going into multi-threaded mode and initializing locks or causing
fragmentation via arena explosion.

The GVL now implements its own wait-queue as a ccan/list to permit
controlling wakeup order.  Timeslice under contention is handled by a
designated timer thread (similar to choosing a "patrol_thread" for the
current deadlock checking).

There is only one self-pipe now, as wakeups for timeslice are done
independently via condition variables.  This reduces FD pressure
slightly.

Signals are handled directly by a Ruby Thread (instead of the
timer-thread) by exposing the signal self-pipe to callers of
rb_thread_fd_select, native_sleep, rb_wait_for_single_fd, etc.
Acquiring, using, and releasing the self-pipe is exposed via 4 new
internal functions:

1) rb_sigwait_fd_get - exclusively acquire timer_thread_pipe.normal[0]

2) rb_sigwait_fd_sleep - sleep and wait for signal (and no other FDs)

3) rb_sigwait_fd_put - release the FD acquired by rb_sigwait_fd_get

4) rb_sigwait_fd_migrate - migrate signal handling to another thread
   after calling rb_sigwait_fd_put.

rb_sigwait_fd_migrate is necessary for waitpid callers because only
one thread can wait on the self-pipe at a time; otherwise a deadlock
would occur if threads fought over the self-pipe.

TRAP_INTERRUPT_MASK is now set for the main thread directly in the
signal handler via rb_thread_wakeup_timer_thread.

Originally, I wanted to use POSIX timers (timer_create/timer_settime)
for this.  Unfortunately, that proved infeasible because Mutex#sleep
resumes on spurious wakeups and
test/thread/test_cv.rb::test_condvar_timed_wait failed.  Using
pthread_sigmask to mask out SIGVTALRM fixed that test, but
test/fiddle/test_function.rb::test_nogvl_poll proved there would be
some unavoidable (and frequent) incompatibilities from that approach.

Finally, this allows us to drop thread_destruct_lock and interrupt the
current ec directly.  We no longer rely on vm->thread_destruct_lock or
a coherent vm->running_thread on any platform.  A separate
timer-thread for time slicing and signal handling is now relegated to
thread_win32.c.
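For illustration, the contended acquire path boils down to the
following sketch.  It is deliberately simplified: one shared condvar
stands in for the per-native-thread sleep_cond and the ccan/list
waitq, "gvl_t" is a hypothetical stand-in for rb_global_vm_lock_t,
and CLOCK_REALTIME stands in for the monotonic clock used when
available.

    /* designated-timer sketch; build with: cc -pthread gvl_sketch.c */
    #include <pthread.h>
    #include <time.h>

    typedef struct {
        pthread_mutex_t lock;
        pthread_cond_t cond;     /* simplified: one condvar for everyone */
        int acquired;            /* 0, or the id of the current holder */
        int timer;               /* nonzero while a waiter is the timer */
        volatile int want_yield; /* holder checks this at interrupt points */
    } gvl_t;

    static gvl_t gvl = {
        PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0
    };

    static void
    gvl_acquire(int self)
    {
        pthread_mutex_lock(&gvl.lock);
        while (gvl.acquired) {
            if (!gvl.timer) {
                /* become the designated timer: sleep one quantum, then
                 * ask the holder to give up its timeslice (the real code
                 * also kicks blocking threads via ubf_wakeup_all_threads) */
                struct timespec ts;
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_nsec += 100 * 1000 * 1000; /* TIME_QUANTUM_USEC */
                if (ts.tv_nsec >= 1000000000L) {
                    ts.tv_sec++;
                    ts.tv_nsec -= 1000000000L;
                }
                gvl.timer = 1;
                pthread_cond_timedwait(&gvl.cond, &gvl.lock, &ts);
                gvl.timer = 0;
                if (gvl.acquired) gvl.want_yield = 1; /* timeslice expired */
            }
            else {
                pthread_cond_wait(&gvl.cond, &gvl.lock);
            }
        }
        gvl.acquired = self;
        gvl.want_yield = 0;
        /* designate the next timer among the remaining waiters */
        if (!gvl.timer) pthread_cond_signal(&gvl.cond);
        pthread_mutex_unlock(&gvl.lock);
    }

    static void
    gvl_release(void)
    {
        pthread_mutex_lock(&gvl.lock);
        gvl.acquired = 0;
        pthread_cond_signal(&gvl.cond); /* wake one waiter */
        pthread_mutex_unlock(&gvl.lock);
    }

A single-threaded process never enters the waitq and never arms a
timer, which is the whole point of the change.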
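The single-watcher rule for the signal self-pipe can be sketched the
same way.  The GCC/clang __atomic builtins below stand in for the
ATOMIC_PTR_CAS/ATOMIC_PTR_EXCHANGE wrappers used in the tree, and the
function names are local to this sketch:

    #include <stddef.h>

    #define THREAD_INVALID ((void *)-1)

    static void *sigwait_th = THREAD_INVALID; /* owner, or THREAD_INVALID */
    static int sigwait_rd = -1;               /* read end of the self-pipe */

    /* returns the self-pipe FD if the caller won ownership, else -1
     * (some other thread is already watching for signals) */
    static int
    sigwait_fd_get(void *th)
    {
        void *expected = THREAD_INVALID;

        if (sigwait_rd >= 0 &&
            __atomic_compare_exchange_n(&sigwait_th, &expected, th, 0,
                                        __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
            return sigwait_rd;
        }
        return -1; /* avoid thundering herd */
    }

    /* release ownership; the real rb_sigwait_fd_put is followed by
     * rb_sigwait_fd_migrate, which kicks another sleeping thread so
     * the self-pipe never goes unwatched while threads sleep */
    static void
    sigwait_fd_put(void *th, int fd)
    {
        (void)th; (void)fd; /* the real code asserts th owned fd */
        __atomic_store_n(&sigwait_th, THREAD_INVALID, __ATOMIC_SEQ_CST);
    }

This is why waitpid callers must migrate: if two threads both insisted
on sleeping on the self-pipe, one of them would sleep forever.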
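The self-pipe mechanics themselves reduce to the following (assuming,
as setup_communication_pipe_internal does, that both ends were created
non-blocking and close-on-exec):

    #define _GNU_SOURCE /* for ppoll(2) on Linux */
    #include <errno.h>
    #include <poll.h>
    #include <time.h>
    #include <unistd.h>

    static int self_pipe[2]; /* [0] = sigwait fd, [1] = written by handler */

    /* called from the signal handler: async-signal-safe calls only.
     * EAGAIN on a full pipe is fine; the reader is already readable. */
    static void
    wakeup_sigwait_fd(void)
    {
        const char c = '!';
        int saved_errno = errno;

        while (write(self_pipe[1], &c, 1) < 0 && errno == EINTR)
            ;
        errno = saved_errno;
    }

    /* run by whichever Ruby thread currently owns the sigwait fd:
     * sleep until a signal arrives or the timeout expires, then do
     * deferred signal handling (ts == NULL sleeps forever) */
    static void
    sigwait_sleep(const struct timespec *ts)
    {
        struct pollfd pfd = { .fd = self_pipe[0], .events = POLLIN };

        (void)ppoll(&pfd, 1, ts, NULL);
        /* drain self_pipe[0] and process pending signals here */
    }

In the real patch, rb_thread_wakeup_timer_thread additionally sets
TRAP_INTERRUPT on the main thread's execution context from inside the
signal handler, so the main thread notices the trap at its next
interrupt check.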
[ruby-core:88088] [Misc #14937]

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64107 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
---
 thread_pthread.c | 586 ++++++++++++++++++-------------------------------------
 1 file changed, 187 insertions(+), 399 deletions(-)

(limited to 'thread_pthread.c')

diff --git a/thread_pthread.c b/thread_pthread.c
index 475554cf61..0eed3847b2 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -45,27 +45,16 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
 void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
 void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
 void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
-static void rb_thread_wakeup_timer_thread_low(void);
 static void clear_thread_cache_altstack(void);
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
+static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
+                                 const struct timespec *);
 
-#define TIMER_THREAD_MASK    (1)
-#define TIMER_THREAD_SLEEPY  (2|TIMER_THREAD_MASK)
-#define TIMER_THREAD_BUSY    (4|TIMER_THREAD_MASK)
+#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
 
-#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
-    defined(F_SETFL) && defined(O_NONBLOCK) && \
-    defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
-/* The timer thread sleeps while only one Ruby thread is running. */
-# define TIMER_IMPL TIMER_THREAD_SLEEPY
-#else
-# define TIMER_IMPL TIMER_THREAD_BUSY
-#endif
-
-static struct {
-    pthread_t id;
-    int created;
-} timer_thread;
-#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
+#define THREAD_INVALID ((const rb_thread_t *)-1)
+static const rb_thread_t *sigwait_th;
 
 #ifdef HAVE_SCHED_YIELD
 #define native_thread_yield() (void)sched_yield()
@@ -82,49 +71,85 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
 static const void *const condattr_monotonic = NULL;
 #endif
 
+/* 100ms.  10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_USEC (100 * 1000)
+
+static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
+                                           struct timespec rel);
+
 static void
-gvl_acquire_common(rb_vm_t *vm)
+gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
 {
     if (vm->gvl.acquired) {
-
-	if (!vm->gvl.waiting++) {
-	    /*
-	     * Wake up timer thread iff timer thread is slept.
-	     * When timer thread is polling mode, we don't want to
-	     * make confusing timer thread interval time.
-	     */
-	    rb_thread_wakeup_timer_thread_low();
-	}
-
-	while (vm->gvl.acquired) {
-	    rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
-	}
-
-	--vm->gvl.waiting;
-
-	if (vm->gvl.need_yield) {
-	    vm->gvl.need_yield = 0;
+        native_thread_data_t *nd = &th->native_thread_data;
+
+        VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq");
+
+        list_add_tail(&vm->gvl.waitq, &nd->ubf_list);
+        do {
+            if (!vm->gvl.timer) {
+                struct timespec ts = { 0, TIME_QUANTUM_USEC * 1000 };
+                /*
+                 * become designated timer thread to kick vm->gvl.acquired
+                 * periodically
+                 */
+                ts = native_cond_timeout(&nd->sleep_cond, ts);
+                vm->gvl.timer = th;
+                native_cond_timedwait(&nd->sleep_cond, &vm->gvl.lock, &ts);
+                vm->gvl.timer = 0;
+                ubf_wakeup_all_threads();
+
+                /*
+                 * Timeslice.  We can't touch thread_destruct_lock here,
+                 * as the process may fork while this thread is contending
+                 * for GVL:
+                 */
+                if (vm->gvl.acquired) timer_thread_function();
+            }
+            else {
+                rb_native_cond_wait(&nd->sleep_cond, &vm->gvl.lock);
+            }
+        } while (vm->gvl.acquired);
+
+        list_del_init(&nd->ubf_list);
+
+        if (vm->gvl.need_yield) {
+            vm->gvl.need_yield = 0;
             rb_native_cond_signal(&vm->gvl.switch_cond);
-	}
+        }
     }
+    vm->gvl.acquired = th;
+    /*
+     * Designate the next gvl.timer thread, favor the last thread in
+     * the waitq since it will be in waitq longest
+     */
+    if (!vm->gvl.timer) {
+        native_thread_data_t *last;
 
-    vm->gvl.acquired = 1;
+        last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+        if (last) rb_native_cond_signal(&last->sleep_cond);
+    }
 }
 
 static void
 gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
 {
     rb_native_mutex_lock(&vm->gvl.lock);
-    gvl_acquire_common(vm);
+    gvl_acquire_common(vm, th);
     rb_native_mutex_unlock(&vm->gvl.lock);
 }
 
-static void
+static native_thread_data_t *
 gvl_release_common(rb_vm_t *vm)
 {
+    native_thread_data_t *next;
     vm->gvl.acquired = 0;
-    if (vm->gvl.waiting > 0)
-	rb_native_cond_signal(&vm->gvl.cond);
+    next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+    if (next) rb_native_cond_signal(&next->sleep_cond);
+
+    return next;
 }
 
 static void
@@ -138,34 +163,32 @@ gvl_release(rb_vm_t *vm)
 static void
 gvl_yield(rb_vm_t *vm, rb_thread_t *th)
 {
-    rb_native_mutex_lock(&vm->gvl.lock);
+    native_thread_data_t *next;
 
-    gvl_release_common(vm);
+    rb_native_mutex_lock(&vm->gvl.lock);
+    next = gvl_release_common(vm);
 
     /* An another thread is processing GVL yield. */
     if (UNLIKELY(vm->gvl.wait_yield)) {
-	while (vm->gvl.wait_yield)
+        while (vm->gvl.wait_yield)
 	    rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
-	goto acquire;
     }
-
-    if (vm->gvl.waiting > 0) {
-	/* Wait until another thread task take GVL. */
-	vm->gvl.need_yield = 1;
-	vm->gvl.wait_yield = 1;
-	while (vm->gvl.need_yield)
+    else if (next) {
+        /* Wait until another thread task takes GVL. */
+        vm->gvl.need_yield = 1;
+        vm->gvl.wait_yield = 1;
+        while (vm->gvl.need_yield)
 	    rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
-	vm->gvl.wait_yield = 0;
+        vm->gvl.wait_yield = 0;
+        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
    }
    else {
-	rb_native_mutex_unlock(&vm->gvl.lock);
-	sched_yield();
+        rb_native_mutex_unlock(&vm->gvl.lock);
+        native_thread_yield();
         rb_native_mutex_lock(&vm->gvl.lock);
+        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
    }
-
-    rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
-  acquire:
-    gvl_acquire_common(vm);
+    gvl_acquire_common(vm, th);
     rb_native_mutex_unlock(&vm->gvl.lock);
 }
 
@@ -173,11 +196,11 @@ static void
 gvl_init(rb_vm_t *vm)
 {
     rb_native_mutex_initialize(&vm->gvl.lock);
-    rb_native_cond_initialize(&vm->gvl.cond);
     rb_native_cond_initialize(&vm->gvl.switch_cond);
     rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
+    list_head_init(&vm->gvl.waitq);
     vm->gvl.acquired = 0;
-    vm->gvl.waiting = 0;
+    vm->gvl.timer = 0;
     vm->gvl.need_yield = 0;
     vm->gvl.wait_yield = 0;
 }
@@ -187,7 +210,6 @@ gvl_destroy(rb_vm_t *vm)
 {
     rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
     rb_native_cond_destroy(&vm->gvl.switch_cond);
-    rb_native_cond_destroy(&vm->gvl.cond);
     rb_native_mutex_destroy(&vm->gvl.lock);
     clear_thread_cache_altstack();
 }
@@ -1012,17 +1034,6 @@ native_thread_create(rb_thread_t *th)
     return err;
 }
 
-#if (TIMER_IMPL & TIMER_THREAD_MASK)
-static void
-native_thread_join(pthread_t th)
-{
-    int err = pthread_join(th, 0);
-    if (err) {
-	rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
-    }
-}
-#endif /* TIMER_THREAD_MASK */
-
 #if USE_NATIVE_THREAD_PRIORITY
 
 static void
@@ -1068,7 +1079,7 @@ ubf_pthread_cond_signal(void *ptr)
 }
 
 static void
-native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
+native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel)
 {
     struct timespec timeout;
     rb_nativethread_lock_t *lock = &th->interrupt_lock;
@@ -1161,17 +1172,8 @@ static void
 ubf_select(void *ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
-    register_ubf_list(th);
 
-    /*
-     * ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
-     * Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
-     * exit from ubf function.
-     * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
-     * if running on timer thread because it may make endless wakeups.
-     */
-    if (!pthread_equal(pthread_self(), timer_thread.id))
-	rb_thread_wakeup_timer_thread();
+    register_ubf_list(th);
     ubf_wakeup_thread(th);
 }
 
@@ -1208,39 +1210,16 @@ static int ubf_threads_empty(void) { return 1; }
 #define TT_DEBUG 0
 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
 
-/* 100ms.  10ms is too small for user level thread scheduling
- * on recent Linux (tested on 2.6.35)
- */
-#define TIME_QUANTUM_USEC (100 * 1000)
-
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
 static struct {
-    /*
-     * Read end of each pipe is closed inside timer thread for shutdown
-     * Write ends are closed by a normal Ruby thread during shutdown
-     */
+    /* pipes are closed in forked children when owner_process does not match */
     int normal[2];
-    int low[2];
 
     /* volatile for signal handler use: */
     volatile rb_pid_t owner_process;
 } timer_thread_pipe = {
     {-1, -1},
-    {-1, -1}, /* low priority */
 };
 
-NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static void
-async_bug_fd(const char *mesg, int errno_arg, int fd)
-{
-    char buff[64];
-    size_t n = strlcpy(buff, mesg, sizeof(buff));
-    if (n < sizeof(buff)-3) {
-	ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
-    }
-    rb_async_bug_errno(buff, errno_arg);
-}
-
 /* only use signal-safe system calls here */
 static void
 rb_thread_wakeup_timer_thread_fd(int fd)
@@ -1272,49 +1251,33 @@ rb_thread_wakeup_timer_thread_fd(int fd)
 }
 
 void
-rb_thread_wakeup_timer_thread(void)
+rb_thread_wakeup_timer_thread(int sig)
 {
     /* must be safe inside sighandler, so no mutex */
     if (timer_thread_pipe.owner_process == getpid()) {
-	rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
-    }
-}
-
-static void
-rb_thread_wakeup_timer_thread_low(void)
-{
-    if (timer_thread_pipe.owner_process == getpid()) {
-	rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
-    }
-}
-
-/* VM-dependent API is not available for this function */
-static void
-consume_communication_pipe(int fd)
-{
-#define CCP_READ_BUFF_SIZE 1024
-    /* buffer can be shared because no one refers to them. */
-    static char buff[CCP_READ_BUFF_SIZE];
-    ssize_t result;
-
-    while (1) {
-	result = read(fd, buff, sizeof(buff));
-	if (result == 0) {
-	    return;
-	}
-	else if (result < 0) {
-	    int e = errno;
-	    switch (e) {
-	      case EINTR:
-		continue; /* retry */
-	      case EAGAIN:
-#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
-	      case EWOULDBLOCK:
-#endif
-		return;
-	      default:
-		async_bug_fd("consume_communication_pipe: read", e, fd);
-	    }
+        rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
+
+        /*
+         * system_working check is required because vm and main_thread are
+         * freed during shutdown
+         */
+        if (sig && system_working) {
+            volatile rb_execution_context_t *ec;
+            rb_vm_t *vm = GET_VM();
+            rb_thread_t *mth;
+
+            /*
+             * FIXME: root VM and main_thread should be static and not
+             * on heap for maximum safety (and startup/shutdown speed)
+             */
+            if (!vm) return;
+            mth = vm->main_thread;
+            if (!mth || !system_working) return;
+
+            /* this relies on GC for grace period before cont_free */
+            ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
+
+            if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
         }
     }
 }
@@ -1347,6 +1310,7 @@ set_nonblock(int fd)
 	rb_sys_fail(0);
 }
 
+/* communication pipe with timer thread and signal handler */
 static int
 setup_communication_pipe_internal(int pipes[2])
 {
@@ -1371,108 +1335,6 @@ setup_communication_pipe_internal(int pipes[2])
     return 0;
 }
 
-/* communication pipe with timer thread and signal handler */
-static int
-setup_communication_pipe(void)
-{
-    rb_pid_t owner = timer_thread_pipe.owner_process;
-
-    if (owner && owner != getpid()) {
-	CLOSE_INVALIDATE(normal[0]);
-	CLOSE_INVALIDATE(normal[1]);
-	CLOSE_INVALIDATE(low[0]);
-	CLOSE_INVALIDATE(low[1]);
-    }
-
-    if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
-	return errno;
-    }
-    if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
-	return errno;
-    }
-
-    return 0;
-}
-
-/**
- * Let the timer thread sleep a while.
- *
- * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running.
- * @pre the calling context is in the timer thread.
- */
-static inline void
-timer_thread_sleep(rb_vm_t *vm)
-{
-    int result;
-    int need_polling;
-    struct pollfd pollfds[2];
-
-    pollfds[0].fd = timer_thread_pipe.normal[0];
-    pollfds[0].events = POLLIN;
-    pollfds[1].fd = timer_thread_pipe.low[0];
-    pollfds[1].events = POLLIN;
-
-    need_polling = !ubf_threads_empty();
-
-    if (SIGCHLD_LOSSY && !need_polling) {
-	rb_native_mutex_lock(&vm->waitpid_lock);
-	if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) {
-	    need_polling = 1;
-	}
-	rb_native_mutex_unlock(&vm->waitpid_lock);
-    }
-
-    if (vm->gvl.waiting > 0 || need_polling) {
-	/* polling (TIME_QUANTUM_USEC usec) */
-	result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
-    }
-    else {
-	/* wait (infinite) */
-	result = poll(pollfds, numberof(pollfds), -1);
-    }
-
-    if (result == 0) {
-	/* maybe timeout */
-    }
-    else if (result > 0) {
-	consume_communication_pipe(timer_thread_pipe.normal[0]);
-	consume_communication_pipe(timer_thread_pipe.low[0]);
-    }
-    else { /* result < 0 */
-	int e = errno;
-	switch (e) {
-	  case EBADF:
-	  case EINVAL:
-	  case ENOMEM: /* from Linux man */
-	  case EFAULT: /* from FreeBSD man */
-	    rb_async_bug_errno("thread_timer: select", e);
-	  default:
-	    /* ignore */;
-	}
-    }
-}
-#endif /* TIMER_THREAD_SLEEPY */
-
-#if TIMER_IMPL == TIMER_THREAD_BUSY
-# define PER_NANO 1000000000
-void rb_thread_wakeup_timer_thread(void) {}
-static void rb_thread_wakeup_timer_thread_low(void) {}
-
-static rb_nativethread_lock_t timer_thread_lock;
-static rb_nativethread_cond_t timer_thread_cond;
-
-static inline void
-timer_thread_sleep(rb_vm_t *unused)
-{
-    struct timespec ts;
-    ts.tv_sec = 0;
-    ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
-    ts = native_cond_timeout(&timer_thread_cond, ts);
-
-    native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
-}
-#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */
-
 #if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
 # define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
 #endif
@@ -1523,137 +1385,26 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
     return name;
 }
 
-static void *
-thread_timer(void *p)
-{
-    rb_vm_t *vm = p;
-#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */
-    {
-	sigset_t mask;
-	sigemptyset(&mask);
-	pthread_sigmask(SIG_SETMASK, &mask, NULL);
-    }
-#endif
-
-    if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
-
-#ifdef SET_CURRENT_THREAD_NAME
-    SET_CURRENT_THREAD_NAME("ruby-timer-thr");
-#endif
-
-#if TIMER_IMPL == TIMER_THREAD_BUSY
-    rb_native_mutex_initialize(&timer_thread_lock);
-    rb_native_cond_initialize(&timer_thread_cond);
-    rb_native_mutex_lock(&timer_thread_lock);
-#endif
-    while (system_working > 0) {
-
-	/* timer function */
-	ubf_wakeup_all_threads();
-	timer_thread_function(0);
-
-	if (TT_DEBUG) WRITE_CONST(2, "tick\n");
-
-	/* wait */
-	timer_thread_sleep(vm);
-    }
-#if TIMER_IMPL == TIMER_THREAD_BUSY
-    rb_native_mutex_unlock(&timer_thread_lock);
-    rb_native_cond_destroy(&timer_thread_cond);
-    rb_native_mutex_destroy(&timer_thread_lock);
-#endif
-
-    if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
-    return NULL;
-}
-
-#if (TIMER_IMPL & TIMER_THREAD_MASK)
 static void
 rb_thread_create_timer_thread(void)
 {
-    if (!timer_thread.created) {
-	size_t stack_size = 0;
-	int err;
-	pthread_attr_t attr;
-	rb_vm_t *vm = GET_VM();
+    /* we only create the pipe, and lazy-spawn */
+    rb_pid_t current = getpid();
+    rb_pid_t owner = timer_thread_pipe.owner_process;
 
-	err = pthread_attr_init(&attr);
-	if (err != 0) {
-	    rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
-		    strerror(err));
-	    return;
-	}
-# ifdef PTHREAD_STACK_MIN
-	{
-	    size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
-	    const size_t min_size = (4096 * 4);
-	    /* Allocate the machine stack for the timer thread
-	     * at least 16KB (4 pages).  FreeBSD 8.2 AMD64 causes
-	     * machine stack overflow only with PTHREAD_STACK_MIN.
-	     */
-	    enum {
-		needs_more_stack =
-#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__
-		1
-#else
-		THREAD_DEBUG != 0
-#endif
-	    };
-	    stack_size = stack_min;
-	    if (stack_size < min_size) stack_size = min_size;
-	    if (needs_more_stack) {
-		stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min;
-	    }
-	    err = pthread_attr_setstacksize(&attr, stack_size);
-	    if (err != 0) {
-		rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s",
-		       stack_size, strerror(err));
-	    }
-	}
-# endif
+    if (owner && owner != current) {
+        CLOSE_INVALIDATE(normal[0]);
+        CLOSE_INVALIDATE(normal[1]);
+    }
 
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
-	err = setup_communication_pipe();
-	if (err) return;
-#endif /* TIMER_THREAD_SLEEPY */
+    if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
 
-	/* create timer thread */
-	if (timer_thread.created) {
-	    rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
-	}
-	err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
-	pthread_attr_destroy(&attr);
-
-	if (err == EINVAL) {
-	    /*
-	     * Even if we are careful with our own stack use in thread_timer(),
-	     * any third-party libraries (eg libkqueue) which rely on __thread
-	     * storage can cause small stack sizes to fail.  So lets hope the
-	     * default stack size is enough for them:
-	     */
-	    stack_size = 0;
-	    err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
-	}
-	if (err != 0) {
-	    rb_warn("pthread_create failed for timer: %s, scheduling broken",
-		    strerror(err));
-	    if (stack_size) {
-		rb_warn("timer thread stack size: %"PRIuSIZE, stack_size);
-	    }
-	    else {
-		rb_warn("timer thread stack size: system default");
-	    }
-	    VM_ASSERT(err == 0);
-	    return;
-	}
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
-	/* validate pipe on this process */
-	timer_thread_pipe.owner_process = getpid();
-#endif /* TIMER_THREAD_SLEEPY */
-	timer_thread.created = 1;
+    if (owner != current) {
+        /* validate pipe on this process */
+        sigwait_th = THREAD_INVALID;
+        timer_thread_pipe.owner_process = current;
     }
 }
-#endif /* TIMER_IMPL & TIMER_THREAD_MASK */
 
 static int
 native_stop_timer_thread(void)
@@ -1662,24 +1413,6 @@ native_stop_timer_thread(void)
     stopped = --system_working <= 0;
 
     if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
-    if (stopped) {
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
-	/* kick timer thread out of sleep */
-	rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
-#endif
-
-	/* timer thread will stop looping when system_working <= 0: */
-	native_thread_join(timer_thread.id);
-
-	/*
-	 * don't care if timer_thread_pipe may fill up at this point.
-	 * If we restart timer thread, signals will be processed, if
-	 * we don't, it's because we're in a different child
-	 */
-
-	if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
-	timer_thread.created = 0;
-    }
     return stopped;
 }
 
@@ -1736,20 +1469,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
 int
 rb_reserved_fd_p(int fd)
 {
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
     if ((fd == timer_thread_pipe.normal[0] ||
-	 fd == timer_thread_pipe.normal[1] ||
-	 fd == timer_thread_pipe.low[0] ||
-	 fd == timer_thread_pipe.low[1]) &&
+         fd == timer_thread_pipe.normal[1]) &&
        timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
 	return 1;
     }
    else {
	return 0;
    }
-#else
-    return 0;
-#endif
 }
 
 rb_nativethread_id_t
@@ -1811,4 +1538,65 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
 }
 #endif /* USE_NATIVE_SLEEP_COND */
 
+int
+rb_sigwait_fd_get(const rb_thread_t *th)
+{
+    if (timer_thread_pipe.owner_process == getpid() &&
+        timer_thread_pipe.normal[0] >= 0) {
+        if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
+            return timer_thread_pipe.normal[0];
+        }
+    }
+    return -1; /* avoid thundering herd */
+}
+
+void
+rb_sigwait_fd_put(const rb_thread_t *th, int fd)
+{
+    const rb_thread_t *old;
+
+    VM_ASSERT(timer_thread_pipe.normal[0] == fd);
+    old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
+    if (old != th) assert(old == th);
+}
+
+void
+rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
+{
+    struct pollfd pfd;
+
+    pfd.fd = sigwait_fd;
+    pfd.events = POLLIN;
+
+    (void)ppoll(&pfd, 1, ts, 0);
+    check_signals_nogvl(th, sigwait_fd);
+}
+
+static void
+native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
+{
+    int sigwait_fd = rb_sigwait_fd_get(th);
+
+    if (sigwait_fd >= 0) {
+        rb_native_mutex_lock(&th->interrupt_lock);
+        th->unblock.func = ubf_sigwait;
+        rb_native_mutex_unlock(&th->interrupt_lock);
+
+        GVL_UNLOCK_BEGIN(th);
+
+        if (!RUBY_VM_INTERRUPTED(th->ec)) {
+            rb_sigwait_sleep(th, sigwait_fd, timeout_rel);
+        }
+        else {
+            check_signals_nogvl(th, sigwait_fd);
+        }
+        unblock_function_clear(th);
+        rb_sigwait_fd_put(th, sigwait_fd);
+        rb_sigwait_fd_migrate(th->vm);
+        GVL_UNLOCK_END(th);
+    }
+    else {
+        native_cond_sleep(th, timeout_rel);
+    }
+}
 #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */
--
cgit v1.2.3