diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-10-23 21:00:05 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-10-23 21:00:05 +0900 |
commit | 4a645bc4cdce3ead0668e2b5be50d473739b8a8d (patch) | |
tree | c7d4def784836013e1d5ae1e81ea6589068824c5 | |
parent | f385e1d5770ca5063f2420ed24a3f8c91363cd62 (diff) | |
parent | b750b4fdca59fa63000847d0daece6d0e28ef874 (diff) | |
download | plum-4a645bc4cdce3ead0668e2b5be50d473739b8a8d.tar.gz |
Merge branch 'master' of github.com:rhenium/plum-rack
-rw-r--r-- | lib/plum/rack/connection.rb | 118 | ||||
-rw-r--r-- | lib/plum/rack/listener.rb | 30 | ||||
-rw-r--r-- | lib/plum/rack/server.rb | 18 | ||||
-rw-r--r-- | lib/rack/handler/plum.rb | 5 |
4 files changed, 101 insertions, 70 deletions
diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index f84ec5b..2ab33e5 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -1,36 +1,34 @@ module Plum module Rack class Connection - attr_reader :app, :listener, :plum + attr_reader :app, :sock, :plum - def initialize(app, listener, logger) + def initialize(app, plum, logger) @app = app - @listener = listener + @plum = plum @logger = logger + + setup_plum end def stop - @listener.close # TODO: gracefully shutdown + @plum.stop end - def start - Thread.new { - begin - @plum = setup_plum - @plum.run - rescue Errno::EPIPE, Errno::ECONNRESET => e - rescue StandardError => e - @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") - end - } + def run + begin + @plum.run + rescue Errno::EPIPE, Errno::ECONNRESET => e + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end end private def setup_plum - plum = @listener.plum - plum.on(:connection_error) { |ex| @logger.error(ex) } + @plum.on(:connection_error) { |ex| @logger.error(ex) } - plum.on(:stream) do |stream| + @plum.on(:stream) do |stream| stream.on(:stream_error) { |ex| @logger.error(ex) } headers = data = nil @@ -48,26 +46,60 @@ module Plum } stream.on(:end_stream) { - env = new_env(headers, data) - r_headers, r_body = new_resp(@app.call(env)) - - if r_body.is_a?(::Rack::BodyProxy) - begin - stream.respond(r_headers, end_stream: false) - r_body.each { |part| - stream.send_data(part, end_stream: false) - } - stream.send_data(nil) - ensure - r_body.close - end - else - stream.respond(r_headers, r_body) - end + handle_request(stream, headers, data) } end + end + + def send_body(stream, body) + if body.is_a?(::Rack::BodyProxy) + begin + body.each { |part| + stream.send_data(part, end_stream: false) + } + ensure + body.close + end + stream.send_data(nil, end_stream: true) + else + stream.send_data(body, end_stream: true) + end + end + + def extract_push(r_rawheaders) + _, pushs = r_rawheaders.find { |k, v| k == "plum.serverpush" } + if pushs + pushs.split(";").map { |push| push.split(" ", 2) } + else + [] + end + end + + def handle_request(stream, headers, data) + env = new_env(headers, data) + r_status, r_rawheaders, r_body = @app.call(env) + r_headers = extract_headers(r_status, r_rawheaders) + r_topushs = extract_push(r_rawheaders) + + stream.send_headers(r_headers, end_stream: false) + r_pushstreams = r_topushs.map { |method, path| + preq = { ":authority" => headers.find { |k, v| k == ":authority" }[1], + ":method" => method.to_s.upcase, + ":scheme" => headers.find { |k, v| k == ":scheme" }[1], + ":path" => path } + st = stream.promise(preq) + [st, preq] + } + + send_body(stream, r_body) - plum + r_pushstreams.each { |st, preq| + penv = new_env(preq, "") + p_status, p_h, p_body = @app.call(penv) + p_headers = extract_headers(p_status, p_h) + st.send_headers(p_headers, end_stream: false) + send_body(st, p_body) + } end def new_env(h, data) @@ -79,11 +111,11 @@ module Plum end }.to_h - cmethod = headers.delete(":method") - cpath = headers.delete(":path") + cmethod = headers[":method"] + cpath = headers[":path"] cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s) - cauthority = headers.delete(":authority") - cscheme = headers.delete(":scheme") + cauthority = headers[":authority"] + cscheme = headers[":scheme"] ebase = { "REQUEST_METHOD" => cmethod, "SCRIPT_NAME" => "", @@ -94,7 +126,9 @@ module Plum } headers.each {|key, value| - ebase["HTTP_" + key.gsub("-", "_").upcase] = value + unless key.start_with?(":") && key.include?(".") + ebase["HTTP_" + key.gsub("-", "_").upcase] = value + end } ebase.merge!({ @@ -111,9 +145,7 @@ module Plum ebase end - def new_resp(app_call) - r_status, r_h, r_body = app_call - + def extract_headers(r_status, r_h) rbase = { ":status" => r_status, "server" => "plum/#{::Plum::VERSION}", @@ -133,7 +165,7 @@ module Plum end end - [rbase, r_body] + rbase end end end diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb index 5127eb3..83a3dff 100644 --- a/lib/plum/rack/listener.rb +++ b/lib/plum/rack/listener.rb @@ -9,17 +9,8 @@ module Plum raise "not implemented" end - def accept - @sock = to_io.accept - self - end - def method_missing(name, *args) - if @sock - @sock.__send__(name, *args) - else - @server.__send__(name, *args) - end + @server.__send__(name, *args) end end @@ -32,8 +23,8 @@ module Plum @server.to_io end - def plum - ::Plum::HTTPConnection.new(self) + def plum(sock) + ::Plum::HTTPConnection.new(sock) end end @@ -56,20 +47,15 @@ module Plum ctx.key = OpenSSL::PKey::RSA.new(key) tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) - @server.start_immediately = true # TODO + @server.start_immediately = false end def to_io @server.to_io end - def accept - @sock = @server.accept - self - end - - def plum - ::Plum::HTTPSConnection.new(self) + def plum(sock) + ::Plum::HTTPSConnection.new(sock) end private @@ -111,8 +97,8 @@ module Plum @server.to_io end - def plum - ::Plum::HTTPConnection.new(self) + def plum(sock) + ::Plum::HTTPSConnection.new(sock) end end end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index f942df4..496a375 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -35,15 +35,27 @@ module Plum def stop @state = :stop @listeners.map(&:stop) + # TODO: gracefully shutdown connections end private def new_con(svr) sock = svr.accept - @logger.debug("accept: #{sock}") + Thread.new { + begin + sock = sock.accept if sock.respond_to?(:accept) + plum = svr.plum(sock) + @logger.debug("accept: #{plum}") - con = Connection.new(@app, sock, @logger) - con.start + con = Connection.new(@app, plum, @logger) + con.run + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + sock.close if sock + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + sock.close if sock + end + } rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed rescue StandardError => e @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index 1052592..c3ca543 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -31,10 +31,11 @@ module Rack private def self.default_options rack_env = ENV["RACK_ENV"] || "development" + dev = rack_env == "development" default_options = { - Host: rack_env == "development" ? "localhost" : "0.0.0.0", + Host: dev ? "localhost" : "0.0.0.0", Port: 8080, - Debug: true, + Debug: dev, } end end |