diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2016-01-14 12:46:28 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2016-01-14 12:46:28 +0900 |
commit | 6f2e135321b09efa6958c477e6729a93d1700e1f (patch) | |
tree | d818fec74d6d4bbcefe5de2c5212c6ebb0642f6f /lib/plum/rack/server.rb | |
parent | 8ab57c01c62622249c51291d01185df53d2d7a9b (diff) | |
parent | 65ba41222eb0caf8d882c6fa2e202b313ef2801e (diff) | |
download | plum-6f2e135321b09efa6958c477e6729a93d1700e1f.tar.gz |
Merge branch 'threaded'
Diffstat (limited to 'lib/plum/rack/server.rb')
-rw-r--r-- | lib/plum/rack/server.rb | 85 |
1 files changed, 18 insertions, 67 deletions
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 9a0caf4..9aedc20 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -2,18 +2,15 @@ module Plum module Rack class Server - attr_reader :config + attr_reader :config, :app, :logger, :threadpool def initialize(app, config) @config = config @state = :null @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app - @logger = Logger.new(config[:log] || $stdout).tap { |l| - l.level = config[:debug] ? Logger::DEBUG : Logger::INFO - } - @listeners = config[:listeners].map { |lc| - lc[:listener].new(lc) - } + @logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO } + @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) } + @threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded] @logger.info("Plum #{::Plum::VERSION}") @logger.info("Config: #{config}") @@ -24,87 +21,41 @@ module Plum end def start + #trap(:INT) { @state = :ee } + #require "lineprof" + #Lineprof.profile(//){ @state = :running - while @state == :running - break if @listeners.empty? + while @state == :running && !@listeners.empty? begin if ss = IO.select(@listeners, nil, nil, 2.0) ss[0].each { |svr| - new_con(svr) + begin + svr.accept(self) + rescue Errno::ECONNRESET, Errno::ECONNABORTED # closed + rescue => e + log_exception(e) + end } end - rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed - rescue StandardError => e + rescue Errno::EBADF # closed + rescue => e log_exception(e) end end + #} end def stop @state = :stop @listeners.map(&:stop) - # TODO: gracefully shutdown connections + # TODO: gracefully shutdown connections (wait threadpool?) end private - def new_con(svr) - sock = svr.accept - Thread.new { - begin - begin - sock = sock.accept if sock.respond_to?(:accept) - plum = svr.plum(sock) - - con = Session.new(app: @app, - plum: plum, - sock: sock, - logger: @logger, - config: @config, - remote_addr: sock.peeraddr.last) - con.run - rescue ::Plum::LegacyHTTPError => e - @logger.info "legacy HTTP client: #{e}" - handle_legacy(e, sock) - end - rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError => e # closed - rescue StandardError => e - log_exception(e) - ensure - sock.close if sock - end - } - rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed - sock.close if sock - rescue StandardError => e - log_exception(e) - sock.close if sock - end - def log_exception(e) @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end - def handle_legacy(e, sock) - if @config[:fallback_legacy_host] - @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}" - upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port]) - upstream.write(e.buf) if e.buf - loop do - ret = IO.select([sock, upstream]) - ret[0].each { |s| - a = s.readpartial(65536) - if s == upstream - sock.write(a) - else - upstream.write(a) - end - } - end - end - ensure - upstream.close if upstream - end - def drop_privileges begin user = @config[:user] |