From 6c1d0baf6f7f80d0cc18865213e0b1ad02118510 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Wed, 13 Jan 2016 23:00:51 +0900 Subject: rack: add threadpool-size option --- lib/plum/rack.rb | 1 + lib/plum/rack/cli.rb | 5 ++++ lib/plum/rack/config.rb | 3 ++- lib/plum/rack/dsl.rb | 4 ++++ lib/plum/rack/server.rb | 6 ++++- lib/plum/rack/session.rb | 27 ++++++++++++++------- lib/plum/rack/thread_pool.rb | 56 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 92 insertions(+), 10 deletions(-) create mode 100644 lib/plum/rack/thread_pool.rb (limited to 'lib') diff --git a/lib/plum/rack.rb b/lib/plum/rack.rb index d38aa4a..923389b 100644 --- a/lib/plum/rack.rb +++ b/lib/plum/rack.rb @@ -8,3 +8,4 @@ require "plum/rack/dsl" require "plum/rack/listener" require "plum/rack/server" require "plum/rack/session" +require "plum/rack/thread_pool" diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb index 3ee50fe..9d8a03d 100644 --- a/lib/plum/rack/cli.rb +++ b/lib/plum/rack/cli.rb @@ -45,6 +45,7 @@ module Plum config[:debug] = @options[:debug] unless @options[:debug].nil? config[:server_push] = @options[:server_push] unless @options[:server_push].nil? config[:threaded] = @options[:threaded] unless @options[:threaded].nil? + config[:threadpool_size] = @options[:threadpool_size] unless @options[:threadpool_size].nil? if @options[:fallback_legacy] h, p = @options[:fallback_legacy].split(":") @@ -124,6 +125,10 @@ module Plum @options[:threaded] = true end + o.on "--threadpool-size SIZE", "Set the size of thread pool" do |arg| + @options[:threadpool_size] = arg.to_i + end + o.on "--fallback-legacy HOST:PORT", "Fallbacks if the client doesn't support HTTP/2" do |arg| @options[:fallback_legacy] = arg end diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb index 22e0691..a5a651f 100644 --- a/lib/plum/rack/config.rb +++ b/lib/plum/rack/config.rb @@ -7,7 +7,8 @@ module Plum debug: false, log: nil, # $stdout server_push: true, - threaded: false + threaded: false, + threadpool_size: 20, }.freeze def initialize(config = {}) diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb index 006d6dc..1b46360 100644 --- a/lib/plum/rack/dsl.rb +++ b/lib/plum/rack/dsl.rb @@ -44,6 +44,10 @@ module Plum @config[:threaded] = !!bool end + def threadpool_size(int) + @config[:threadpool_size] = int.to_i + end + def fallback_legacy(str) h, p = str.split(":") @config[:fallback_legacy_host] = h diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 9a0caf4..3bd48ff 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -14,6 +14,9 @@ module Plum @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) } + if @config[:threaded] + @threadpool = ThreadPool.new(@config[:threadpool_size]) + end @logger.info("Plum #{::Plum::VERSION}") @logger.info("Config: #{config}") @@ -60,7 +63,8 @@ module Plum sock: sock, logger: @logger, config: @config, - remote_addr: sock.peeraddr.last) + remote_addr: sock.peeraddr.last, + threadpool: @threadpool) con.run rescue ::Plum::LegacyHTTPError => e @logger.info "legacy HTTP client: #{e}" diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index 151a2f0..c40f609 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -8,14 +8,15 @@ module Plum class Session attr_reader :app, :plum - def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1") + def initialize(app:, plum:, sock:, logger:, config:, remote_addr:, threadpool:) @app = app @plum = plum @sock = sock @logger = logger @config = config @remote_addr = remote_addr - @request_thread = {} # used if threaded + @threadpool = threadpool + @request_tokens = Set.new setup_plum end @@ -29,7 +30,7 @@ module Plum @plum << @sock.readpartial(1024) end ensure - @request_thread.each { |stream, thread| thread.kill } + @request_tokens.each { |token| @threadpool.cancel(token) } if @threadpool stop end @@ -57,12 +58,22 @@ module Plum } @plum.on(:end_stream) { |stream| - if @config[:threaded] - @request_thread[stream] = Thread.new { - handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + req = reqs.delete(stream) + err = proc { |err| + stream.send_headers({ ":status" => 500 }, end_stream: true) + @logger.error(err) + } + if @threadpool + headers = req[:headers] + @request_tokens << @threadpool.acquire(headers, err) { + handle_request(stream, headers, req[:data]) } else - handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + begin + handle_request(stream, req[:headers], req[:data]) + rescue + err.call($!) + end end } end @@ -139,7 +150,7 @@ module Plum } end - @request_thread.delete(stream) + @request_tokens.delete(headers) # headers is tag end def new_env(h, data) diff --git a/lib/plum/rack/thread_pool.rb b/lib/plum/rack/thread_pool.rb new file mode 100644 index 0000000..7c3fc48 --- /dev/null +++ b/lib/plum/rack/thread_pool.rb @@ -0,0 +1,56 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + class ThreadPool + def initialize(size = 20) + @workers = Set.new + @jobs = Queue.new + @tags = {} + @cancels = Set.new + @mutex = Mutex.new + + size.times { |i| + spawn_worker + } + end + + # returns cancel token + def acquire(tag = nil, err = nil, &blk) + @jobs << [blk, err, tag] + tag + end + + def cancel(tag) + worker = @mutex.synchronize { @tags.delete?(tag) || (@cancels << tag; return) } + @workers.delete(worker) + worker.kill + spawn_worker + end + + private + def spawn_worker + t = Thread.new { + while true + job, err, tag = @jobs.pop + if tag + next if @mutex.synchronize { + c = @cancels.delete?(tag) + @tags[tag] = self unless c + c + } + end + + begin + job.call + rescue => e + err << e if err + end + + @mutex.synchronize { @tags.delete(tag) } if tag + end + } + @workers << t + end + end + end +end -- cgit v1.2.3