diff options
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 71 |
1 files changed, 54 insertions, 17 deletions
@@ -110,8 +110,8 @@ static int timespec_cmp(const struct timespec *a, const struct timespec *b); static int timespec_update_expire(struct timespec *, const struct timespec *); static void getclockofday(struct timespec *); NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static void consume_communication_pipe(int fd); -static void check_signals_nogvl(rb_thread_t *, int sigwait_fd); +static int consume_communication_pipe(int fd); +static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) @@ -385,6 +385,15 @@ ubf_sigwait(void *ignore) #error "unsupported thread type" #endif +/* + * TODO: somebody with win32 knowledge should be able to get rid of + * timer-thread by busy-waiting on signals. And it should be possible + * to make the GVL in thread_pthread.c be platform-independent. + */ +#ifndef BUSY_WAIT_SIGNALS +# define BUSY_WAIT_SIGNALS (0) +#endif + #if THREAD_DEBUG static int debug_mutex_initialized = 1; static rb_nativethread_lock_t debug_mutex; @@ -2173,7 +2182,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) int sigwait_fd = rb_sigwait_fd_get(th); if (sigwait_fd >= 0) { - consume_communication_pipe(sigwait_fd); + (void)consume_communication_pipe(sigwait_fd); ruby_sigchld_handler(th->vm); rb_sigwait_fd_put(th, sigwait_fd); rb_sigwait_fd_migrate(th->vm); @@ -3885,6 +3894,21 @@ select_set_free(VALUE p) return Qfalse; } +static const struct timespec * +sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, + int *drained_p) +{ + static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; + + if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { + *drained_p = check_signals_nogvl(th, sigwait_fd); + if (!orig || timespec_cmp(orig, &quantum) > 0) + return &quantum; + } + + return orig; +} + static VALUE do_select(VALUE p) { @@ -3892,6 +3916,8 @@ do_select(VALUE p) int MAYBE_UNUSED(result); int lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + struct timeval tv; timeout_prepare(&tsp, &ts, &end, set->timeout); #define restore_fdset(dst, src) \ @@ -3903,17 +3929,20 @@ do_select(VALUE p) TRUE) do { + int drained; lerrno = 0; BLOCKING_REGION(set->th, { + to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); result = native_fd_select(set->max, set->rset, set->wset, set->eset, - timeval_for(set->timeout, tsp), set->th); + timeval_for(&tv, to), set->th); if (result < 0) lerrno = errno; }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); - if (set->sigwait_fd >= 0 && rb_fd_isset(set->sigwait_fd, set->rset)) { - result--; - check_signals_nogvl(set->th, set->sigwait_fd); + if (set->sigwait_fd >= 0) { + if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) + result--; + (void)check_signals_nogvl(set->th, set->sigwait_fd); } RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ @@ -4042,6 +4071,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct pollfd fds[2]; int result = 0, lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + int drained; rb_thread_t *th = GET_THREAD(); nfds_t nfds; rb_unblock_function_t *ubf; @@ -4066,16 +4097,17 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) lerrno = 0; BLOCKING_REGION(th, { - result = ppoll(fds, nfds, tsp, NULL); + to = sigwait_timeout(th, fds[1].fd, tsp, &drained); + result = ppoll(fds, nfds, to, NULL); if (result < 0) lerrno = errno; }, ubf, th, FALSE); if (fds[1].fd >= 0) { - if (fds[1].revents) { + if (result > 0 && fds[1].revents) { result--; - check_signals_nogvl(th, fds[1].fd); fds[1].revents = 0; } + (void)check_signals_nogvl(th, fds[1].fd); rb_sigwait_fd_put(th, fds[1].fd); rb_sigwait_fd_migrate(th->vm); } @@ -4228,18 +4260,22 @@ async_bug_fd(const char *mesg, int errno_arg, int fd) } /* VM-dependent API is not available for this function */ -static void +static int consume_communication_pipe(int fd) { #define CCP_READ_BUFF_SIZE 1024 /* buffer can be shared because no one refers to them. */ static char buff[CCP_READ_BUFF_SIZE]; ssize_t result; + int ret = FALSE; /* for rb_sigwait_sleep */ while (1) { result = read(fd, buff, sizeof(buff)); - if (result == 0) { - return; + if (result > 0) { + ret = TRUE; + } + else if (result == 0) { + return ret; } else if (result < 0) { int e = errno; @@ -4250,7 +4286,7 @@ consume_communication_pipe(int fd) #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif - return; + return ret; default: async_bug_fd("consume_communication_pipe: read", e, fd); } @@ -4258,12 +4294,11 @@ consume_communication_pipe(int fd) } } -static void +static int check_signals_nogvl(rb_thread_t *th, int sigwait_fd) { rb_vm_t *vm = GET_VM(); /* th may be 0 */ - - consume_communication_pipe(sigwait_fd); + int ret = consume_communication_pipe(sigwait_fd); ubf_wakeup_all_threads(); ruby_sigchld_handler(vm); if (rb_signal_buff_size()) { @@ -4272,7 +4307,9 @@ check_signals_nogvl(rb_thread_t *th, int sigwait_fd) RUBY_VM_SET_TRAP_INTERRUPT(th->ec); else threadptr_trap_interrupt(vm->main_thread); + ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ } + return ret; } void |