aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-01-14 12:46:28 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-01-14 12:46:28 +0900
commit6f2e135321b09efa6958c477e6729a93d1700e1f (patch)
treed818fec74d6d4bbcefe5de2c5212c6ebb0642f6f
parent8ab57c01c62622249c51291d01185df53d2d7a9b (diff)
parent65ba41222eb0caf8d882c6fa2e202b313ef2801e (diff)
downloadplum-6f2e135321b09efa6958c477e6729a93d1700e1f.tar.gz
Merge branch 'threaded'
-rw-r--r--lib/plum/rack.rb2
-rw-r--r--lib/plum/rack/cli.rb5
-rw-r--r--lib/plum/rack/config.rb3
-rw-r--r--lib/plum/rack/dsl.rb4
-rw-r--r--lib/plum/rack/legacy_session.rb36
-rw-r--r--lib/plum/rack/listener.rb65
-rw-r--r--lib/plum/rack/server.rb85
-rw-r--r--lib/plum/rack/session.rb39
-rw-r--r--lib/plum/rack/thread_pool.rb35
9 files changed, 184 insertions, 90 deletions
diff --git a/lib/plum/rack.rb b/lib/plum/rack.rb
index d38aa4a..a9d2c66 100644
--- a/lib/plum/rack.rb
+++ b/lib/plum/rack.rb
@@ -8,3 +8,5 @@ require "plum/rack/dsl"
require "plum/rack/listener"
require "plum/rack/server"
require "plum/rack/session"
+require "plum/rack/legacy_session"
+require "plum/rack/thread_pool"
diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb
index 3ee50fe..9d8a03d 100644
--- a/lib/plum/rack/cli.rb
+++ b/lib/plum/rack/cli.rb
@@ -45,6 +45,7 @@ module Plum
config[:debug] = @options[:debug] unless @options[:debug].nil?
config[:server_push] = @options[:server_push] unless @options[:server_push].nil?
config[:threaded] = @options[:threaded] unless @options[:threaded].nil?
+ config[:threadpool_size] = @options[:threadpool_size] unless @options[:threadpool_size].nil?
if @options[:fallback_legacy]
h, p = @options[:fallback_legacy].split(":")
@@ -124,6 +125,10 @@ module Plum
@options[:threaded] = true
end
+ o.on "--threadpool-size SIZE", "Set the size of thread pool" do |arg|
+ @options[:threadpool_size] = arg.to_i
+ end
+
o.on "--fallback-legacy HOST:PORT", "Fallbacks if the client doesn't support HTTP/2" do |arg|
@options[:fallback_legacy] = arg
end
diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb
index 22e0691..a5a651f 100644
--- a/lib/plum/rack/config.rb
+++ b/lib/plum/rack/config.rb
@@ -7,7 +7,8 @@ module Plum
debug: false,
log: nil, # $stdout
server_push: true,
- threaded: false
+ threaded: false,
+ threadpool_size: 20,
}.freeze
def initialize(config = {})
diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb
index 006d6dc..1b46360 100644
--- a/lib/plum/rack/dsl.rb
+++ b/lib/plum/rack/dsl.rb
@@ -44,6 +44,10 @@ module Plum
@config[:threaded] = !!bool
end
+ def threadpool_size(int)
+ @config[:threadpool_size] = int.to_i
+ end
+
def fallback_legacy(str)
h, p = str.split(":")
@config[:fallback_legacy_host] = h
diff --git a/lib/plum/rack/legacy_session.rb b/lib/plum/rack/legacy_session.rb
new file mode 100644
index 0000000..6712933
--- /dev/null
+++ b/lib/plum/rack/legacy_session.rb
@@ -0,0 +1,36 @@
+# -*- frozen-string-literal: true -*-
+using Plum::BinaryString
+
+module Plum
+ module Rack
+ class LegacySession
+ def initialize(svc, e, sock)
+ @svc = svc
+ @e = e
+ @sock = sock
+ @config = svc.config
+ end
+
+ def run
+ if @config[:fallback_legacy_host]
+ @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
+ upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
+ upstream.write(@e.buf) if @e.buf
+ loop do
+ ret = IO.select([@sock, upstream])
+ ret[0].each { |s|
+ a = s.readpartial(65536)
+ if s == upstream
+ @sock.write(a)
+ else
+ upstream.write(a)
+ end
+ }
+ end
+ end
+ ensure
+ upstream.close if upstream
+ end
+ end
+ end
+end
diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb
index 3ff6c4a..8af12be 100644
--- a/lib/plum/rack/listener.rb
+++ b/lib/plum/rack/listener.rb
@@ -10,6 +10,10 @@ module Plum
raise "not implemented"
end
+ def accept(svc)
+ raise "not implemented"
+ end
+
def method_missing(name, *args)
@server.__send__(name, *args)
end
@@ -24,8 +28,24 @@ module Plum
@server.to_io
end
- def plum(sock)
- ::Plum::HTTPServerConnection.new(sock.method(:write))
+ def accept(svc)
+ sock = @server.accept
+ Thread.start {
+ begin
+ plum = ::Plum::HTTPServerConnection.new(sock.method(:write))
+ sess = Session.new(svc, sock, plum)
+ sess.run
+ rescue Errno::ECONNRESET, EOFError # closed
+ rescue ::Plum::LegacyHTTPError => e
+ @logger.info "legacy HTTP client: #{e}"
+ sess = LegacySession.new(svc, e, sock)
+ sess.run
+ rescue => e
+ svc.log_exception(e)
+ ensure
+ sock.close
+ end
+ }
end
end
@@ -57,7 +77,7 @@ module Plum
}
tcp_server = ::TCPServer.new(lc[:hostname], lc[:port])
@server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)
- @server.start_immediately = false
+ @server.start_immediately = false # call socket#accept twice: [tcp, tls]
end
def parse_chained_cert(str)
@@ -68,9 +88,26 @@ module Plum
@server.to_io
end
- def plum(sock)
- raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2"
- ::Plum::ServerConnection.new(sock.method(:write))
+ def accept(svc)
+ sock = @server.accept
+ Thread.start {
+ begin
+ sock = sock.accept
+ raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2"
+ plum = ::Plum::ServerConnection.new(sock.method(:write))
+ sess = Session.new(svc, sock, plum)
+ sess.run
+ rescue Errno::ECONNRESET, EOFError # closed
+ rescue ::Plum::LegacyHTTPError => e
+ @logger.info "legacy HTTP client: #{e}"
+ sess = LegacySession.new(svc, e, sock)
+ sess.run
+ rescue => e
+ svc.log_exception(e)
+ ensure
+ sock.close if sock
+ end
+ }
end
private
@@ -126,8 +163,20 @@ module Plum
@server.to_io
end
- def plum(sock)
- ::Plum::ServerConnection.new(sock.method(:write))
+ def accept(svc)
+ sock = @server.accept
+ Thread.start {
+ begin
+ plum = ::Plum::ServerConnection.new(sock.method(:write))
+ sess = Session.new(svc, sock, plum)
+ sess.run
+ rescue Errno::ECONNRESET, EOFError # closed
+ rescue => e
+ svc.log_exception(e)
+ ensure
+ sock.close if sock
+ end
+ }
end
end
end
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
index 9a0caf4..9aedc20 100644
--- a/lib/plum/rack/server.rb
+++ b/lib/plum/rack/server.rb
@@ -2,18 +2,15 @@
module Plum
module Rack
class Server
- attr_reader :config
+ attr_reader :config, :app, :logger, :threadpool
def initialize(app, config)
@config = config
@state = :null
@app = config[:debug] ? ::Rack::CommonLogger.new(app) : app
- @logger = Logger.new(config[:log] || $stdout).tap { |l|
- l.level = config[:debug] ? Logger::DEBUG : Logger::INFO
- }
- @listeners = config[:listeners].map { |lc|
- lc[:listener].new(lc)
- }
+ @logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO }
+ @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) }
+ @threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded]
@logger.info("Plum #{::Plum::VERSION}")
@logger.info("Config: #{config}")
@@ -24,87 +21,41 @@ module Plum
end
def start
+ #trap(:INT) { @state = :ee }
+ #require "lineprof"
+ #Lineprof.profile(//){
@state = :running
- while @state == :running
- break if @listeners.empty?
+ while @state == :running && !@listeners.empty?
begin
if ss = IO.select(@listeners, nil, nil, 2.0)
ss[0].each { |svr|
- new_con(svr)
+ begin
+ svr.accept(self)
+ rescue Errno::ECONNRESET, Errno::ECONNABORTED # closed
+ rescue => e
+ log_exception(e)
+ end
}
end
- rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed
- rescue StandardError => e
+ rescue Errno::EBADF # closed
+ rescue => e
log_exception(e)
end
end
+ #}
end
def stop
@state = :stop
@listeners.map(&:stop)
- # TODO: gracefully shutdown connections
+ # TODO: gracefully shutdown connections (wait threadpool?)
end
private
- def new_con(svr)
- sock = svr.accept
- Thread.new {
- begin
- begin
- sock = sock.accept if sock.respond_to?(:accept)
- plum = svr.plum(sock)
-
- con = Session.new(app: @app,
- plum: plum,
- sock: sock,
- logger: @logger,
- config: @config,
- remote_addr: sock.peeraddr.last)
- con.run
- rescue ::Plum::LegacyHTTPError => e
- @logger.info "legacy HTTP client: #{e}"
- handle_legacy(e, sock)
- end
- rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError => e # closed
- rescue StandardError => e
- log_exception(e)
- ensure
- sock.close if sock
- end
- }
- rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
- sock.close if sock
- rescue StandardError => e
- log_exception(e)
- sock.close if sock
- end
-
def log_exception(e)
@logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
end
- def handle_legacy(e, sock)
- if @config[:fallback_legacy_host]
- @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
- upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
- upstream.write(e.buf) if e.buf
- loop do
- ret = IO.select([sock, upstream])
- ret[0].each { |s|
- a = s.readpartial(65536)
- if s == upstream
- sock.write(a)
- else
- upstream.write(a)
- end
- }
- end
- end
- ensure
- upstream.close if upstream
- end
-
def drop_privileges
begin
user = @config[:user]
diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb
index 151a2f0..3a27248 100644
--- a/lib/plum/rack/session.rb
+++ b/lib/plum/rack/session.rb
@@ -8,14 +8,15 @@ module Plum
class Session
attr_reader :app, :plum
- def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1")
- @app = app
- @plum = plum
+ def initialize(svc, sock, plum)
+ @svc = svc
+ @app = svc.app
@sock = sock
- @logger = logger
- @config = config
- @remote_addr = remote_addr
- @request_thread = {} # used if threaded
+ @plum = plum
+ @logger = svc.logger
+ @config = svc.config
+ @remote_addr = sock.peeraddr.last
+ @threadpool = svc.threadpool
setup_plum
end
@@ -24,12 +25,15 @@ module Plum
@plum.close
end
+ def to_io
+ @sock.to_io
+ end
+
def run
while !@sock.closed? && !@sock.eof?
@plum << @sock.readpartial(1024)
end
ensure
- @request_thread.each { |stream, thread| thread.kill }
stop
end
@@ -57,12 +61,21 @@ module Plum
}
@plum.on(:end_stream) { |stream|
- if @config[:threaded]
- @request_thread[stream] = Thread.new {
- handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ req = reqs.delete(stream)
+ err = proc { |err|
+ stream.send_headers({ ":status" => 500 }, end_stream: true)
+ @logger.error(err)
+ }
+ if @threadpool
+ @threadpool.acquire(err) {
+ handle_request(stream, req[:headers], req[:data])
}
else
- handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ begin
+ handle_request(stream, req[:headers], req[:data])
+ rescue
+ err.call($!)
+ end
end
}
end
@@ -138,8 +151,6 @@ module Plum
send_body(st, p_body) unless pno_body
}
end
-
- @request_thread.delete(stream)
end
def new_env(h, data)
diff --git a/lib/plum/rack/thread_pool.rb b/lib/plum/rack/thread_pool.rb
new file mode 100644
index 0000000..9e07943
--- /dev/null
+++ b/lib/plum/rack/thread_pool.rb
@@ -0,0 +1,35 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ module Rack
+ class ThreadPool
+ def initialize(size = 20)
+ @workers = Set.new
+ @jobs = Queue.new
+
+ size.times { |i|
+ spawn_worker
+ }
+ end
+
+ # returns cancel token
+ def acquire(tag = nil, err = nil, &blk)
+ @jobs << [blk, err]
+ end
+
+ private
+ def spawn_worker
+ t = Thread.new {
+ while true
+ job, err = @jobs.pop
+ begin
+ job.call
+ rescue => e
+ err << e if err
+ end
+ end
+ }
+ @workers << t
+ end
+ end
+ end
+end