aboutsummaryrefslogtreecommitdiffstats
path: root/lib/thread.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/thread.rb')
-rw-r--r--lib/thread.rb99
1 files changed, 42 insertions, 57 deletions
diff --git a/lib/thread.rb b/lib/thread.rb
index f3dc9b15f9..e3aeed19fe 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -149,26 +149,23 @@ class Queue
#
def initialize
@que = []
- @waiting = []
@que.taint # enable tainted communication
- @waiting.taint
+ @num_waiting = 0
self.taint
@mutex = Mutex.new
+ @cond = ConditionVariable.new
end
#
# Pushes +obj+ to the queue.
#
def push(obj)
- @mutex.synchronize{
- @que.push obj
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
+ Thread.async_interrupt_timing(StandardError => :on_blocking) do
+ @mutex.synchronize do
+ @que.push obj
+ @cond.signal
end
- }
+ end
end
#
@@ -187,23 +184,26 @@ class Queue
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
- @mutex.synchronize{
- begin
+ Thread.async_interrupt_timing(StandardError => :on_blocking) do
+ @mutex.synchronize do
while true
if @que.empty?
- raise ThreadError, "queue empty" if non_block
- # @waiting.include? check is necessary for avoiding a race against
- # Thread.wakeup [Bug 5195]
- @waiting.push Thread.current unless @waiting.include?(Thread.current)
- @mutex.sleep
+ if non_block
+ raise ThreadError, "queue empty"
+ else
+ begin
+ @num_waiting += 1
+ @cond.wait @mutex
+ ensure
+ @num_waiting -= 1
+ end
+ end
else
return @que.shift
end
end
- ensure
- @waiting.delete(Thread.current)
end
- }
+ end
end
#
@@ -246,7 +246,7 @@ class Queue
# Returns the number of threads waiting on the queue.
#
def num_waiting
- @waiting.size
+ @num_waiting
end
end
@@ -263,8 +263,8 @@ class SizedQueue < Queue
def initialize(max)
raise ArgumentError, "queue size must be positive" unless max > 0
@max = max
- @queue_wait = []
- @queue_wait.taint # enable tainted comunication
+ @enque_cond = ConditionVariable.new
+ @num_enqueue_waiting = 0
super()
end
@@ -280,22 +280,15 @@ class SizedQueue < Queue
#
def max=(max)
raise ArgumentError, "queue size must be positive" unless max > 0
- diff = nil
- @mutex.synchronize {
+
+ @mutex.synchronize do
if max <= @max
@max = max
else
diff = max - @max
@max = max
- end
- }
- if diff
- diff.times do
- begin
- t = @queue_wait.shift
- t.run if t
- rescue ThreadError
- retry
+ diff.times do
+ @enque_cond.signal
end
end
end
@@ -307,25 +300,22 @@ class SizedQueue < Queue
# until space becomes available.
#
def push(obj)
- @mutex.synchronize{
- begin
+ Thread.async_interrupt_timing(RuntimeError => :on_blocking) do
+ @mutex.synchronize do
while true
break if @que.length < @max
- @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
- @mutex.sleep
+ @num_enqueue_waiting += 1
+ begin
+ @enque_cond.wait @mutex
+ ensure
+ @num_enqueue_waiting -= 1
+ end
end
- ensure
- @queue_wait.delete(Thread.current)
- end
- @que.push obj
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
+ @que.push obj
+ @cond.signal
end
- }
+ end
end
#
@@ -343,16 +333,11 @@ class SizedQueue < Queue
#
def pop(*args)
retval = super
- @mutex.synchronize {
+ @mutex.synchronize do
if @que.length < @max
- begin
- t = @queue_wait.shift
- t.wakeup if t
- rescue ThreadError
- retry
- end
+ @enque_cond.signal
end
- }
+ end
retval
end
@@ -370,7 +355,7 @@ class SizedQueue < Queue
# Returns the number of threads waiting on the queue.
#
def num_waiting
- @waiting.size + @queue_wait.size
+ @num_waiting + @num_enqueue_waiting
end
end