aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-06 11:33:10 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-06 11:33:10 +0900
commit0e9f859c18d78a3c34d493e9d673e06ab10c311a (patch)
tree6dfb522d15935be3c494086dbe499dad58cd7fe5
parent33125477f3c3afa082fdef9d29a8ef36a71bb172 (diff)
downloadplum-0e9f859c18d78a3c34d493e9d673e06ab10c311a.tar.gz
rack/server: add threaded mode (run Rack app in threads)
-rw-r--r--lib/plum/rack/cli.rb5
-rw-r--r--lib/plum/rack/config.rb3
-rw-r--r--lib/plum/rack/dsl.rb4
-rw-r--r--lib/plum/rack/server.rb2
-rw-r--r--lib/plum/rack/session.rb34
5 files changed, 33 insertions, 15 deletions
diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb
index 18fe90f..b145c59 100644
--- a/lib/plum/rack/cli.rb
+++ b/lib/plum/rack/cli.rb
@@ -44,6 +44,7 @@ module Plum
ENV["RACK_ENV"] = @options[:env] if @options[:env]
config[:debug] = @options[:debug] unless @options[:debug].nil?
config[:server_push] = @options[:server_push] unless @options[:server_push].nil?
+ config[:threaded] = @options[:threaded] unless @options[:threaded].nil?
if @options[:socket]
config[:listeners] << { listener: UNIXListener,
@@ -113,6 +114,10 @@ module Plum
@options[:key] = arg
end
+ o.on "--threaded", "Call the Rack application in threads (experimental)" do
+ @options[:threaded] = true
+ end
+
o.on "-v", "--version", "Show version" do
puts "plum version #{::Plum::VERSION}"
exit(0)
diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb
index b75fd08..22e0691 100644
--- a/lib/plum/rack/config.rb
+++ b/lib/plum/rack/config.rb
@@ -6,7 +6,8 @@ module Plum
listeners: [],
debug: false,
log: nil, # $stdout
- server_push: true
+ server_push: true,
+ threaded: false
}.freeze
def initialize(config = {})
diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb
index eb2ba17..6e8dda6 100644
--- a/lib/plum/rack/dsl.rb
+++ b/lib/plum/rack/dsl.rb
@@ -39,6 +39,10 @@ module Plum
def server_push(bool)
@config[:server_push] = !!bool
end
+
+ def threaded(bool)
+ @config[:threaded] = !!bool
+ end
end
end
end
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
index 37ae166..26ead2a 100644
--- a/lib/plum/rack/server.rb
+++ b/lib/plum/rack/server.rb
@@ -54,7 +54,7 @@ module Plum
plum: plum,
sock: sock,
logger: @logger,
- server_push: @config[:server_push],
+ config: @config,
remote_addr: sock.peeraddr.last)
con.run
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb
index d0aedc5..0d7023e 100644
--- a/lib/plum/rack/session.rb
+++ b/lib/plum/rack/session.rb
@@ -8,13 +8,14 @@ module Plum
class Session
attr_reader :app, :plum
- def initialize(app:, plum:, sock:, logger:, server_push: true, remote_addr: "127.0.0.1")
+ def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1")
@app = app
@plum = plum
@sock = sock
@logger = logger
- @server_push = server_push
+ @config = config
@remote_addr = remote_addr
+ @request_thread = {} # used if threaded
setup_plum
end
@@ -24,14 +25,15 @@ module Plum
end
def run
- begin
- while !@sock.closed? && !@sock.eof?
- @plum << @sock.readpartial(1024)
- end
- rescue Errno::EPIPE, Errno::ECONNRESET => e
- rescue StandardError => e
- @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ while !@sock.closed? && !@sock.eof?
+ @plum << @sock.readpartial(1024)
end
+ rescue Errno::EPIPE, Errno::ECONNRESET => e
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ ensure
+ @request_thread.each { |stream, thread| thread.kill }
+ stop
end
private
@@ -51,7 +53,13 @@ module Plum
}
@plum.on(:end_stream) { |stream|
- handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ if @config[:threaded]
+ @request_thread[stream] = Thread.new {
+ handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ }
+ else
+ handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ end
}
end
@@ -77,9 +85,7 @@ module Plum
end
def extract_push(reqheaders, extheaders)
- if @server_push &&
- @plum.push_enabled? &&
- pushs = extheaders["plum.serverpush"]
+ if @config[:server_push] && @plum.push_enabled? && pushs = extheaders["plum.serverpush"]
authority = reqheaders.find { |k, v| k == ":authority" }[1]
scheme = reqheaders.find { |k, v| k == ":scheme" }[1]
@@ -117,6 +123,8 @@ module Plum
st.send_headers(p_headers, end_stream: false)
send_body(st, p_body)
}
+
+ @request_thread.delete(stream)
end
def new_env(h, data)