From 3fa9a5693d7417d75a8957cc0c8ff41347714fd1 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 09:23:45 +0900 Subject: initial commit --- lib/plum/rack/config.rb | 24 ++++++++ lib/plum/rack/connection.rb | 140 ++++++++++++++++++++++++++++++++++++++++++++ lib/plum/rack/dsl.rb | 44 ++++++++++++++ lib/plum/rack/listener.rb | 59 +++++++++++++++++++ lib/plum/rack/server.rb | 55 +++++++++++++++++ lib/plum/rack/version.rb | 5 ++ 6 files changed, 327 insertions(+) create mode 100644 lib/plum/rack/config.rb create mode 100644 lib/plum/rack/connection.rb create mode 100644 lib/plum/rack/dsl.rb create mode 100644 lib/plum/rack/listener.rb create mode 100644 lib/plum/rack/server.rb create mode 100644 lib/plum/rack/version.rb (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb new file mode 100644 index 0000000..2f04886 --- /dev/null +++ b/lib/plum/rack/config.rb @@ -0,0 +1,24 @@ +module Plum + module Rack + class Config + DEFAULT_CONFIG = { + listeners: [], + debug: false, + log: nil, # $stdout + server_push: true + }.freeze + + def initialize(config) + @config = DEFAULT_CONFIG.merge(config) + end + + def [](key) + @config[key] + end + + def to_s + @config.to_s + end + end + end +end diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb new file mode 100644 index 0000000..1100ae8 --- /dev/null +++ b/lib/plum/rack/connection.rb @@ -0,0 +1,140 @@ +module Plum + module Rack + class Connection + attr_reader :app, :sock, :plum + + def initialize(app, sock, logger) + @app = app + @sock = sock + @logger = logger + end + + def stop + @sock.close # TODO: gracefully shutdown + end + + def start + Thread.new { + begin + @sock = @sock.accept if @sock.respond_to?(:accept) # SSLSocket + @plum = setup_plum + @plum.run + rescue Errno::EPIPE, Errno::ECONNRESET => e + @logger.debug("connection closed: #{e}") + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + } + end + + private + def setup_plum + plum = ::Plum::HTTPConnection.new(@sock) + plum.on(:connection_error) { |ex| @logger.error(ex) } + + plum.on(:stream) do |stream| + stream.on(:stream_error) { |ex| @logger.error(ex) } + + headers = data = nil + stream.on(:open) { + headers = nil + data = "".b + } + + stream.on(:headers) { |h| + @logger.debug("headers: " + h.map {|name, value| "#{name}: #{value}" }.join(" // ")) + headers = h + } + + stream.on(:data) { |d| + @logger.debug("data: #{d.bytesize}") + data << d # TODO: store to file? + } + + stream.on(:end_stream) { + env = new_env(headers, data) + r_headers, r_body = new_resp(@app.call(env)) + + if r_body.is_a?(::Rack::BodyProxy) + stream.respond(r_headers, end_stream: false) + r_body.each { |part| + stream.send_data(part, end_stream: false) + } + stream.send_data(nil) + else + stream.respond(r_headers, r_body) + end + } + end + + plum + end + + def new_env(h, data) + headers = h.group_by { |k, v| k }.map { |k, kvs| + if k == "cookie" + [k, kvs.map(&:last).join("; ")] + else + [k, kvs.first.last] + end + }.to_h + + cmethod = headers.delete(":method") + cpath = headers.delete(":path") + cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s) + cauthority = headers.delete(":authority") + cscheme = headers.delete(":scheme") + ebase = { + "REQUEST_METHOD" => cmethod, + "SCRIPT_NAME" => "", + "PATH_INFO" => cpath_name, + "QUERY_STRING" => cpath_query.to_s, + "SERVER_NAME" => cauthority.split(":").first, + "SERVER_PORT" => (cauthority.split(":").last || 443), # TODO: forwarded header (RFC 7239) + } + + headers.each {|key, value| + ebase["HTTP_" + key.gsub("-", "_").upcase] = value + } + + ebase.merge!({ + "rack.version" => ::Rack::VERSION, + "rack.url_scheme" => cscheme, + "rack.input" => StringIO.new(data), + "rack.errors" => $stderr, + "rack.multithread" => true, + "rack.multiprocess" => false, + "rack.run_once" => false, + "rack.hijack?" => false, + }) + + ebase + end + + def new_resp(app_call) + r_status, r_h, r_body = app_call + + rbase = { + ":status" => r_status, + "server" => "plum/#{::Plum::VERSION}", + } + + r_h.each do |key, v_| + if key.start_with?("rack.") + next + end + + key = key.downcase.gsub(/^x-/, "") + vs = v_.split("\n") + if key == "set-cookie" + rbase[key] = vs.join("; ") # RFC 7540 8.1.2.5 + else + rbase[key] = vs.join(",") # RFC 7230 7 + end + end + + [rbase, r_body] + end + end + end +end diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb new file mode 100644 index 0000000..f4ee850 --- /dev/null +++ b/lib/plum/rack/dsl.rb @@ -0,0 +1,44 @@ +module Plum + module Rack + module DSL + class Config + attr_reader :config + + def initialize + @config = ::Plum::Rack::Config::DEFAULT_CONFIG.dup + end + + def log(out) + if out.is_a?(String) + @config[:log] = File.open(out, "a") + else + @config[:log] = out + end + end + + def debug(bool) + @config[:debug] = !!bool + end + + def listener(type, conf) + case type + when :unix + lc = conf.merge(listener: UNIXListener) + when :tcp + lc = conf.merge(listener: TCPListener) + when :tls + lc = conf.merge(listener: TLSListener) + else + raise "Unknown listener type: #{type} (known type: :unix, :http, :https)" + end + + @config[:listeners] << lc + end + + def server_push(bool) + @config[:server_push] = !!bool + end + end + end + end +end diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb new file mode 100644 index 0000000..e1dde02 --- /dev/null +++ b/lib/plum/rack/listener.rb @@ -0,0 +1,59 @@ +module Plum + module Rack + class BaseListener + def stop + @server.close + end + + def to_io + raise "not implemented" + end + + def accept + to_io.accept + end + end + + class TCPListener < BaseListener + def initialize(lc) + @server = ::TCPServer.new(lc[:hostname], lc[:port]) + end + + def to_io + @server.to_io + end + end + + class TLSListener < BaseListener + def initialize(hostname, port, cert, key) + ctx = OpenSSL::SSL::SSLContext.new + ctx.ssl_version = :TLSv1_2 + ctx.alpn_select_cb = -> protocols { + raise "Client does not support HTTP/2: #{protocols}" unless protocols.include?("h2") + "h2" + } + ctx.tmp_ecdh_callback = -> (sock, ise, keyl) { OpenSSL::PKey::EC.new("prime256v1") } + ctx.cert = OpenSSL::X509::Certificate.new(cert) + ctx.key = OpenSSL::PKey::RSA.new(key) + tcp_server = ::TCPServer.new(hostname, port) + @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) + @server.start_immediately = false + end + + def to_io + @server.to_io + end + end + + class UNIXListener < BaseListener + def initialize(path, permission, user, group) + @server = ::UNIXServer.new(path) + # TODO: set permission, user, group + end + + def to_io + @server.to_io + end + end + end +end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb new file mode 100644 index 0000000..15cb10a --- /dev/null +++ b/lib/plum/rack/server.rb @@ -0,0 +1,55 @@ +module Plum + module Rack + class Server + def initialize(app, config) + @state = :null + @app = app + @logger = Logger.new(config[:log] || $stdout).tap { |l| + l.level = config[:debug] ? Logger::DEBUG : Logger::INFO + } + @listeners = config[:listeners].map { |lc| + lc[:listener].new(lc) + } + + @logger.info("Plum::Rack #{::Plum::Rack::VERSION} (Plum #{::Plum::VERSION})") + @logger.info("Config: #{config}") + end + + def start + @state = :running + while @state == :running + break if @listeners.empty? + begin + if ss = IO.select(@listeners, nil, nil, 2.0) + ss[0].each { |svr| + new_con(svr) + } + end + rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed + @logger.debug("socket closed?: #{e}") + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + end + + def stop + @state = :stop + @listeners.map(&:stop) + end + + private + def new_con(svr) + sock = svr.accept + @logger.debug("accept: #{sock}") + + con = Connection.new(@app, sock, @logger) + con.start + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + @logger.debug("connection closed while accepting: #{e}") + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + end +end diff --git a/lib/plum/rack/version.rb b/lib/plum/rack/version.rb new file mode 100644 index 0000000..d968fe8 --- /dev/null +++ b/lib/plum/rack/version.rb @@ -0,0 +1,5 @@ +module Plum + module Rack + VERSION = "0.0.1" + end +end -- cgit v1.2.3 From 8aae57e6a3ea2d2367e9b4a82aa3af46c9f05720 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 10:19:08 +0900 Subject: make TLS default --- lib/plum/rack/connection.rb | 11 ++++---- lib/plum/rack/listener.rb | 68 ++++++++++++++++++++++++++++++++++++++++++--- lib/rack/handler/plum.rb | 2 +- 3 files changed, 70 insertions(+), 11 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 1100ae8..23314bd 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -1,22 +1,21 @@ module Plum module Rack class Connection - attr_reader :app, :sock, :plum + attr_reader :app, :listener, :plum - def initialize(app, sock, logger) + def initialize(app, listener, logger) @app = app - @sock = sock + @listener = listener @logger = logger end def stop - @sock.close # TODO: gracefully shutdown + @listener.close # TODO: gracefully shutdown end def start Thread.new { begin - @sock = @sock.accept if @sock.respond_to?(:accept) # SSLSocket @plum = setup_plum @plum.run rescue Errno::EPIPE, Errno::ECONNRESET => e @@ -29,7 +28,7 @@ module Plum private def setup_plum - plum = ::Plum::HTTPConnection.new(@sock) + plum = @listener.plum plum.on(:connection_error) { |ex| @logger.error(ex) } plum.on(:stream) do |stream| diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb index e1dde02..d39aad1 100644 --- a/lib/plum/rack/listener.rb +++ b/lib/plum/rack/listener.rb @@ -10,7 +10,16 @@ module Plum end def accept - to_io.accept + @sock = to_io.accept + self + end + + def method_missing(name, *args) + if @sock + @sock.__send__(name, *args) + else + @server.__send__(name, *args) + end end end @@ -22,10 +31,20 @@ module Plum def to_io @server.to_io end + + def plum + ::Plum::HTTPConnection.new(self) + end end class TLSListener < BaseListener - def initialize(hostname, port, cert, key) + def initialize(lc) + cert, key = lc[:certificate], lc[:certificate_key] + unless cert && key + puts "WARNING: using dummy certificate" + cert, key = dummy_key + end + ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.alpn_select_cb = -> protocols { @@ -35,14 +54,51 @@ module Plum ctx.tmp_ecdh_callback = -> (sock, ise, keyl) { OpenSSL::PKey::EC.new("prime256v1") } ctx.cert = OpenSSL::X509::Certificate.new(cert) ctx.key = OpenSSL::PKey::RSA.new(key) - tcp_server = ::TCPServer.new(hostname, port) + tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) - @server.start_immediately = false + @server.start_immediately = true # TODO end def to_io @server.to_io end + + def accept + @sock = @server.accept + self + end + + def plum + ::Plum::HTTPSConnection.new(self) + end + + private + # returns: [cert, key] + def dummy_key + key = OpenSSL::PKey::RSA.new(2048) + public_key = key.public_key + + cert = OpenSSL::X509::Certificate.new + cert.subject = cert.issuer = OpenSSL::X509::Name.parse("/C=JP/O=Test/OU=Test/CN=example.com") + cert.not_before = Time.now + cert.not_after = Time.now + 363 * 24 * 60 * 60 + cert.public_key = public_key + cert.serial = 0x0 + cert.version = 2 + + ef = OpenSSL::X509::ExtensionFactory.new + ef.subject_certificate = cert + ef.issuer_certificate = cert + cert.extensions = [ + ef.create_extension("basicConstraints","CA:TRUE", true), + ef.create_extension("subjectKeyIdentifier", "hash"), + ] + cert.add_extension ef.create_extension("authorityKeyIdentifier", "keyid:always,issuer:always") + + cert.sign key, OpenSSL::Digest::SHA1.new + + [cert, key] + end end class UNIXListener < BaseListener @@ -54,6 +110,10 @@ module Plum def to_io @server.to_io end + + def plum + ::Plum::HTTPConnection.new(self) + end end end end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index 599a3e1..1052592 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -7,7 +7,7 @@ module Rack config = ::Plum::Rack::Config.new( listeners: [ { - listener: ::Plum::Rack::TCPListener, + listener: ::Plum::Rack::TLSListener, hostname: opts[:Host], port: opts[:Port].to_i } -- cgit v1.2.3 From 3075a3e1432158f0e5b29d035affb67e50b1226b Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 20:44:08 +0900 Subject: connection: fix 'ThreadError: deadlock; recursive locking': close BodyProxy after use --- lib/plum/rack/connection.rb | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 23314bd..cae7080 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -55,11 +55,15 @@ module Plum r_headers, r_body = new_resp(@app.call(env)) if r_body.is_a?(::Rack::BodyProxy) - stream.respond(r_headers, end_stream: false) - r_body.each { |part| - stream.send_data(part, end_stream: false) - } - stream.send_data(nil) + begin + stream.respond(r_headers, end_stream: false) + r_body.each { |part| + stream.send_data(part, end_stream: false) + } + stream.send_data(nil) + ensure + r_body.close + end else stream.respond(r_headers, r_body) end -- cgit v1.2.3 From b579a2e34e8cc2bb26250d954af6ed24fadeda45 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 20:44:42 +0900 Subject: server: wrap the passed app with Rack::CommonLogger if debug mode --- lib/plum/rack/server.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 15cb10a..1e444c0 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -3,7 +3,7 @@ module Plum class Server def initialize(app, config) @state = :null - @app = app + @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app @logger = Logger.new(config[:log] || $stdout).tap { |l| l.level = config[:debug] ? Logger::DEBUG : Logger::INFO } -- cgit v1.2.3 From c35bd14ca3d8d892d1c7509c57a2f429acce9e23 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 20:44:52 +0900 Subject: reduce debug logging --- lib/plum/rack/connection.rb | 3 --- lib/plum/rack/server.rb | 2 -- 2 files changed, 5 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index cae7080..f84ec5b 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -19,7 +19,6 @@ module Plum @plum = setup_plum @plum.run rescue Errno::EPIPE, Errno::ECONNRESET => e - @logger.debug("connection closed: #{e}") rescue StandardError => e @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end @@ -41,12 +40,10 @@ module Plum } stream.on(:headers) { |h| - @logger.debug("headers: " + h.map {|name, value| "#{name}: #{value}" }.join(" // ")) headers = h } stream.on(:data) { |d| - @logger.debug("data: #{d.bytesize}") data << d # TODO: store to file? } diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 1e444c0..f942df4 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -26,7 +26,6 @@ module Plum } end rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed - @logger.debug("socket closed?: #{e}") rescue StandardError => e @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end @@ -46,7 +45,6 @@ module Plum con = Connection.new(@app, sock, @logger) con.start rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed - @logger.debug("connection closed while accepting: #{e}") rescue StandardError => e @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end -- cgit v1.2.3 From f385e1d5770ca5063f2420ed24a3f8c91363cd62 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 22 Oct 2015 20:49:40 +0900 Subject: tlslistener: use random number for dummy certificate's serial number --- lib/plum/rack/listener.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb index d39aad1..5127eb3 100644 --- a/lib/plum/rack/listener.rb +++ b/lib/plum/rack/listener.rb @@ -75,15 +75,15 @@ module Plum private # returns: [cert, key] def dummy_key - key = OpenSSL::PKey::RSA.new(2048) - public_key = key.public_key + puts "WARNING: Generating new dummy certificate..." + key = OpenSSL::PKey::RSA.new(2048) cert = OpenSSL::X509::Certificate.new cert.subject = cert.issuer = OpenSSL::X509::Name.parse("/C=JP/O=Test/OU=Test/CN=example.com") cert.not_before = Time.now cert.not_after = Time.now + 363 * 24 * 60 * 60 - cert.public_key = public_key - cert.serial = 0x0 + cert.public_key = key.public_key + cert.serial = rand((1 << 20) - 1) cert.version = 2 ef = OpenSSL::X509::ExtensionFactory.new -- cgit v1.2.3 From b6de76aa184bae4637fa4c1beb6bf296d97c7679 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Fri, 23 Oct 2015 09:59:39 +0900 Subject: set start_immediately=false for SSLServer --- lib/plum/rack/connection.rb | 46 ++++++++++++++++++++++----------------------- lib/plum/rack/listener.rb | 30 ++++++++--------------------- lib/plum/rack/server.rb | 19 +++++++++++++++---- lib/rack/handler/plum.rb | 5 +++-- 4 files changed, 49 insertions(+), 51 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 23314bd..52e7117 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -1,37 +1,35 @@ module Plum module Rack class Connection - attr_reader :app, :listener, :plum + attr_reader :app, :sock, :plum - def initialize(app, listener, logger) + def initialize(app, plum, logger) @app = app - @listener = listener + @plum = plum @logger = logger + + setup_plum end def stop - @listener.close # TODO: gracefully shutdown + @plum.stop end - def start - Thread.new { - begin - @plum = setup_plum - @plum.run - rescue Errno::EPIPE, Errno::ECONNRESET => e - @logger.debug("connection closed: #{e}") - rescue StandardError => e - @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") - end - } + def run + begin + @plum.run + rescue Errno::EPIPE, Errno::ECONNRESET => e + @logger.debug("connection closed: #{e}") + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end end private def setup_plum - plum = @listener.plum - plum.on(:connection_error) { |ex| @logger.error(ex) } + @plum.on(:connection_error) { |ex| @logger.error(ex) } - plum.on(:stream) do |stream| + @plum.on(:stream) do |stream| stream.on(:stream_error) { |ex| @logger.error(ex) } headers = data = nil @@ -56,17 +54,19 @@ module Plum if r_body.is_a?(::Rack::BodyProxy) stream.respond(r_headers, end_stream: false) - r_body.each { |part| - stream.send_data(part, end_stream: false) - } + begin + r_body.each { |part| + stream.send_data(part, end_stream: false) + } + ensure + r_body.close + end stream.send_data(nil) else stream.respond(r_headers, r_body) end } end - - plum end def new_env(h, data) diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb index d39aad1..87d6179 100644 --- a/lib/plum/rack/listener.rb +++ b/lib/plum/rack/listener.rb @@ -9,17 +9,8 @@ module Plum raise "not implemented" end - def accept - @sock = to_io.accept - self - end - def method_missing(name, *args) - if @sock - @sock.__send__(name, *args) - else - @server.__send__(name, *args) - end + @server.__send__(name, *args) end end @@ -32,8 +23,8 @@ module Plum @server.to_io end - def plum - ::Plum::HTTPConnection.new(self) + def plum(sock) + ::Plum::HTTPConnection.new(sock) end end @@ -56,20 +47,15 @@ module Plum ctx.key = OpenSSL::PKey::RSA.new(key) tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) - @server.start_immediately = true # TODO + @server.start_immediately = false end def to_io @server.to_io end - def accept - @sock = @server.accept - self - end - - def plum - ::Plum::HTTPSConnection.new(self) + def plum(sock) + ::Plum::HTTPSConnection.new(sock) end private @@ -111,8 +97,8 @@ module Plum @server.to_io end - def plum - ::Plum::HTTPConnection.new(self) + def plum(sock) + ::Plum::HTTPSConnection.new(sock) end end end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 15cb10a..2a72d6e 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -36,17 +36,28 @@ module Plum def stop @state = :stop @listeners.map(&:stop) + # TODO: gracefully shutdown connections end private def new_con(svr) sock = svr.accept - @logger.debug("accept: #{sock}") + Thread.new { + begin + sock = sock.accept if sock.respond_to?(:accept) + plum = svr.plum(sock) + @logger.debug("accept: #{plum}") - con = Connection.new(@app, sock, @logger) - con.start + con = Connection.new(@app, plum, @logger) + con.run + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + sock.close if sock + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + sock.close if sock + end + } rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed - @logger.debug("connection closed while accepting: #{e}") rescue StandardError => e @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index 1052592..c3ca543 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -31,10 +31,11 @@ module Rack private def self.default_options rack_env = ENV["RACK_ENV"] || "development" + dev = rack_env == "development" default_options = { - Host: rack_env == "development" ? "localhost" : "0.0.0.0", + Host: dev ? "localhost" : "0.0.0.0", Port: 8080, - Debug: true, + Debug: dev, } end end -- cgit v1.2.3 From b750b4fdca59fa63000847d0daece6d0e28ef874 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Fri, 23 Oct 2015 11:29:41 +0900 Subject: support server push --- lib/plum/rack/connection.rb | 86 ++++++++++++++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 25 deletions(-) (limited to 'lib/plum/rack') diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 52e7117..17bdbc9 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -49,26 +49,62 @@ module Plum } stream.on(:end_stream) { - env = new_env(headers, data) - r_headers, r_body = new_resp(@app.call(env)) - - if r_body.is_a?(::Rack::BodyProxy) - stream.respond(r_headers, end_stream: false) - begin - r_body.each { |part| - stream.send_data(part, end_stream: false) - } - ensure - r_body.close - end - stream.send_data(nil) - else - stream.respond(r_headers, r_body) - end + handle_request(stream, headers, data) } end end + def send_body(stream, body) + if body.is_a?(::Rack::BodyProxy) + begin + body.each { |part| + stream.send_data(part, end_stream: false) + } + ensure + body.close + end + stream.send_data(nil, end_stream: true) + else + stream.send_data(body, end_stream: true) + end + end + + def extract_push(r_rawheaders) + _, pushs = r_rawheaders.find { |k, v| k == "plum.serverpush" } + if pushs + pushs.split(";").map { |push| push.split(" ", 2) } + else + [] + end + end + + def handle_request(stream, headers, data) + env = new_env(headers, data) + r_status, r_rawheaders, r_body = @app.call(env) + r_headers = extract_headers(r_status, r_rawheaders) + r_topushs = extract_push(r_rawheaders) + + stream.send_headers(r_headers, end_stream: false) + r_pushstreams = r_topushs.map { |method, path| + preq = { ":authority" => headers.find { |k, v| k == ":authority" }[1], + ":method" => method.to_s.upcase, + ":scheme" => headers.find { |k, v| k == ":scheme" }[1], + ":path" => path } + st = stream.promise(preq) + [st, preq] + } + + send_body(stream, r_body) + + r_pushstreams.each { |st, preq| + penv = new_env(preq, "") + p_status, p_h, p_body = @app.call(penv) + p_headers = extract_headers(p_status, p_h) + st.send_headers(p_headers, end_stream: false) + send_body(st, p_body) + } + end + def new_env(h, data) headers = h.group_by { |k, v| k }.map { |k, kvs| if k == "cookie" @@ -78,11 +114,11 @@ module Plum end }.to_h - cmethod = headers.delete(":method") - cpath = headers.delete(":path") + cmethod = headers[":method"] + cpath = headers[":path"] cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s) - cauthority = headers.delete(":authority") - cscheme = headers.delete(":scheme") + cauthority = headers[":authority"] + cscheme = headers[":scheme"] ebase = { "REQUEST_METHOD" => cmethod, "SCRIPT_NAME" => "", @@ -93,7 +129,9 @@ module Plum } headers.each {|key, value| - ebase["HTTP_" + key.gsub("-", "_").upcase] = value + unless key.start_with?(":") && key.include?(".") + ebase["HTTP_" + key.gsub("-", "_").upcase] = value + end } ebase.merge!({ @@ -110,9 +148,7 @@ module Plum ebase end - def new_resp(app_call) - r_status, r_h, r_body = app_call - + def extract_headers(r_status, r_h) rbase = { ":status" => r_status, "server" => "plum/#{::Plum::VERSION}", @@ -132,7 +168,7 @@ module Plum end end - [rbase, r_body] + rbase end end end -- cgit v1.2.3 From 085c88260516d398ad44acb53617dd669dad53fe Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 25 Oct 2015 11:11:45 +0900 Subject: improve performance --- examples/rack.ru | 22 +++++++++++++++ examples/sinatra.rb | 14 --------- lib/plum/rack/connection.rb | 69 ++++++++++++++++++++++----------------------- lib/rack/handler/plum.rb | 6 +++- 4 files changed, 61 insertions(+), 50 deletions(-) create mode 100644 examples/rack.ru delete mode 100644 examples/sinatra.rb (limited to 'lib/plum/rack') diff --git a/examples/rack.ru b/examples/rack.ru new file mode 100644 index 0000000..7558c80 --- /dev/null +++ b/examples/rack.ru @@ -0,0 +1,22 @@ +$LOAD_PATH << File.expand_path("../../lib", __FILE__) +require "plum/rack" + +class App2 + def call(env) + if env["REQUEST_METHOD"] == "GET" && env["PATH_INFO"] == "/" + [ + 200, + { "Content-Type" => "text/html" }, + ["Hello World!"] + ] + else + [ + 404, + { "Content-Type" => "text/html" }, + [""] + ] + end + end +end + +run App2.new diff --git a/examples/sinatra.rb b/examples/sinatra.rb deleted file mode 100644 index 83c2113..0000000 --- a/examples/sinatra.rb +++ /dev/null @@ -1,14 +0,0 @@ -$LOAD_PATH << File.expand_path("../../lib", __FILE__) -require "plum/rack" -require "sinatra" - -set :server, :plum -enable :logging, :dump_errors, :raise_errors - -get "/" do - "get: #{params}" -end - -post "/" do - "post: " + params.to_s -end diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 2ab33e5..400a098 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -34,7 +34,7 @@ module Plum headers = data = nil stream.on(:open) { headers = nil - data = "".b + data = "".force_encoding(Encoding::BINARY) } stream.on(:headers) { |h| @@ -103,44 +103,43 @@ module Plum end def new_env(h, data) - headers = h.group_by { |k, v| k }.map { |k, kvs| - if k == "cookie" - [k, kvs.map(&:last).join("; ")] - else - [k, kvs.first.last] - end - }.to_h - - cmethod = headers[":method"] - cpath = headers[":path"] - cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s) - cauthority = headers[":authority"] - cscheme = headers[":scheme"] ebase = { - "REQUEST_METHOD" => cmethod, "SCRIPT_NAME" => "", - "PATH_INFO" => cpath_name, - "QUERY_STRING" => cpath_query.to_s, - "SERVER_NAME" => cauthority.split(":").first, - "SERVER_PORT" => (cauthority.split(":").last || 443), # TODO: forwarded header (RFC 7239) - } - - headers.each {|key, value| - unless key.start_with?(":") && key.include?(".") - ebase["HTTP_" + key.gsub("-", "_").upcase] = value - end - } - - ebase.merge!({ "rack.version" => ::Rack::VERSION, - "rack.url_scheme" => cscheme, "rack.input" => StringIO.new(data), "rack.errors" => $stderr, "rack.multithread" => true, "rack.multiprocess" => false, "rack.run_once" => false, "rack.hijack?" => false, - }) + } + + h.each { |k, v| + case k + when ":method" + ebase["REQUEST_METHOD"] = v + when ":path" + cpath_name, cpath_query = v.split("?", 2) + ebase["PATH_INFO"] = cpath_name + ebase["QUERY_STRING"] = cpath_query || "" + when ":authority" + chost, cport = v.split(":", 2) + ebase["SERVER_NAME"] = chost + ebase["SERVER_PORT"] = (cport || 443).to_i + when ":scheme" + ebase["rack.url_scheme"] = v + else + if k.start_with?(":") + # unknown HTTP/2 pseudo-headers + else + if "cookie" == k && headers["HTTP_COOKIE"] + ebase["HTTP_COOKIE"] << "; " << v + else + ebase["HTTP_" << k.tr("-", "_").upcase!] = v + end + end + end + } ebase end @@ -156,12 +155,12 @@ module Plum next end - key = key.downcase.gsub(/^x-/, "") - vs = v_.split("\n") - if key == "set-cookie" - rbase[key] = vs.join("; ") # RFC 7540 8.1.2.5 + key = key.downcase + if "set-cookie".freeze == key + rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5 else - rbase[key] = vs.join(",") # RFC 7230 7 + key = key.byteshift(2) if key.start_with?("x-") + rbase[key] = v_.tr("\n", ",") # RFC 7230 7 end end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index c3ca543..30eed72 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -16,10 +16,14 @@ module Rack ) @server = ::Plum::Rack::Server.new(app, config) - yield @server if block_given? + yield @server if block_given? # TODO @server.start end + def self.shutdown + @server.stop if @server + end + def self.valid_options { "Host=HOST" => "Hostname to listen on (default: #{default_options[:Host]})", -- cgit v1.2.3 From 7e998a7ae6826ebf2a50aa763b4d84f0082917ec Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 25 Oct 2015 18:47:37 +0900 Subject: better performance --- examples/rack.ru | 2 +- lib/plum/rack/connection.rb | 88 ++++++++++++++++++++++----------------------- lib/plum/rack/server.rb | 8 +++-- 3 files changed, 49 insertions(+), 49 deletions(-) (limited to 'lib/plum/rack') diff --git a/examples/rack.ru b/examples/rack.ru index 7558c80..67462e9 100644 --- a/examples/rack.ru +++ b/examples/rack.ru @@ -7,7 +7,7 @@ class App2 [ 200, { "Content-Type" => "text/html" }, - ["Hello World!"] + ["*10 bytes*"*400] ] else [ diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb index 400a098..45cde79 100644 --- a/lib/plum/rack/connection.rb +++ b/lib/plum/rack/connection.rb @@ -1,7 +1,7 @@ module Plum module Rack class Connection - attr_reader :app, :sock, :plum + attr_reader :app, :plum def initialize(app, plum, logger) @app = app @@ -12,7 +12,7 @@ module Plum end def stop - @plum.stop + @plum.close end def run @@ -28,47 +28,43 @@ module Plum def setup_plum @plum.on(:connection_error) { |ex| @logger.error(ex) } - @plum.on(:stream) do |stream| - stream.on(:stream_error) { |ex| @logger.error(ex) } + # @plum.on(:stream) { |stream| @logger.debug("new stream: #{stream}") } + @plum.on(:stream_error) { |stream, ex| @logger.error(ex) } - headers = data = nil - stream.on(:open) { - headers = nil - data = "".force_encoding(Encoding::BINARY) - } - - stream.on(:headers) { |h| - headers = h - } + reqs = {} + @plum.on(:headers) { |stream, h| + reqs[stream] = { headers: h, data: "".force_encoding(Encoding::BINARY) } + } - stream.on(:data) { |d| - data << d # TODO: store to file? - } + @plum.on(:data) { |stream, d| + reqs[stream][:data] << d # TODO: store to file? + } - stream.on(:end_stream) { - handle_request(stream, headers, data) - } - end + @plum.on(:end_stream) { |stream| + handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + } end def send_body(stream, body) - if body.is_a?(::Rack::BodyProxy) - begin - body.each { |part| - stream.send_data(part, end_stream: false) + begin + if body.is_a?(Array) + last = body.size - 1 + body.each_with_index { |part, i| + stream.send_data(part, end_stream: last == i) } - ensure - body.close + elsif body.is_a?(IO) + stream.send_data(body, end_stream: true) + else + body.each { |part| stream.send_data(part, end_stream: false) } + stream.send_data(nil, end_stream: true) end - stream.send_data(nil, end_stream: true) - else - stream.send_data(body, end_stream: true) + ensure + body.close if body.respond_to?(:close) end end - def extract_push(r_rawheaders) - _, pushs = r_rawheaders.find { |k, v| k == "plum.serverpush" } - if pushs + def extract_push(r_extheaders) + if pushs = r_extheaders["plum.serverpush"] pushs.split(";").map { |push| push.split(" ", 2) } else [] @@ -78,8 +74,8 @@ module Plum def handle_request(stream, headers, data) env = new_env(headers, data) r_status, r_rawheaders, r_body = @app.call(env) - r_headers = extract_headers(r_status, r_rawheaders) - r_topushs = extract_push(r_rawheaders) + r_headers, r_extheaders = extract_headers(r_status, r_rawheaders) + r_topushs = extract_push(r_extheaders) stream.send_headers(r_headers, end_stream: false) r_pushstreams = r_topushs.map { |method, path| @@ -132,7 +128,7 @@ module Plum if k.start_with?(":") # unknown HTTP/2 pseudo-headers else - if "cookie" == k && headers["HTTP_COOKIE"] + if "cookie" == k && ebase["HTTP_COOKIE"] ebase["HTTP_COOKIE"] << "; " << v else ebase["HTTP_" << k.tr("-", "_").upcase!] = v @@ -149,22 +145,24 @@ module Plum ":status" => r_status, "server" => "plum/#{::Plum::VERSION}", } + rext = {} r_h.each do |key, v_| - if key.start_with?("rack.") - next - end - - key = key.downcase - if "set-cookie".freeze == key - rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5 + if key.include?(".") + rext[key] = v_ else - key = key.byteshift(2) if key.start_with?("x-") - rbase[key] = v_.tr("\n", ",") # RFC 7230 7 + key = key.downcase + + if "set-cookie" == key + rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5 + else + key = key.byteshift(2) if key.start_with?("x-") + rbase[key] = v_.tr("\n", ",") # RFC 7230 7 + end end end - rbase + [rbase, rext] end end end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb index 496a375..8dda95e 100644 --- a/lib/plum/rack/server.rb +++ b/lib/plum/rack/server.rb @@ -45,10 +45,12 @@ module Plum begin sock = sock.accept if sock.respond_to?(:accept) plum = svr.plum(sock) - @logger.debug("accept: #{plum}") - con = Connection.new(@app, plum, @logger) - con.run + #require "lineprof" + #Lineprof.profile(/plum/) { + con = Connection.new(@app, plum, @logger) + con.run + #} rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed sock.close if sock rescue StandardError => e -- cgit v1.2.3