From b5444a3e50524c703062865badce8124f8957c48 Mon Sep 17 00:00:00 2001 From: Andre Arko Date: Tue, 22 Jul 2014 22:01:03 -0700 Subject: Refactor workers completely. Since we no longer have multiple types of workers, it was possible to collapse the entire set of worker classes into a single Worker class. While I was in there, I tried to simplify the structure of the worker by breaking out individual tasks into methods. I have no idea if this actually works in complex cases in the real world, but it worked for me to install some gemfiles with threads. --- lib/bundler/worker.rb | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 lib/bundler/worker.rb (limited to 'lib/bundler/worker.rb') diff --git a/lib/bundler/worker.rb b/lib/bundler/worker.rb new file mode 100644 index 00000000..3922a576 --- /dev/null +++ b/lib/bundler/worker.rb @@ -0,0 +1,71 @@ +module Bundler + class Worker + POISON = Object.new + + class WrappedException < StandardError + attr_reader :exception + def initialize(exn) + @exception = exn + end + end + + # Creates a worker pool of specified size + # + # @param size [Integer] Size of pool + # @param func [Proc] job to run in inside the worker pool + def initialize(size, func) + @request_queue = Queue.new + @response_queue = Queue.new + @func = func + @threads = size.times.map { |i| Thread.start { process_queue(i) } } + trap("INT") { abort_threads } + end + + # Enqueue a request to be executed in the worker pool + # + # @param obj [String] mostly it is name of spec that should be downloaded + def enq(obj) + @request_queue.enq obj + end + + # Retrieves results of job function being executed in worker pool + def deq + result = @response_queue.deq + raise result.exception if result.is_a?(WrappedException) + result + end + + def stop + stop_threads + end + + private + + def process_queue(i) + loop do + obj = @request_queue.deq + break if obj.equal? POISON + @response_queue.enq apply_func(obj, i) + end + end + + def apply_func(obj, i) + @func.call(obj, i) + rescue Exception => e + WrappedException.new(e) + end + + # Stop the worker threads by sending a poison object down the request queue + # so as worker threads after retrieving it, shut themselves down + def stop_threads + @threads.each { @request_queue.enq POISON } + @threads.each { |thread| thread.join } + end + + def abort_threads + @threads.each {|i| i.exit } + exit 1 + end + + end +end -- cgit v1.2.3