aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/rack/server.rb
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-01-14 12:46:28 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-01-14 12:46:28 +0900
commit6f2e135321b09efa6958c477e6729a93d1700e1f (patch)
treed818fec74d6d4bbcefe5de2c5212c6ebb0642f6f /lib/plum/rack/server.rb
parent8ab57c01c62622249c51291d01185df53d2d7a9b (diff)
parent65ba41222eb0caf8d882c6fa2e202b313ef2801e (diff)
downloadplum-6f2e135321b09efa6958c477e6729a93d1700e1f.tar.gz
Merge branch 'threaded'
Diffstat (limited to 'lib/plum/rack/server.rb')
-rw-r--r--lib/plum/rack/server.rb85
1 files changed, 18 insertions, 67 deletions
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
index 9a0caf4..9aedc20 100644
--- a/lib/plum/rack/server.rb
+++ b/lib/plum/rack/server.rb
@@ -2,18 +2,15 @@
module Plum
module Rack
class Server
- attr_reader :config
+ attr_reader :config, :app, :logger, :threadpool
def initialize(app, config)
@config = config
@state = :null
@app = config[:debug] ? ::Rack::CommonLogger.new(app) : app
- @logger = Logger.new(config[:log] || $stdout).tap { |l|
- l.level = config[:debug] ? Logger::DEBUG : Logger::INFO
- }
- @listeners = config[:listeners].map { |lc|
- lc[:listener].new(lc)
- }
+ @logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO }
+ @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) }
+ @threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded]
@logger.info("Plum #{::Plum::VERSION}")
@logger.info("Config: #{config}")
@@ -24,87 +21,41 @@ module Plum
end
def start
+ #trap(:INT) { @state = :ee }
+ #require "lineprof"
+ #Lineprof.profile(//){
@state = :running
- while @state == :running
- break if @listeners.empty?
+ while @state == :running && !@listeners.empty?
begin
if ss = IO.select(@listeners, nil, nil, 2.0)
ss[0].each { |svr|
- new_con(svr)
+ begin
+ svr.accept(self)
+ rescue Errno::ECONNRESET, Errno::ECONNABORTED # closed
+ rescue => e
+ log_exception(e)
+ end
}
end
- rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed
- rescue StandardError => e
+ rescue Errno::EBADF # closed
+ rescue => e
log_exception(e)
end
end
+ #}
end
def stop
@state = :stop
@listeners.map(&:stop)
- # TODO: gracefully shutdown connections
+ # TODO: gracefully shutdown connections (wait threadpool?)
end
private
- def new_con(svr)
- sock = svr.accept
- Thread.new {
- begin
- begin
- sock = sock.accept if sock.respond_to?(:accept)
- plum = svr.plum(sock)
-
- con = Session.new(app: @app,
- plum: plum,
- sock: sock,
- logger: @logger,
- config: @config,
- remote_addr: sock.peeraddr.last)
- con.run
- rescue ::Plum::LegacyHTTPError => e
- @logger.info "legacy HTTP client: #{e}"
- handle_legacy(e, sock)
- end
- rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError => e # closed
- rescue StandardError => e
- log_exception(e)
- ensure
- sock.close if sock
- end
- }
- rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
- sock.close if sock
- rescue StandardError => e
- log_exception(e)
- sock.close if sock
- end
-
def log_exception(e)
@logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
end
- def handle_legacy(e, sock)
- if @config[:fallback_legacy_host]
- @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
- upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
- upstream.write(e.buf) if e.buf
- loop do
- ret = IO.select([sock, upstream])
- ret[0].each { |s|
- a = s.readpartial(65536)
- if s == upstream
- sock.write(a)
- else
- upstream.write(a)
- end
- }
- end
- end
- ensure
- upstream.close if upstream
- end
-
def drop_privileges
begin
user = @config[:user]