author     KJ Tsanaktsidis <ktsanaktsidis@zendesk.com>   2023-06-01 16:37:18 +0800
committer  GitHub <noreply@github.com>                   2023-06-01 17:37:18 +0900
commit     edee9b6a12ac846d7b3de2d704e170bf28178cb3 (patch)
tree       f0b4af6454d65cc42eb22f58073691bfbd8667e3 /thread.c
parent     d8f333491e4c26df7ca577f40d7708d5aedf764f (diff)
download   ruby-edee9b6a12ac846d7b3de2d704e170bf28178cb3.tar.gz
Use a real Ruby mutex in rb_io_close_wait_list (#7884)
Because a thread calling IO#close now blocks in a native condvar wait, it's possible for there to be _no_ threads left to actually handle incoming signals/ubf calls/etc. This manifested as failing tests on Solaris 10 (SPARC), because:

* One thread called IO#close, which sent a SIGVTALRM to the other thread to interrupt it, and then waited on the condvar to be notified that the reading thread was done.
* One thread was calling IO#read, but it hadn't yet reached the actual call to select(2) when the SIGVTALRM arrived, so it never unblocked itself.

This results in a deadlock. The fix is to use a real Ruby mutex for the close lock; that way, the closing thread goes into sigwait-sleep and can keep trying to interrupt the select(2) thread.

See the discussion in: https://github.com/ruby/ruby/pull/7865/
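The race is easier to see stripped of the Ruby machinery. The following stand-alone pthread sketch (illustrative only, not part of this commit; the names, SIGUSR1, and the 100ms window are invented, and exact signal-restart semantics vary by platform) reproduces the shape of the bug: the closer interrupts the reader exactly once and then parks in a native condvar wait, so if the interrupt lands before the reader reaches select(2), nobody retries it and the program hangs.

/* cc -pthread close_race.c -o close_race */
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>

static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;
static int reader_done = 0;
static int pipefd[2];

static void
on_sigusr1(int sig)
{
    (void)sig; /* exists only so the signal interrupts select(2) */
}

static void *
reader(void *arg)
{
    (void)arg;
    /* window in which the one-shot interrupt is lost: the signal
       arrives here, before select(2) blocks, and changes nothing */
    usleep(100 * 1000);

    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(pipefd[0], &rfds);
    /* nothing is ever written to the pipe, so only the signal could
       wake this up; if it already came and went, we block forever */
    select(pipefd[0] + 1, &rfds, NULL, NULL, NULL);

    pthread_mutex_lock(&mu);
    reader_done = 1;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mu);
    return NULL;
}

int
main(void)
{
    if (pipe(pipefd) != 0) return 1;
    signal(SIGUSR1, on_sigusr1);

    pthread_t th;
    pthread_create(&th, NULL, reader, NULL);

    pthread_kill(th, SIGUSR1);    /* interrupt the reader, once */
    pthread_mutex_lock(&mu);
    while (!reader_done)          /* ...then wait natively: deadlock */
        pthread_cond_wait(&cv, &mu);
    pthread_mutex_unlock(&mu);
    pthread_join(th, NULL);
    puts("lucky schedule, no deadlock this time");
    return 0;
}

With the fix, the closer waits on a Ruby mutex instead, so it remains visible to the thread scheduler and can keep re-sending the interrupt until the reader actually leaves select(2).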
Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c | 96
1 file changed, 53 insertions(+), 43 deletions(-)
diff --git a/thread.c b/thread.c
index 67fa18b1a6..e521d13fb5 100644
--- a/thread.c
+++ b/thread.c
@@ -1663,6 +1663,27 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
return rb_nogvl(func, data1, ubf, data2, 0);
}
+static void
+rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+ bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+ if (has_waiter) {
+ rb_mutex_lock(wfd->busy->wakeup_mutex);
+ }
+
+ /* Needs to be protected with RB_VM_LOCK because we don't know if
+ wfd is on the global list of pending FD ops or if it's on a
+ struct rb_io_close_wait_list close-waiter. */
+ RB_VM_LOCK_ENTER();
+ ccan_list_del(&wfd->wfd_node);
+ RB_VM_LOCK_LEAVE();
+
+ if (has_waiter) {
+ rb_thread_wakeup(wfd->busy->closing_thread);
+ rb_mutex_unlock(wfd->busy->wakeup_mutex);
+ }
+}
+
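The helper above is one half of a waiter/waker handshake: it takes the wakeup lock before deleting the node and waking the closer, so the closer can never re-check the list and fall asleep in between (the classic lost-wakeup bug). A generic stand-alone sketch of the same discipline in plain pthreads (hypothetical names, not Ruby source):

/* cc -pthread handshake.c -o handshake */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;
static int pending = 1;          /* stands in for the non-empty FD list */

static void *
waiter(void *arg)                /* plays the closing thread */
{
    (void)arg;
    pthread_mutex_lock(&mu);
    while (pending)              /* re-test the predicate on every wakeup */
        pthread_cond_wait(&cv, &mu);
    pthread_mutex_unlock(&mu);
    puts("closer: all pending users gone");
    return NULL;
}

int
main(void)                       /* plays the interrupted FD user */
{
    pthread_t th;
    pthread_create(&th, NULL, waiter, NULL);

    pthread_mutex_lock(&mu);     /* lock first ... */
    pending = 0;                 /* ... then update the predicate ... */
    pthread_cond_signal(&cv);    /* ... then wake the waiter */
    pthread_mutex_unlock(&mu);

    pthread_join(th, NULL);
    return 0;
}

rb_thread_io_wake_pending_closer plays main()'s role here, with wfd->busy->wakeup_mutex as the lock, the pending-FD list as the predicate, and rb_thread_wakeup in place of pthread_cond_signal.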
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
@@ -1700,20 +1721,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
/*
* must be deleted before jump
- * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
+ * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
*/
- RB_VM_LOCK_ENTER();
- {
- if (waiting_fd.busy) {
- rb_native_mutex_lock(&waiting_fd.busy->mu);
- }
- ccan_list_del(&waiting_fd.wfd_node);
- if (waiting_fd.busy) {
- rb_native_cond_broadcast(&waiting_fd.busy->cv);
- rb_native_mutex_unlock(&waiting_fd.busy->mu);
- }
- }
- RB_VM_LOCK_LEAVE();
+ rb_thread_io_wake_pending_closer(&waiting_fd);
if (state) {
EC_JUMP_TAG(ec, state);
@@ -2474,8 +2484,9 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
{
rb_vm_t *vm = GET_THREAD()->vm;
struct waiting_fd *wfd = 0, *next;
- ccan_list_head_init(&busy->list);
+ ccan_list_head_init(&busy->pending_fd_users);
int has_any;
+ VALUE wakeup_mutex;
RB_VM_LOCK_ENTER();
{
@@ -2485,7 +2496,7 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
VALUE err;
ccan_list_del(&wfd->wfd_node);
- ccan_list_add(&busy->list, &wfd->wfd_node);
+ ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
wfd->busy = busy;
err = th->vm->special_exceptions[ruby_error_stream_closed];
@@ -2494,34 +2505,39 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
}
}
}
- has_any = !ccan_list_empty(&busy->list);
+
+ has_any = !ccan_list_empty(&busy->pending_fd_users);
+ busy->closing_thread = rb_thread_current();
+ wakeup_mutex = Qnil;
if (has_any) {
- rb_native_mutex_initialize(&busy->mu);
- rb_native_cond_initialize(&busy->cv);
+ wakeup_mutex = rb_mutex_new();
+ RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
}
+ busy->wakeup_mutex = wakeup_mutex;
+
RB_VM_LOCK_LEAVE();
+ /* If the caller didn't pass *busy as a pointer to something on the stack,
+ we need to guard this mutex object on _our_ C stack for the duration
+ of this function. */
+ RB_GC_GUARD(wakeup_mutex);
return has_any;
}
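Because RBASIC_CLEAR_CLASS hides the mutex from ObjectSpace and nothing on the Ruby heap necessarily references it, the conservative scan of this function's C stack is what keeps it alive; that is what RB_GC_GUARD pins down. The same idiom in miniature, as a hypothetical extension function (not from this commit):

#include <stdio.h>
#include <ruby.h>

static VALUE
use_buffer(VALUE self)
{
    VALUE str = rb_str_new_cstr("scratch");
    const char *p = StringValueCStr(str);

    /* ... calls that may allocate and run GC; the compiler is free to
       drop `str` from the stack after its last use, so without the
       guard below, GC could free the memory `p` points into ... */
    printf("%s\n", p);

    RB_GC_GUARD(str); /* str must stay live on the stack until here */
    return Qnil;
}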
void
rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
{
- rb_native_mutex_lock(&busy->mu);
- while (!ccan_list_empty(&busy->list)) {
- rb_native_cond_wait(&busy->cv, &busy->mu);
- };
- rb_native_mutex_unlock(&busy->mu);
- rb_native_mutex_destroy(&busy->mu);
- rb_native_cond_destroy(&busy->cv);
-}
+ if (!RB_TEST(busy->wakeup_mutex)) {
+ /* There was nobody else using this file when we closed it, so we
+ never bothered to allocate a mutex */
+ return;
+ }
-static void*
-call_notify_fd_close_wait_nogvl(void *arg)
-{
- struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
- rb_notify_fd_close_wait(busy);
- return NULL;
+ rb_mutex_lock(busy->wakeup_mutex);
+ while (!ccan_list_empty(&busy->pending_fd_users)) {
+ rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+ }
+ rb_mutex_unlock(busy->wakeup_mutex);
}
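This loop is the Ruby-aware replacement for the condvar wait it deletes: rb_mutex_sleep atomically releases the mutex and sleeps, reacquires it on wakeup, and, crucially, the sleeping thread keeps servicing interrupts. As with a condvar, the predicate is re-checked on every wakeup. The pattern in isolation, as a hypothetical sketch against the public C API (the struct and field names are invented):

#include <ruby.h>

struct waiter {
    VALUE mutex;   /* created with rb_mutex_new() */
    int   ready;   /* the predicate, set by some other thread */
};

static void
wait_until_ready(struct waiter *w)
{
    rb_mutex_lock(w->mutex);
    while (!w->ready) {
        /* releases w->mutex and sleeps; unlike a native condvar wait,
           the sleeping thread still services Ruby interrupts, which is
           the point of this commit */
        rb_mutex_sleep(w->mutex, Qnil);
    }
    rb_mutex_unlock(w->mutex);
}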
void
@@ -2530,7 +2546,7 @@ rb_thread_fd_close(int fd)
struct rb_io_close_wait_list busy;
if (rb_notify_fd_close(fd, &busy)) {
- rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0);
+ rb_notify_fd_close_wait(&busy);
}
}
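This hunk is the heart of the fix: rb_notify_fd_close_wait is now called with the GVL held instead of through rb_thread_call_without_gvl. Once a thread releases the GVL, it does no interrupt processing of its own and can only be prodded through its unblock function, which is why the old arrangement could leave no thread able to handle signals. For reference, the shape of the API being dropped, as a minimal hypothetical sketch:

#include <ruby.h>
#include <ruby/thread.h>

static void *
blocking_work(void *arg)
{
    /* a native blocking call would go here; while the GVL is released,
       this thread cannot run Ruby-level interrupt checks itself */
    return NULL;
}

static VALUE
do_blocking_work(VALUE self)
{
    /* RUBY_UBF_IO asks Ruby to interrupt us the way it interrupts
       blocking IO (e.g. with a signal) */
    rb_thread_call_without_gvl(blocking_work, NULL, RUBY_UBF_IO, NULL);
    return Qnil;
}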
@@ -4273,6 +4289,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
wfd.th = GET_THREAD();
wfd.fd = fd;
+ wfd.busy = NULL;
RB_VM_LOCK_ENTER();
{
@@ -4324,11 +4341,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
}
EC_POP_TAG();
- RB_VM_LOCK_ENTER();
- {
- ccan_list_del(&wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ rb_thread_io_wake_pending_closer(&wfd);
if (state) {
EC_JUMP_TAG(wfd.th->ec, state);
@@ -4402,11 +4415,7 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_del(&args->wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ rb_thread_io_wake_pending_closer(&args->wfd);
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
@@ -4429,6 +4438,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
args.tv = timeout;
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
+ args.wfd.busy = NULL;
RB_VM_LOCK_ENTER();
{