summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-10-23 21:00:05 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-10-23 21:00:05 +0900
commit4a645bc4cdce3ead0668e2b5be50d473739b8a8d (patch)
treec7d4def784836013e1d5ae1e81ea6589068824c5
parentf385e1d5770ca5063f2420ed24a3f8c91363cd62 (diff)
parentb750b4fdca59fa63000847d0daece6d0e28ef874 (diff)
downloadplum-4a645bc4cdce3ead0668e2b5be50d473739b8a8d.tar.gz
Merge branch 'master' of github.com:rhenium/plum-rack
-rw-r--r--lib/plum/rack/connection.rb118
-rw-r--r--lib/plum/rack/listener.rb30
-rw-r--r--lib/plum/rack/server.rb18
-rw-r--r--lib/rack/handler/plum.rb5
4 files changed, 101 insertions, 70 deletions
diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb
index f84ec5b..2ab33e5 100644
--- a/lib/plum/rack/connection.rb
+++ b/lib/plum/rack/connection.rb
@@ -1,36 +1,34 @@
module Plum
module Rack
class Connection
- attr_reader :app, :listener, :plum
+ attr_reader :app, :sock, :plum
- def initialize(app, listener, logger)
+ def initialize(app, plum, logger)
@app = app
- @listener = listener
+ @plum = plum
@logger = logger
+
+ setup_plum
end
def stop
- @listener.close # TODO: gracefully shutdown
+ @plum.stop
end
- def start
- Thread.new {
- begin
- @plum = setup_plum
- @plum.run
- rescue Errno::EPIPE, Errno::ECONNRESET => e
- rescue StandardError => e
- @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
- end
- }
+ def run
+ begin
+ @plum.run
+ rescue Errno::EPIPE, Errno::ECONNRESET => e
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ end
end
private
def setup_plum
- plum = @listener.plum
- plum.on(:connection_error) { |ex| @logger.error(ex) }
+ @plum.on(:connection_error) { |ex| @logger.error(ex) }
- plum.on(:stream) do |stream|
+ @plum.on(:stream) do |stream|
stream.on(:stream_error) { |ex| @logger.error(ex) }
headers = data = nil
@@ -48,26 +46,60 @@ module Plum
}
stream.on(:end_stream) {
- env = new_env(headers, data)
- r_headers, r_body = new_resp(@app.call(env))
-
- if r_body.is_a?(::Rack::BodyProxy)
- begin
- stream.respond(r_headers, end_stream: false)
- r_body.each { |part|
- stream.send_data(part, end_stream: false)
- }
- stream.send_data(nil)
- ensure
- r_body.close
- end
- else
- stream.respond(r_headers, r_body)
- end
+ handle_request(stream, headers, data)
}
end
+ end
+
+ def send_body(stream, body)
+ if body.is_a?(::Rack::BodyProxy)
+ begin
+ body.each { |part|
+ stream.send_data(part, end_stream: false)
+ }
+ ensure
+ body.close
+ end
+ stream.send_data(nil, end_stream: true)
+ else
+ stream.send_data(body, end_stream: true)
+ end
+ end
+
+ def extract_push(r_rawheaders)
+ _, pushs = r_rawheaders.find { |k, v| k == "plum.serverpush" }
+ if pushs
+ pushs.split(";").map { |push| push.split(" ", 2) }
+ else
+ []
+ end
+ end
+
+ def handle_request(stream, headers, data)
+ env = new_env(headers, data)
+ r_status, r_rawheaders, r_body = @app.call(env)
+ r_headers = extract_headers(r_status, r_rawheaders)
+ r_topushs = extract_push(r_rawheaders)
+
+ stream.send_headers(r_headers, end_stream: false)
+ r_pushstreams = r_topushs.map { |method, path|
+ preq = { ":authority" => headers.find { |k, v| k == ":authority" }[1],
+ ":method" => method.to_s.upcase,
+ ":scheme" => headers.find { |k, v| k == ":scheme" }[1],
+ ":path" => path }
+ st = stream.promise(preq)
+ [st, preq]
+ }
+
+ send_body(stream, r_body)
- plum
+ r_pushstreams.each { |st, preq|
+ penv = new_env(preq, "")
+ p_status, p_h, p_body = @app.call(penv)
+ p_headers = extract_headers(p_status, p_h)
+ st.send_headers(p_headers, end_stream: false)
+ send_body(st, p_body)
+ }
end
def new_env(h, data)
@@ -79,11 +111,11 @@ module Plum
end
}.to_h
- cmethod = headers.delete(":method")
- cpath = headers.delete(":path")
+ cmethod = headers[":method"]
+ cpath = headers[":path"]
cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s)
- cauthority = headers.delete(":authority")
- cscheme = headers.delete(":scheme")
+ cauthority = headers[":authority"]
+ cscheme = headers[":scheme"]
ebase = {
"REQUEST_METHOD" => cmethod,
"SCRIPT_NAME" => "",
@@ -94,7 +126,9 @@ module Plum
}
headers.each {|key, value|
- ebase["HTTP_" + key.gsub("-", "_").upcase] = value
+ unless key.start_with?(":") && key.include?(".")
+ ebase["HTTP_" + key.gsub("-", "_").upcase] = value
+ end
}
ebase.merge!({
@@ -111,9 +145,7 @@ module Plum
ebase
end
- def new_resp(app_call)
- r_status, r_h, r_body = app_call
-
+ def extract_headers(r_status, r_h)
rbase = {
":status" => r_status,
"server" => "plum/#{::Plum::VERSION}",
@@ -133,7 +165,7 @@ module Plum
end
end
- [rbase, r_body]
+ rbase
end
end
end
diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb
index 5127eb3..83a3dff 100644
--- a/lib/plum/rack/listener.rb
+++ b/lib/plum/rack/listener.rb
@@ -9,17 +9,8 @@ module Plum
raise "not implemented"
end
- def accept
- @sock = to_io.accept
- self
- end
-
def method_missing(name, *args)
- if @sock
- @sock.__send__(name, *args)
- else
- @server.__send__(name, *args)
- end
+ @server.__send__(name, *args)
end
end
@@ -32,8 +23,8 @@ module Plum
@server.to_io
end
- def plum
- ::Plum::HTTPConnection.new(self)
+ def plum(sock)
+ ::Plum::HTTPConnection.new(sock)
end
end
@@ -56,20 +47,15 @@ module Plum
ctx.key = OpenSSL::PKey::RSA.new(key)
tcp_server = ::TCPServer.new(lc[:hostname], lc[:port])
@server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)
- @server.start_immediately = true # TODO
+ @server.start_immediately = false
end
def to_io
@server.to_io
end
- def accept
- @sock = @server.accept
- self
- end
-
- def plum
- ::Plum::HTTPSConnection.new(self)
+ def plum(sock)
+ ::Plum::HTTPSConnection.new(sock)
end
private
@@ -111,8 +97,8 @@ module Plum
@server.to_io
end
- def plum
- ::Plum::HTTPConnection.new(self)
+ def plum(sock)
+ ::Plum::HTTPSConnection.new(sock)
end
end
end
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
index f942df4..496a375 100644
--- a/lib/plum/rack/server.rb
+++ b/lib/plum/rack/server.rb
@@ -35,15 +35,27 @@ module Plum
def stop
@state = :stop
@listeners.map(&:stop)
+ # TODO: gracefully shutdown connections
end
private
def new_con(svr)
sock = svr.accept
- @logger.debug("accept: #{sock}")
+ Thread.new {
+ begin
+ sock = sock.accept if sock.respond_to?(:accept)
+ plum = svr.plum(sock)
+ @logger.debug("accept: #{plum}")
- con = Connection.new(@app, sock, @logger)
- con.start
+ con = Connection.new(@app, plum, @logger)
+ con.run
+ rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
+ sock.close if sock
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ sock.close if sock
+ end
+ }
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
rescue StandardError => e
@logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb
index 1052592..c3ca543 100644
--- a/lib/rack/handler/plum.rb
+++ b/lib/rack/handler/plum.rb
@@ -31,10 +31,11 @@ module Rack
private
def self.default_options
rack_env = ENV["RACK_ENV"] || "development"
+ dev = rack_env == "development"
default_options = {
- Host: rack_env == "development" ? "localhost" : "0.0.0.0",
+ Host: dev ? "localhost" : "0.0.0.0",
Port: 8080,
- Debug: true,
+ Debug: dev,
}
end
end