aboutsummaryrefslogtreecommitdiffstats
path: root/io.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-05-14 22:10:55 +1200
committerGitHub <noreply@github.com>2020-05-14 22:10:55 +1200
commit0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 (patch)
tree74d381412dfd8ff49dd3039f8aeae09ad9e4e6e3 /io.c
parent336119dfc5e6baae0a936d6feae780a61975479c (diff)
downloadruby-0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4.tar.gz
Thread scheduler for light weight concurrency.
Diffstat (limited to 'io.c')
-rw-r--r--io.c113
1 files changed, 72 insertions, 41 deletions
diff --git a/io.c b/io.c
index fc817f96b2..26202e0806 100644
--- a/io.c
+++ b/io.c
@@ -177,15 +177,6 @@ off_t __syscall(quad_t number, ...);
#define rename(f, t) rb_w32_urename((f), (t))
#endif
-#if defined(_WIN32)
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#elif defined(O_NONBLOCK)
- /* disabled for [Bug #15356] (Rack::Deflater + rails) failure: */
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#else /* any platforms where O_NONBLOCK does not exist? */
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#endif
-
VALUE rb_cIO;
VALUE rb_eEOFError;
VALUE rb_eIOError;
@@ -406,44 +397,37 @@ rb_fd_set_nonblock(int fd)
}
int
-rb_cloexec_pipe(int fildes[2])
+rb_cloexec_pipe(int descriptors[2])
{
- int ret;
-
-#if defined(HAVE_PIPE2)
- static int try_pipe2 = 1;
- if (try_pipe2) {
- ret = pipe2(fildes, O_CLOEXEC | RUBY_PIPE_NONBLOCK_DEFAULT);
- if (ret != -1)
- return ret;
- /* pipe2 is available since Linux 2.6.27, glibc 2.9. */
- if (errno == ENOSYS) {
- try_pipe2 = 0;
- ret = pipe(fildes);
- }
- }
- else {
- ret = pipe(fildes);
- }
+#ifdef HAVE_PIPE2
+ int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK);
#else
- ret = pipe(fildes);
+ int result = pipe(descriptors);
#endif
- if (ret < 0) return ret;
+
+ if (result < 0)
+ return result;
+
#ifdef __CYGWIN__
- if (ret == 0 && fildes[1] == -1) {
- close(fildes[0]);
- fildes[0] = -1;
- errno = ENFILE;
- return -1;
+ if (ret == 0 && descriptors[1] == -1) {
+ close(descriptors[0]);
+ descriptors[0] = -1;
+ errno = ENFILE;
+ return -1;
}
#endif
- rb_maygvl_fd_fix_cloexec(fildes[0]);
- rb_maygvl_fd_fix_cloexec(fildes[1]);
- if (RUBY_PIPE_NONBLOCK_DEFAULT) {
- rb_fd_set_nonblock(fildes[0]);
- rb_fd_set_nonblock(fildes[1]);
- }
- return ret;
+
+#ifndef HAVE_PIPE2
+ rb_maygvl_fd_fix_cloexec(descriptors[0]);
+ rb_maygvl_fd_fix_cloexec(descriptors[1]);
+
+#ifndef _WIN32
+ rb_fd_set_nonblock(descriptors[0]);
+ rb_fd_set_nonblock(descriptors[1]);
+#endif
+#endif
+
+ return result;
}
int
@@ -1270,6 +1254,12 @@ io_fflush(rb_io_t *fptr)
int
rb_io_wait_readable(int f)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
+ return RTEST(result);
+ }
+
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@@ -1294,6 +1284,12 @@ rb_io_wait_readable(int f)
int
rb_io_wait_writable(int f)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
+ return RTEST(result);
+ }
+
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@@ -10897,6 +10893,23 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
return FALSE;
}
+struct wait_for_single_fd {
+ VALUE scheduler;
+
+ int fd;
+ short events;
+
+ VALUE result;
+};
+
+void * rb_thread_scheduler_wait_for_single_fd(void * _args) {
+ struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
+
+ args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil);
+
+ return NULL;
+}
+
#if USE_POLL
# define IOWAIT_SYSCALL "poll"
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
@@ -10904,6 +10917,13 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(int fd, short events)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
+ rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
+ return RTEST(args.result);
+ }
+
struct pollfd fds;
fds.fd = fd;
@@ -10916,6 +10936,13 @@ nogvl_wait_for_single_fd(int fd, short events)
static int
nogvl_wait_for_single_fd(int fd, short events)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
+ rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
+ return RTEST(args.result);
+ }
+
rb_fdset_t fds;
int ret;
@@ -13283,6 +13310,10 @@ Init_IO(void)
rb_cIO = rb_define_class("IO", rb_cObject);
rb_include_module(rb_cIO, rb_mEnumerable);
+ rb_define_const(rb_cIO, "WAIT_READABLE", INT2NUM(RB_WAITFD_IN));
+ rb_define_const(rb_cIO, "WAIT_PRIORITY", INT2NUM(RB_WAITFD_PRI));
+ rb_define_const(rb_cIO, "WAIT_WRITABLE", INT2NUM(RB_WAITFD_OUT));
+
/* exception to wait for reading. see IO.select. */
rb_mWaitReadable = rb_define_module_under(rb_cIO, "WaitReadable");
/* exception to wait for writing. see IO.select. */