diff options
Diffstat (limited to 'lib/plum/rack')
-rw-r--r-- | lib/plum/rack/cli.rb | 5 | ||||
-rw-r--r-- | lib/plum/rack/config.rb | 3 | ||||
-rw-r--r-- | lib/plum/rack/dsl.rb | 4 | ||||
-rw-r--r-- | lib/plum/rack/legacy_session.rb | 36 | ||||
-rw-r--r-- | lib/plum/rack/listener.rb | 65 | ||||
-rw-r--r-- | lib/plum/rack/server.rb | 85 | ||||
-rw-r--r-- | lib/plum/rack/session.rb | 39 | ||||
-rw-r--r-- | lib/plum/rack/thread_pool.rb | 35 |
8 files changed, 182 insertions, 90 deletions
diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb index 3ee50fe..9d8a03d 100644 --- a/lib/plum/rack/cli.rb +++ b/lib/plum/rack/cli.rb @@ -45,6 +45,7 @@ module Plum config[:debug] = @options[:debug] unless @options[:debug].nil? config[:server_push] = @options[:server_push] unless @options[:server_push].nil? config[:threaded] = @options[:threaded] unless @options[:threaded].nil? + config[:threadpool_size] = @options[:threadpool_size] unless @options[:threadpool_size].nil? if @options[:fallback_legacy] h, p = @options[:fallback_legacy].split(":") @@ -124,6 +125,10 @@ module Plum @options[:threaded] = true end + o.on "--threadpool-size SIZE", "Set the size of thread pool" do |arg| + @options[:threadpool_size] = arg.to_i + end + o.on "--fallback-legacy HOST:PORT", "Fallbacks if the client doesn't support HTTP/2" do |arg| @options[:fallback_legacy] = arg end diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb index 22e0691..a5a651f 100644 --- a/lib/plum/rack/config.rb +++ b/lib/plum/rack/config.rb @@ -7,7 +7,8 @@ module Plum debug: false, log: nil, # $stdout server_push: true, - threaded: false + threaded: false, + threadpool_size: 20, }.freeze def initialize(config = {}) diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb index 006d6dc..1b46360 100644 --- a/lib/plum/rack/dsl.rb +++ b/lib/plum/rack/dsl.rb @@ -44,6 +44,10 @@ module Plum @config[:threaded] = !!bool end + def threadpool_size(int) + @config[:threadpool_size] = int.to_i + end + def fallback_legacy(str) h, p = str.split(":") @config[:fallback_legacy_host] = h diff --git a/lib/plum/rack/legacy_session.rb b/lib/plum/rack/legacy_session.rb new file mode 100644 index 0000000..6712933 --- /dev/null +++ b/lib/plum/rack/legacy_session.rb @@ -0,0 +1,36 @@ +# -*- frozen-string-literal: true -*- +using Plum::BinaryString + +module Plum + module Rack + class LegacySession + def initialize(svc, e, sock) + @svc = svc + @e = e + @sock = sock + @config = svc.config + end + + def run + if @config[:fallback_legacy_host] + @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}" + upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port]) + upstream.write(@e.buf) if @e.buf + loop do + ret = IO.select([@sock, upstream]) + ret[0].each { |s| + a = s.readpartial(65536) + if s == upstream + @sock.write(a) + else + upstream.write(a) + end + } + end + end + ensure + upstream.close if upstream + end + end + end +end diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb index 3ff6c4a..8af12be 100644 --- a/lib/plum/rack/listener.rb +++ b/lib/plum/rack/listener.rb @@ -10,6 +10,10 @@ module Plum raise "not implemented" end + def accept(svc) + raise "not implemented" + end + def method_missing(name, *args) @server.__send__(name, *args) end @@ -24,8 +28,24 @@ module Plum @server.to_io end - def plum(sock) - ::Plum::HTTPServerConnection.new(sock.method(:write)) + def accept(svc) + sock = @server.accept + Thread.start { + begin + plum = ::Plum::HTTPServerConnection.new(sock.method(:write)) + sess = Session.new(svc, sock, plum) + sess.run + rescue Errno::ECONNRESET, EOFError # closed + rescue ::Plum::LegacyHTTPError => e + @logger.info "legacy HTTP client: #{e}" + sess = LegacySession.new(svc, e, sock) + sess.run + rescue => e + svc.log_exception(e) + ensure + sock.close + end + } end end @@ -57,7 +77,7 @@ module Plum } tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) - @server.start_immediately = false + @server.start_immediately = false # call socket#accept twice: [tcp, tls] end def parse_chained_cert(str) @@ -68,9 +88,26 @@ module Plum @server.to_io end - def plum(sock) - raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2" - ::Plum::ServerConnection.new(sock.method(:write)) + def accept(svc) + sock = @server.accept + Thread.start { + begin + sock = sock.accept + raise ::Plum::LegacyHTTPError.new("client didn't offer h2 with ALPN", nil) unless sock.alpn_protocol == "h2" + plum = ::Plum::ServerConnection.new(sock.method(:write)) + sess = Session.new(svc, sock, plum) + sess.run + rescue Errno::ECONNRESET, EOFError # closed + rescue ::Plum::LegacyHTTPError => e + @logger.info "legacy HTTP client: #{e}" + sess = LegacySession.new(svc, e, sock) + sess.run + rescue => e + svc.log_exception(e) + ensure + sock.close if sock + end + } end private @@ -126,8 +163,20 @@ module Plum @server.to_io end - def plum(sock) - ::Plum::ServerConnection.new(sock.method(:write)) + def accept(svc) + sock = @server.accept + Thread.start { + begin + plum = ::Plum::ServerConnection.new(sock.method(:write)) + sess = Session.new(svc, sock, plum) + sess.run + rescue Errno::ECONNRESET, EOFError # closed + rescue => e + svc.log_exception(e) + ensure + sock.close if sock + end + } end end end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 9a0caf4..9aedc20 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -2,18 +2,15 @@ module Plum module Rack class Server - attr_reader :config + attr_reader :config, :app, :logger, :threadpool def initialize(app, config) @config = config @state = :null @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app - @logger = Logger.new(config[:log] || $stdout).tap { |l| - l.level = config[:debug] ? Logger::DEBUG : Logger::INFO - } - @listeners = config[:listeners].map { |lc| - lc[:listener].new(lc) - } + @logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO } + @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) } + @threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded] @logger.info("Plum #{::Plum::VERSION}") @logger.info("Config: #{config}") @@ -24,87 +21,41 @@ module Plum end def start + #trap(:INT) { @state = :ee } + #require "lineprof" + #Lineprof.profile(//){ @state = :running - while @state == :running - break if @listeners.empty? + while @state == :running && !@listeners.empty? begin if ss = IO.select(@listeners, nil, nil, 2.0) ss[0].each { |svr| - new_con(svr) + begin + svr.accept(self) + rescue Errno::ECONNRESET, Errno::ECONNABORTED # closed + rescue => e + log_exception(e) + end } end - rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed - rescue StandardError => e + rescue Errno::EBADF # closed + rescue => e log_exception(e) end end + #} end def stop @state = :stop @listeners.map(&:stop) - # TODO: gracefully shutdown connections + # TODO: gracefully shutdown connections (wait threadpool?) end private - def new_con(svr) - sock = svr.accept - Thread.new { - begin - begin - sock = sock.accept if sock.respond_to?(:accept) - plum = svr.plum(sock) - - con = Session.new(app: @app, - plum: plum, - sock: sock, - logger: @logger, - config: @config, - remote_addr: sock.peeraddr.last) - con.run - rescue ::Plum::LegacyHTTPError => e - @logger.info "legacy HTTP client: #{e}" - handle_legacy(e, sock) - end - rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError => e # closed - rescue StandardError => e - log_exception(e) - ensure - sock.close if sock - end - } - rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed - sock.close if sock - rescue StandardError => e - log_exception(e) - sock.close if sock - end - def log_exception(e) @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end - def handle_legacy(e, sock) - if @config[:fallback_legacy_host] - @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}" - upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port]) - upstream.write(e.buf) if e.buf - loop do - ret = IO.select([sock, upstream]) - ret[0].each { |s| - a = s.readpartial(65536) - if s == upstream - sock.write(a) - else - upstream.write(a) - end - } - end - end - ensure - upstream.close if upstream - end - def drop_privileges begin user = @config[:user] diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index 151a2f0..3a27248 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -8,14 +8,15 @@ module Plum class Session attr_reader :app, :plum - def initialize(app:, plum:, sock:, logger:, config:, remote_addr: "127.0.0.1") - @app = app - @plum = plum + def initialize(svc, sock, plum) + @svc = svc + @app = svc.app @sock = sock - @logger = logger - @config = config - @remote_addr = remote_addr - @request_thread = {} # used if threaded + @plum = plum + @logger = svc.logger + @config = svc.config + @remote_addr = sock.peeraddr.last + @threadpool = svc.threadpool setup_plum end @@ -24,12 +25,15 @@ module Plum @plum.close end + def to_io + @sock.to_io + end + def run while !@sock.closed? && !@sock.eof? @plum << @sock.readpartial(1024) end ensure - @request_thread.each { |stream, thread| thread.kill } stop end @@ -57,12 +61,21 @@ module Plum } @plum.on(:end_stream) { |stream| - if @config[:threaded] - @request_thread[stream] = Thread.new { - handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + req = reqs.delete(stream) + err = proc { |err| + stream.send_headers({ ":status" => 500 }, end_stream: true) + @logger.error(err) + } + if @threadpool + @threadpool.acquire(err) { + handle_request(stream, req[:headers], req[:data]) } else - handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + begin + handle_request(stream, req[:headers], req[:data]) + rescue + err.call($!) + end end } end @@ -138,8 +151,6 @@ module Plum send_body(st, p_body) unless pno_body } end - - @request_thread.delete(stream) end def new_env(h, data) diff --git a/lib/plum/rack/thread_pool.rb b/lib/plum/rack/thread_pool.rb new file mode 100644 index 0000000..9e07943 --- /dev/null +++ b/lib/plum/rack/thread_pool.rb @@ -0,0 +1,35 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + class ThreadPool + def initialize(size = 20) + @workers = Set.new + @jobs = Queue.new + + size.times { |i| + spawn_worker + } + end + + # returns cancel token + def acquire(tag = nil, err = nil, &blk) + @jobs << [blk, err] + end + + private + def spawn_worker + t = Thread.new { + while true + job, err = @jobs.pop + begin + job.call + rescue => e + err << e if err + end + end + } + @workers << t + end + end + end +end |