aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-01-13 23:00:51 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-01-13 23:00:51 +0900
commit6c1d0baf6f7f80d0cc18865213e0b1ad02118510 (patch)
tree80d1c116d2b25bbefe47856de5b0ec822206c3c2
parent0ec62f21412dfadf95f9abca39b8d6f88c720e2e (diff)
downloadplum-6c1d0baf6f7f80d0cc18865213e0b1ad02118510.tar.gz
rack: add threadpool-size option
-rw-r--r--lib/plum/rack.rb1
-rw-r--r--lib/plum/rack/cli.rb5
-rw-r--r--lib/plum/rack/config.rb3
-rw-r--r--lib/plum/rack/dsl.rb4
-rw-r--r--lib/plum/rack/server.rb6
-rw-r--r--lib/plum/rack/session.rb27
-rw-r--r--lib/plum/rack/thread_pool.rb56
7 files changed, 92 insertions, 10 deletions
diff --git a/lib/plum/rack.rb b/lib/plum/rack.rb
index d38aa4a..923389b 100644
--- a/lib/plum/rack.rb
+++ b/lib/plum/rack.rb
@@ -8,3 +8,4 @@ require "plum/rack/dsl"
require "plum/rack/listener"
require "plum/rack/server"
require "plum/rack/session"
+require "plum/rack/thread_pool"
diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb
index 3ee50fe..9d8a03d 100644
--- a/lib/plum/rack/cli.rb
+++ b/lib/plum/rack/cli.rb
@@ -45,6 +45,7 @@ module Plum
config[:debug] = @options[:debug] unless @options[:debug].nil?
config[:server_push] = @options[:server_push] unless @options[:server_push].nil?
config[:threaded] = @options[:threaded] unless @options[:threaded].nil?
+ config[:threadpool_size] = @options[:threadpool_size] unless @options[:threadpool_size].nil?
if @options[:fallback_legacy]
h, p = @options[:fallback_legacy].split(":")
@@ -124,6 +125,10 @@ module Plum
@options[:threaded] = true
end
+ o.on "--threadpool-size SIZE", "Set the size of thread pool" do |arg|
+ @options[:threadpool_size] = arg.to_i
+ end
+
o.on "--fallback-legacy HOST:PORT", "Fallbacks if the client doesn't support HTTP/2" do |arg|
@options[:fallback_legacy] = arg
end
diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb
index 22e0691..a5a651f 100644
--- a/lib/plum/rack/config.rb
+++ b/lib/plum/rack/config.rb
@@ -7,7 +7,8 @@ module Plum
debug: false,
log: nil, # $stdout
server_push: true,
- threaded: false
+ threaded: false,
+ threadpool_size: 20,
}.freeze
def initialize(config = {})
diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb
index 006d6dc..1b46360 100644
--- a/lib/plum/rack/dsl.rb
+++ b/lib/plum/rack/dsl.rb
@@ -44,6 +44,10 @@ module Plum
@config[:threaded] = !!bool
end
+ def threadpool_size(int)
+ @config[:threadpool_size] = int.to_i
+ end
+
def fallback_legacy(str)
h, p = str.split(":")
@config[:fallback_legacy_host] = h
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
index 9a0caf4..3bd48ff 100644
--- a/lib/plum/rack/server.rb
+++ b/lib/plum/rack/server.rb
@@ -14,6 +14,9 @@ module Plum
@listeners = config[:listeners].map { |lc|
lc[:listener].new(lc)
}
+ if @config[:threaded]
+ @threadpool = ThreadPool.new(@config[:threadpool_size])
+ end
@logger.info("Plum #{::Plum::VERSION}")
@logger.info("Config: #{config}")
@@ -60,7 +63,8 @@ module Plum
sock: sock,
logger: @logger,
config: @config,
- remote_addr: sock.peeraddr.last)
+ remote_addr: sock.peeraddr.last,
+ threadpool: @threadpool)
con.run
rescue ::Plum::LegacyHTTPError => e
@logger.info "legacy HTTP client: #{e}"
diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb
index 151a2f0..c40f609 100644
--- a/lib/plum/rack/session.rb
+++ b/lib/plum/rack/session.rb
@@ -8,14 +8,15 @@ module Plum
class Session
attr_reader :app, :plum
- def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1")
+ def initialize(app:, plum:, sock:, logger:, config:, remote_addr:, threadpool:)
@app = app
@plum = plum
@sock = sock
@logger = logger
@config = config
@remote_addr = remote_addr
- @request_thread = {} # used if threaded
+ @threadpool = threadpool
+ @request_tokens = Set.new
setup_plum
end
@@ -29,7 +30,7 @@ module Plum
@plum << @sock.readpartial(1024)
end
ensure
- @request_thread.each { |stream, thread| thread.kill }
+ @request_tokens.each { |token| @threadpool.cancel(token) } if @threadpool
stop
end
@@ -57,12 +58,22 @@ module Plum
}
@plum.on(:end_stream) { |stream|
- if @config[:threaded]
- @request_thread[stream] = Thread.new {
- handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ req = reqs.delete(stream)
+ err = proc { |err|
+ stream.send_headers({ ":status" => 500 }, end_stream: true)
+ @logger.error(err)
+ }
+ if @threadpool
+ headers = req[:headers]
+ @request_tokens << @threadpool.acquire(headers, err) {
+ handle_request(stream, headers, req[:data])
}
else
- handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ begin
+ handle_request(stream, req[:headers], req[:data])
+ rescue
+ err.call($!)
+ end
end
}
end
@@ -139,7 +150,7 @@ module Plum
}
end
- @request_thread.delete(stream)
+ @request_tokens.delete(headers) # headers is tag
end
def new_env(h, data)
diff --git a/lib/plum/rack/thread_pool.rb b/lib/plum/rack/thread_pool.rb
new file mode 100644
index 0000000..7c3fc48
--- /dev/null
+++ b/lib/plum/rack/thread_pool.rb
@@ -0,0 +1,56 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ module Rack
+ class ThreadPool
+ def initialize(size = 20)
+ @workers = Set.new
+ @jobs = Queue.new
+ @tags = {}
+ @cancels = Set.new
+ @mutex = Mutex.new
+
+ size.times { |i|
+ spawn_worker
+ }
+ end
+
+ # returns cancel token
+ def acquire(tag = nil, err = nil, &blk)
+ @jobs << [blk, err, tag]
+ tag
+ end
+
+ def cancel(tag)
+ worker = @mutex.synchronize { @tags.delete?(tag) || (@cancels << tag; return) }
+ @workers.delete(worker)
+ worker.kill
+ spawn_worker
+ end
+
+ private
+ def spawn_worker
+ t = Thread.new {
+ while true
+ job, err, tag = @jobs.pop
+ if tag
+ next if @mutex.synchronize {
+ c = @cancels.delete?(tag)
+ @tags[tag] = self unless c
+ c
+ }
+ end
+
+ begin
+ job.call
+ rescue => e
+ err << e if err
+ end
+
+ @mutex.synchronize { @tags.delete(tag) } if tag
+ end
+ }
+ @workers << t
+ end
+ end
+ end
+end