aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkosaki <kosaki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-11-30 18:55:09 +0000
committerkosaki <kosaki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-11-30 18:55:09 +0000
commit3357d88ada8ad44ecf267f5f2a1fb1df4e5cb8f4 (patch)
tree6053a9051f0bed2ba5ba52adaab996817eb35bb4
parente742a446c8864e3988537d06022312c708f9e176 (diff)
downloadruby-3357d88ada8ad44ecf267f5f2a1fb1df4e5cb8f4.tar.gz
* lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
* lib/thread.rb (SizedQueue#push): ditto. * lib/thread.rb (SizedQueue#max): ditto. * lib/thread.rb (Queue#pop): ditto. * lib/thread.rb (Queue#push): ditto. * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes. * lib/thread.rb (SizedQueue#initialize): ditto. * lib/thread.rb (Queue#num_waiting): ditto. * lib/thread.rb (Queue#initialize): ditto. * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@38087 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
-rw-r--r--ChangeLog14
-rw-r--r--lib/thread.rb99
-rw-r--r--test/thread/test_queue.rb3
3 files changed, 58 insertions, 58 deletions
diff --git a/ChangeLog b/ChangeLog
index 7fe367b06f..7f3caf747a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,17 @@
+Sat Dec 1 03:29:52 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
+
+ * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
+ * lib/thread.rb (SizedQueue#push): ditto.
+ * lib/thread.rb (SizedQueue#max): ditto.
+ * lib/thread.rb (Queue#pop): ditto.
+ * lib/thread.rb (Queue#push): ditto.
+
+ * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes.
+ * lib/thread.rb (SizedQueue#initialize): ditto.
+ * lib/thread.rb (Queue#num_waiting): ditto.
+ * lib/thread.rb (Queue#initialize): ditto.
+ * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto.
+
Sat Dec 1 03:45:47 2012 Koichi Sasada <ko1@atdot.net>
* thread.c (Thread.async_interrupt_timing): fix RDoc.
diff --git a/lib/thread.rb b/lib/thread.rb
index f3dc9b15f9..e3aeed19fe 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -149,26 +149,23 @@ class Queue
#
def initialize
@que = []
- @waiting = []
@que.taint # enable tainted communication
- @waiting.taint
+ @num_waiting = 0
self.taint
@mutex = Mutex.new
+ @cond = ConditionVariable.new
end
#
# Pushes +obj+ to the queue.
#
def push(obj)
- @mutex.synchronize{
- @que.push obj
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
+ Thread.async_interrupt_timing(StandardError => :on_blocking) do
+ @mutex.synchronize do
+ @que.push obj
+ @cond.signal
end
- }
+ end
end
#
@@ -187,23 +184,26 @@ class Queue
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
- @mutex.synchronize{
- begin
+ Thread.async_interrupt_timing(StandardError => :on_blocking) do
+ @mutex.synchronize do
while true
if @que.empty?
- raise ThreadError, "queue empty" if non_block
- # @waiting.include? check is necessary for avoiding a race against
- # Thread.wakeup [Bug 5195]
- @waiting.push Thread.current unless @waiting.include?(Thread.current)
- @mutex.sleep
+ if non_block
+ raise ThreadError, "queue empty"
+ else
+ begin
+ @num_waiting += 1
+ @cond.wait @mutex
+ ensure
+ @num_waiting -= 1
+ end
+ end
else
return @que.shift
end
end
- ensure
- @waiting.delete(Thread.current)
end
- }
+ end
end
#
@@ -246,7 +246,7 @@ class Queue
# Returns the number of threads waiting on the queue.
#
def num_waiting
- @waiting.size
+ @num_waiting
end
end
@@ -263,8 +263,8 @@ class SizedQueue < Queue
def initialize(max)
raise ArgumentError, "queue size must be positive" unless max > 0
@max = max
- @queue_wait = []
- @queue_wait.taint # enable tainted comunication
+ @enque_cond = ConditionVariable.new
+ @num_enqueue_waiting = 0
super()
end
@@ -280,22 +280,15 @@ class SizedQueue < Queue
#
def max=(max)
raise ArgumentError, "queue size must be positive" unless max > 0
- diff = nil
- @mutex.synchronize {
+
+ @mutex.synchronize do
if max <= @max
@max = max
else
diff = max - @max
@max = max
- end
- }
- if diff
- diff.times do
- begin
- t = @queue_wait.shift
- t.run if t
- rescue ThreadError
- retry
+ diff.times do
+ @enque_cond.signal
end
end
end
@@ -307,25 +300,22 @@ class SizedQueue < Queue
# until space becomes available.
#
def push(obj)
- @mutex.synchronize{
- begin
+ Thread.async_interrupt_timing(RuntimeError => :on_blocking) do
+ @mutex.synchronize do
while true
break if @que.length < @max
- @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
- @mutex.sleep
+ @num_enqueue_waiting += 1
+ begin
+ @enque_cond.wait @mutex
+ ensure
+ @num_enqueue_waiting -= 1
+ end
end
- ensure
- @queue_wait.delete(Thread.current)
- end
- @que.push obj
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
+ @que.push obj
+ @cond.signal
end
- }
+ end
end
#
@@ -343,16 +333,11 @@ class SizedQueue < Queue
#
def pop(*args)
retval = super
- @mutex.synchronize {
+ @mutex.synchronize do
if @que.length < @max
- begin
- t = @queue_wait.shift
- t.wakeup if t
- rescue ThreadError
- retry
- end
+ @enque_cond.signal
end
- }
+ end
retval
end
@@ -370,7 +355,7 @@ class SizedQueue < Queue
# Returns the number of threads waiting on the queue.
#
def num_waiting
- @waiting.size + @queue_wait.size
+ @num_waiting + @num_enqueue_waiting
end
end
diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb
index 84e60d6b81..b6fbbaeb33 100644
--- a/test/thread/test_queue.rb
+++ b/test/thread/test_queue.rb
@@ -69,7 +69,8 @@ class TestQueue < Test::Unit::TestCase
t2 = Thread.start { sq.push(2) }
sleep 0.1 until t1.stop? && t2.stop?
- queue_wait = sq.instance_eval{ @queue_wait }
+ enque_cond = sq.instance_eval{ @enque_cond }
+ queue_wait = enque_cond.instance_eval { @waiters }
assert_equal(queue_wait.uniq, queue_wait)
end