aboutsummaryrefslogtreecommitdiffstats
path: root/lib/bundler/worker.rb
diff options
context:
space:
mode:
authorAndre Arko <andre@arko.net>2014-07-22 22:01:03 -0700
committerAndre Arko <andre@arko.net>2014-07-22 22:01:04 -0700
commitb5444a3e50524c703062865badce8124f8957c48 (patch)
treed45f8c407e282f7bfd415fdd9e34331c5c1199ee /lib/bundler/worker.rb
parentfb65ebe5aec8f67617031a0f1232b8ee5744411b (diff)
downloadbundler-b5444a3e50524c703062865badce8124f8957c48.tar.gz
Refactor workers completely.
Since we no longer have multiple types of workers, it was possible to collapse the entire set of worker classes into a single Worker class. While I was in there, I tried to simplify the structure of the worker by breaking out individual tasks into methods. I have no idea if this actually works in complex cases in the real world, but it worked for me to install some gemfiles with threads.
Diffstat (limited to 'lib/bundler/worker.rb')
-rw-r--r--lib/bundler/worker.rb71
1 files changed, 71 insertions, 0 deletions
diff --git a/lib/bundler/worker.rb b/lib/bundler/worker.rb
new file mode 100644
index 00000000..3922a576
--- /dev/null
+++ b/lib/bundler/worker.rb
@@ -0,0 +1,71 @@
+module Bundler
+ class Worker
+ POISON = Object.new
+
+ class WrappedException < StandardError
+ attr_reader :exception
+ def initialize(exn)
+ @exception = exn
+ end
+ end
+
+ # Creates a worker pool of specified size
+ #
+ # @param size [Integer] Size of pool
+ # @param func [Proc] job to run in inside the worker pool
+ def initialize(size, func)
+ @request_queue = Queue.new
+ @response_queue = Queue.new
+ @func = func
+ @threads = size.times.map { |i| Thread.start { process_queue(i) } }
+ trap("INT") { abort_threads }
+ end
+
+ # Enqueue a request to be executed in the worker pool
+ #
+ # @param obj [String] mostly it is name of spec that should be downloaded
+ def enq(obj)
+ @request_queue.enq obj
+ end
+
+ # Retrieves results of job function being executed in worker pool
+ def deq
+ result = @response_queue.deq
+ raise result.exception if result.is_a?(WrappedException)
+ result
+ end
+
+ def stop
+ stop_threads
+ end
+
+ private
+
+ def process_queue(i)
+ loop do
+ obj = @request_queue.deq
+ break if obj.equal? POISON
+ @response_queue.enq apply_func(obj, i)
+ end
+ end
+
+ def apply_func(obj, i)
+ @func.call(obj, i)
+ rescue Exception => e
+ WrappedException.new(e)
+ end
+
+ # Stop the worker threads by sending a poison object down the request queue
+ # so as worker threads after retrieving it, shut themselves down
+ def stop_threads
+ @threads.each { @request_queue.enq POISON }
+ @threads.each { |thread| thread.join }
+ end
+
+ def abort_threads
+ @threads.each {|i| i.exit }
+ exit 1
+ end
+
+ end
+end