From bcfc22b10e30771e692f7716a1fec7577373963e Mon Sep 17 00:00:00 2001 From: kosaki Date: Mon, 13 Jun 2011 14:14:53 +0000 Subject: * thread_pthread.c: rewrite GVL completely. * thread_win32.c: ditto. * thread_pthread.h: ditto. * vm_core.h: ditto. * thread.c: ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@32064 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 8 ++++ thread.c | 40 ++++++++++-------- thread_pthread.c | 123 +++++++++++++++++++++++++------------------------------ thread_pthread.h | 14 +++++-- thread_win32.c | 4 +- vm_core.h | 5 ++- 6 files changed, 102 insertions(+), 92 deletions(-) diff --git a/ChangeLog b/ChangeLog index 1c51536952..77353084b7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +Mon Jun 13 23:06:12 2011 KOSAKI Motohiro + + * thread_pthread.c: rewrite GVL completely. + * thread_win32.c: ditto. + * thread_pthread.h: ditto. + * vm_core.h: ditto. + * thread.c: ditto. + Mon Jun 13 23:11:52 2011 Tanaka Akira * test/socket/test_unix.rb: don't use Thread.abort_on_exception. 
diff --git a/thread.c b/thread.c index caf0427d1a..2a6ba6d637 100644 --- a/thread.c +++ b/thread.c @@ -1015,7 +1015,7 @@ rb_thread_sleep(int sec) static void rb_threadptr_execute_interrupts_rec(rb_thread_t *, int); static void -rb_thread_schedule_rec(int sched_depth) +rb_thread_schedule_rec(int sched_depth, unsigned long limits_us) { thread_debug("rb_thread_schedule\n"); if (!rb_thread_alone()) { @@ -1024,11 +1024,19 @@ rb_thread_schedule_rec(int sched_depth) thread_debug("rb_thread_schedule/switch start\n"); RB_GC_SAVE_MACHINE_CONTEXT(th); + +#if HAVE_GVL_YIELD + { + if (th->running_time_us >= limits_us) + gvl_yield(th->vm, th); + } +#else gvl_release(th->vm); { native_thread_yield(); } gvl_acquire(th->vm, th); +#endif rb_thread_set_current(th); thread_debug("rb_thread_schedule/switch done\n"); @@ -1042,7 +1050,7 @@ rb_thread_schedule_rec(int sched_depth) void rb_thread_schedule(void) { - rb_thread_schedule_rec(0); + rb_thread_schedule_rec(0, 0); } /* blocking region */ @@ -1333,23 +1341,20 @@ rb_threadptr_execute_interrupts_rec(rb_thread_t *th, int sched_depth) } if (!sched_depth && timer_interrupt) { - sched_depth++; + unsigned long limits_us = 250 * 1000; + + if (th->priority > 0) + limits_us <<= th->priority; + else + limits_us >>= -th->priority; + + if (status == THREAD_RUNNABLE) + th->running_time_us += TIME_QUANTUM_USEC; + + sched_depth++; EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); - if (th->slice > 0) { - th->slice--; - } - else { - reschedule: - rb_thread_schedule_rec(sched_depth+1); - if (th->slice < 0) { - th->slice++; - goto reschedule; - } - else { - th->slice = th->priority; - } - } + rb_thread_schedule_rec(sched_depth+1, limits_us); } } } @@ -2293,7 +2298,6 @@ rb_thread_priority_set(VALUE thread, VALUE prio) priority = RUBY_THREAD_PRIORITY_MIN; } th->priority = priority; - th->slice = priority; #endif return INT2NUM(th->priority); } diff --git a/thread_pthread.c b/thread_pthread.c index 028fb3f9b5..cd1e7bc48b 100644 --- 
a/thread_pthread.c +++ b/thread_pthread.c @@ -37,92 +37,79 @@ static void native_cond_destroy(rb_thread_cond_t *cond); #define USE_MONOTONIC_COND 0 #endif -#define GVL_SIMPLE_LOCK 0 #define GVL_DEBUG 0 static void -gvl_show_waiting_threads(rb_vm_t *vm) +__gvl_acquire(rb_vm_t *vm) { - rb_thread_t *th = vm->gvl.waiting_threads; - int i = 0; - while (th) { - fprintf(stderr, "waiting (%d): %p\n", i++, (void *)th); - th = th->native_thread_data.gvl_next; + + if (vm->gvl.acquired) { + vm->gvl.waiting++; + while (vm->gvl.acquired) { + native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); + } + vm->gvl.waiting--; + + if (vm->gvl.need_yield) { + vm->gvl.need_yield = 0; + native_cond_signal(&vm->gvl.switch_cond); + } } + + vm->gvl.acquired = 1; } -#if !GVL_SIMPLE_LOCK static void -gvl_waiting_push(rb_vm_t *vm, rb_thread_t *th) +gvl_acquire(rb_vm_t *vm, rb_thread_t *th) { - th->native_thread_data.gvl_next = 0; - - if (vm->gvl.waiting_threads) { - vm->gvl.waiting_last_thread->native_thread_data.gvl_next = th; - vm->gvl.waiting_last_thread = th; - } - else { - vm->gvl.waiting_threads = th; - vm->gvl.waiting_last_thread = th; - } - th = vm->gvl.waiting_threads; - vm->gvl.waiting++; + native_mutex_lock(&vm->gvl.lock); + __gvl_acquire(vm); + native_mutex_unlock(&vm->gvl.lock); } static void -gvl_waiting_shift(rb_vm_t *vm, rb_thread_t *th) +__gvl_release(rb_vm_t *vm) { - vm->gvl.waiting_threads = vm->gvl.waiting_threads->native_thread_data.gvl_next; - vm->gvl.waiting--; + vm->gvl.acquired = 0; + if (vm->gvl.waiting > 0) + native_cond_signal(&vm->gvl.cond); } -#endif static void -gvl_acquire(rb_vm_t *vm, rb_thread_t *th) +gvl_release(rb_vm_t *vm) { -#if GVL_SIMPLE_LOCK - native_mutex_lock(&vm->gvl.lock); -#else native_mutex_lock(&vm->gvl.lock); - if (vm->gvl.waiting > 0 || vm->gvl.acquired != 0) { - if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): sleep\n", (void *)th); - gvl_waiting_push(vm, th); - if (GVL_DEBUG) gvl_show_waiting_threads(vm); - - while (vm->gvl.acquired != 0 || 
vm->gvl.waiting_threads != th) { - native_cond_wait(&th->native_thread_data.gvl_cond, &vm->gvl.lock); - } - gvl_waiting_shift(vm, th); - } - else { - /* do nothing */ - } - vm->gvl.acquired = 1; + __gvl_release(vm); native_mutex_unlock(&vm->gvl.lock); -#endif - if (GVL_DEBUG) gvl_show_waiting_threads(vm); - if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): acquire\n", (void *)th); } +#define HAVE_GVL_YIELD 1 static void -gvl_release(rb_vm_t *vm) +gvl_yield(rb_vm_t *vm, rb_thread_t *th) { -#if GVL_SIMPLE_LOCK - native_mutex_unlock(&vm->gvl.lock); -#else native_mutex_lock(&vm->gvl.lock); - if (vm->gvl.waiting > 0) { - rb_thread_t *th = vm->gvl.waiting_threads; - if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), (void *)th); - native_cond_signal(&th->native_thread_data.gvl_cond); + + /* Another thread is already processing a GVL yield. */ + if (vm->gvl.need_yield) { + native_mutex_unlock(&vm->gvl.lock); + return; } - else { - if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), NULL); - /* do nothing */ + + if (vm->gvl.waiting > 0) + vm->gvl.need_yield = 1; + + __gvl_release(vm); + if (vm->gvl.need_yield) { + /* Wait until another thread takes the GVL.
*/ + native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); + } else { + native_mutex_unlock(&vm->gvl.lock); + sched_yield(); + native_mutex_lock(&vm->gvl.lock); } - vm->gvl.acquired = 0; + + __gvl_acquire(vm); native_mutex_unlock(&vm->gvl.lock); -#endif } static void @@ -130,15 +117,12 @@ gvl_init(rb_vm_t *vm) { if (GVL_DEBUG) fprintf(stderr, "gvl init\n"); -#if GVL_SIMPLE_LOCK - native_mutex_initialize(&vm->gvl.lock); -#else native_mutex_initialize(&vm->gvl.lock); - vm->gvl.waiting_threads = 0; - vm->gvl.waiting_last_thread = 0; - vm->gvl.waiting = 0; + native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC); + native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC); vm->gvl.acquired = 0; -#endif + vm->gvl.waiting = 0; + vm->gvl.need_yield = 0; } static void @@ -990,6 +974,11 @@ static pthread_t timer_thread_id; static rb_thread_cond_t timer_thread_cond; static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER; +/* 100ms. 10ms is too small for user level thread scheduling + * on recent Linux (tested on 2.6.35) + */ +#define TIME_QUANTUM_USEC (100 * 1000) + static void * thread_timer(void *dummy) { @@ -997,7 +986,7 @@ thread_timer(void *dummy) struct timespec timeout; timeout_10ms.tv_sec = 0; - timeout_10ms.tv_nsec = 10 * 1000 * 1000; + timeout_10ms.tv_nsec = TIME_QUANTUM_USEC * 1000; native_mutex_lock(&timer_thread_lock); native_cond_broadcast(&timer_thread_cond); diff --git a/thread_pthread.h b/thread_pthread.h index 97a26e0e80..781712c3f6 100644 --- a/thread_pthread.h +++ b/thread_pthread.h @@ -35,11 +35,17 @@ typedef struct native_thread_data_struct { #include typedef struct rb_global_vm_lock_struct { + /* fast path */ + unsigned long acquired; pthread_mutex_t lock; - struct rb_thread_struct * volatile waiting_threads; - struct rb_thread_struct *waiting_last_thread; - int waiting; - int volatile acquired; + + /* slow path */ + unsigned long waiting; + rb_thread_cond_t cond; + + /* yield */ + rb_thread_cond_t 
switch_cond; + unsigned long need_yield; } rb_global_vm_lock_t; #endif /* RUBY_THREAD_PTHREAD_H */ diff --git a/thread_win32.c b/thread_win32.c index c0a026c7b6..da335e88ff 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -13,7 +13,7 @@ #include -#define WIN32_WAIT_TIMEOUT 10 /* 10 ms */ +#define TIME_QUANTUM_USEC (100 * 1000) #define RB_CONDATTR_CLOCK_MONOTONIC 1 /* no effect */ #undef Sleep @@ -680,7 +680,7 @@ static unsigned long _stdcall timer_thread_func(void *dummy) { thread_debug("timer_thread\n"); - while (WaitForSingleObject(timer_thread_lock, WIN32_WAIT_TIMEOUT) == + while (WaitForSingleObject(timer_thread_lock, TIME_QUANTUM_USEC/1000) == WAIT_TIMEOUT) { timer_thread_function(dummy); } diff --git a/vm_core.h b/vm_core.h index 4d1780f303..98ec9b4c51 100644 --- a/vm_core.h +++ b/vm_core.h @@ -419,7 +419,6 @@ typedef struct rb_thread_struct { rb_thread_id_t thread_id; enum rb_thread_status status; int priority; - int slice; native_thread_data_t native_thread_data; void *blocking_region_buffer; @@ -484,6 +483,7 @@ typedef struct rb_thread_struct { #ifdef USE_SIGALTSTACK void *altstack; #endif + unsigned long running_time_us; } rb_thread_t; /* iseq.c */ @@ -673,6 +673,9 @@ extern rb_vm_t *ruby_current_vm; #define GET_THREAD() ruby_current_thread #define rb_thread_set_current_raw(th) (void)(ruby_current_thread = (th)) #define rb_thread_set_current(th) do { \ + if ((th)->vm->running_thread != (th)) { \ + (th)->vm->running_thread->running_time_us = 0; \ + } \ rb_thread_set_current_raw(th); \ (th)->vm->running_thread = (th); \ } while (0) -- cgit v1.2.3