From edee9b6a12ac846d7b3de2d704e170bf28178cb3 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Thu, 1 Jun 2023 16:37:18 +0800
Subject: Use a real Ruby mutex in rb_io_close_wait_list (#7884)

Because a thread calling IO#close now blocks in a native condvar wait,
it's possible for there to be _no_ threads left to actually handle
incoming signals/ubf calls/etc.

This manifested as failing tests on Solaris 10 (SPARC), because:

* One thread called IO#close, which sent a SIGVTALRM to the other
  thread to interrupt it, and then waited on the condvar to be
  notified that the reading thread was done.
* One thread was calling IO#read, but it hadn't yet reached the
  actual call to select(2) when the SIGVTALRM arrived, so it never
  unblocked itself.

This resulted in a deadlock.

The fix is to use a real Ruby mutex for the close lock; that way, the
closing thread goes into sigwait-sleep and can keep trying to
interrupt the select(2) thread. A Ruby-level sketch of the two racing
threads appears after the patch.

See the discussion in: https://github.com/ruby/ruby/pull/7865/
---
 internal/thread.h |  7 ++--
 io.c              | 10 +-----
 thread.c          | 96 ++++++++++++++++++++++++++++++-------------------------
 3 files changed, 57 insertions(+), 56 deletions(-)

diff --git a/internal/thread.h b/internal/thread.h
index 7a6a860aeb..c41a16c12d 100644
--- a/internal/thread.h
+++ b/internal/thread.h
@@ -11,7 +11,6 @@
 #include "ruby/ruby.h" /* for VALUE */
 #include "ruby/intern.h" /* for rb_blocking_function_t */
 #include "ccan/list/list.h" /* for list in rb_io_close_wait_list */
-#include "ruby/thread_native.h" /* for mutexes in rb_io_close_wait_list */

 struct rb_thread_struct; /* in vm_core.h */

@@ -55,9 +54,9 @@ VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g,
 int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);

 struct rb_io_close_wait_list {
-    struct ccan_list_head list;
-    rb_nativethread_lock_t mu;
-    rb_nativethread_cond_t cv;
+    struct ccan_list_head pending_fd_users;
+    VALUE closing_thread;
+    VALUE wakeup_mutex;
 };
 int rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy);
 void rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy);
diff --git a/io.c b/io.c
index b99041fbd1..e4df1a5ed5 100644
--- a/io.c
+++ b/io.c
@@ -5423,14 +5423,6 @@ maygvl_fclose(FILE *file, int keepgvl)
 static void free_io_buffer(rb_io_buffer_t *buf);
 static void clear_codeconv(rb_io_t *fptr);

-static void*
-call_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
-}
-
 static void
 fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
                     struct rb_io_close_wait_list *busy)
@@ -5476,7 +5468,7 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
     // Ensure waiting_fd users do not hit EBADF.
     if (busy) {
         // Wait for them to exit before we call close().
-        (void)rb_thread_call_without_gvl(call_close_wait_nogvl, busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(busy);
     }

     // Disable for now.
diff --git a/thread.c b/thread.c
index 67fa18b1a6..e521d13fb5 100644
--- a/thread.c
+++ b/thread.c
@@ -1663,6 +1663,27 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return rb_nogvl(func, data1, ubf, data2, 0);
 }

+static void
+rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+    bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+    if (has_waiter) {
+        rb_mutex_lock(wfd->busy->wakeup_mutex);
+    }
+
+    /* Needs to be protected with RB_VM_LOCK because we don't know if
+       wfd is on the global list of pending FD ops or if it's on a
+       struct rb_io_close_wait_list close-waiter. */
+    RB_VM_LOCK_ENTER();
+    ccan_list_del(&wfd->wfd_node);
+    RB_VM_LOCK_LEAVE();
+
+    if (has_waiter) {
+        rb_thread_wakeup(wfd->busy->closing_thread);
+        rb_mutex_unlock(wfd->busy->wakeup_mutex);
+    }
+}
+
 VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
@@ -1700,20 +1721,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)

     /*
      * must be deleted before jump
-     * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
+     * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
      */
-    RB_VM_LOCK_ENTER();
-    {
-        if (waiting_fd.busy) {
-            rb_native_mutex_lock(&waiting_fd.busy->mu);
-        }
-        ccan_list_del(&waiting_fd.wfd_node);
-        if (waiting_fd.busy) {
-            rb_native_cond_broadcast(&waiting_fd.busy->cv);
-            rb_native_mutex_unlock(&waiting_fd.busy->mu);
-        }
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&waiting_fd);

     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2474,8 +2484,9 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;
-    ccan_list_head_init(&busy->list);
+    ccan_list_head_init(&busy->pending_fd_users);
     int has_any;
+    VALUE wakeup_mutex;

     RB_VM_LOCK_ENTER();
     {
@@ -2485,7 +2496,7 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
                 VALUE err;

                 ccan_list_del(&wfd->wfd_node);
-                ccan_list_add(&busy->list, &wfd->wfd_node);
+                ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
                 wfd->busy = busy;

                 err = th->vm->special_exceptions[ruby_error_stream_closed];
@@ -2494,34 +2505,39 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
             }
         }
     }
-    has_any = !ccan_list_empty(&busy->list);
+
+    has_any = !ccan_list_empty(&busy->pending_fd_users);
+    busy->closing_thread = rb_thread_current();
+    wakeup_mutex = Qnil;
     if (has_any) {
-        rb_native_mutex_initialize(&busy->mu);
-        rb_native_cond_initialize(&busy->cv);
+        wakeup_mutex = rb_mutex_new();
+        RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
     }
+    busy->wakeup_mutex = wakeup_mutex;
+
     RB_VM_LOCK_LEAVE();

+    /* If the caller didn't pass *busy as a pointer to something on the stack,
+       we need to guard this mutex object on _our_ C stack for the duration
+       of this function. */
+    RB_GC_GUARD(wakeup_mutex);
     return has_any;
 }

 void
 rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
 {
-    rb_native_mutex_lock(&busy->mu);
-    while (!ccan_list_empty(&busy->list)) {
-        rb_native_cond_wait(&busy->cv, &busy->mu);
-    };
-    rb_native_mutex_unlock(&busy->mu);
-    rb_native_mutex_destroy(&busy->mu);
-    rb_native_cond_destroy(&busy->cv);
-}
+    if (!RB_TEST(busy->wakeup_mutex)) {
+        /* There was nobody else using this file when we closed it, so we
+           never bothered to allocate a mutex */
+        return;
+    }

-static void*
-call_notify_fd_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
+    rb_mutex_lock(busy->wakeup_mutex);
+    while (!ccan_list_empty(&busy->pending_fd_users)) {
+        rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+    }
+    rb_mutex_unlock(busy->wakeup_mutex);
 }

 void
@@ -2530,7 +2546,7 @@ rb_thread_fd_close(int fd)
     struct rb_io_close_wait_list busy;

     if (rb_notify_fd_close(fd, &busy)) {
-        rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(&busy);
     }
 }

@@ -4273,6 +4289,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)

     wfd.th = GET_THREAD();
     wfd.fd = fd;
+    wfd.busy = NULL;

     RB_VM_LOCK_ENTER();
     {
@@ -4324,11 +4341,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     }
     EC_POP_TAG();

-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&wfd);

     if (state) {
         EC_JUMP_TAG(wfd.th->ec, state);
@@ -4402,11 +4415,7 @@ select_single_cleanup(VALUE ptr)
 {
     struct select_args *args = (struct select_args *)ptr;

-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&args->wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&args->wfd);
     if (args->read) rb_fd_term(args->read);
     if (args->write) rb_fd_term(args->write);
     if (args->except) rb_fd_term(args->except);
@@ -4429,6 +4438,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     args.tv = timeout;
     args.wfd.fd = fd;
     args.wfd.th = GET_THREAD();
+    args.wfd.busy = NULL;

     RB_VM_LOCK_ENTER();
     {
--
cgit v1.2.3
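
For reference, the race described in the commit message can be sketched at
the Ruby level. This is a minimal illustration, not part of the patch, and
the timing window it aims at is platform-dependent: one thread blocks in
IO#read on a pipe while another thread closes the same IO. With the fix, the
closing thread keeps servicing interrupts while it waits for the reader to
leave the file descriptor, so the close completes and the reader sees an
IOError:

    # A minimal sketch of the two racing threads (illustrative only).
    r, w = IO.pipe

    reader = Thread.new do
      begin
        r.read              # blocks in an interruptible IO wait
      rescue IOError => e
        e                   # raised here when another thread closes r
      end
    end

    # Let the reader enter (or at least approach) its blocking wait; the
    # Solaris failure hit exactly the window before it reached select(2).
    Thread.pass until reader.stop?

    r.close                 # interrupts the reader, waits for it to leave the fd
    p reader.value          # => #<IOError: stream closed in another thread>
    w.close

The closing side is where the behavior changed: rb_notify_fd_close_wait now
sleeps in rb_mutex_sleep on a hidden Ruby mutex instead of a bare native
condvar wait, so the closer remains an ordinary Ruby sleeper that can still
process signals and keep re-sending the stream-closed interrupt until the
reader has actually left the fd.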