diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-10-25 19:05:43 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-10-25 19:05:43 +0900 |
commit | 7f176b380f88494123a4a51346ade28c4da51c00 (patch) | |
tree | 8293401abe3676d88e3236ea168c22137e79a63e /lib | |
parent | 4dc76aeab391e29d0df33f58651ddc8232e262cb (diff) | |
parent | 7e998a7ae6826ebf2a50aa763b4d84f0082917ec (diff) | |
download | plum-7f176b380f88494123a4a51346ade28c4da51c00.tar.gz |
Merge remote-tracking branch 'plum-rack/master'
Diffstat (limited to 'lib')
-rw-r--r-- | lib/plum/rack.rb | 10 | ||||
-rw-r--r-- | lib/plum/rack/config.rb | 24 | ||||
-rw-r--r-- | lib/plum/rack/connection.rb | 169 | ||||
-rw-r--r-- | lib/plum/rack/dsl.rb | 44 | ||||
-rw-r--r-- | lib/plum/rack/listener.rb | 105 | ||||
-rw-r--r-- | lib/plum/rack/server.rb | 67 | ||||
-rw-r--r-- | lib/rack/handler/plum.rb | 49 |
7 files changed, 468 insertions, 0 deletions
diff --git a/lib/plum/rack.rb b/lib/plum/rack.rb new file mode 100644 index 0000000..da0ae8a --- /dev/null +++ b/lib/plum/rack.rb @@ -0,0 +1,10 @@ +require "logger" +require "stringio" +require "plum" +require "rack" +require "rack/handler/plum" +require "plum/rack/config" +require "plum/rack/dsl" +require "plum/rack/listener" +require "plum/rack/server" +require "plum/rack/connection" diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb new file mode 100644 index 0000000..2f04886 --- /dev/null +++ b/lib/plum/rack/config.rb @@ -0,0 +1,24 @@ +module Plum + module Rack + class Config + DEFAULT_CONFIG = { + listeners: [], + debug: false, + log: nil, # $stdout + server_push: true + }.freeze + + def initialize(config) + @config = DEFAULT_CONFIG.merge(config) + end + + def [](key) + @config[key] + end + + def to_s + @config.to_s + end + end + end +end diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb new file mode 100644 index 0000000..45cde79 --- /dev/null +++ b/lib/plum/rack/connection.rb @@ -0,0 +1,169 @@ +module Plum + module Rack + class Connection + attr_reader :app, :plum + + def initialize(app, plum, logger) + @app = app + @plum = plum + @logger = logger + + setup_plum + end + + def stop + @plum.close + end + + def run + begin + @plum.run + rescue Errno::EPIPE, Errno::ECONNRESET => e + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + + private + def setup_plum + @plum.on(:connection_error) { |ex| @logger.error(ex) } + + # @plum.on(:stream) { |stream| @logger.debug("new stream: #{stream}") } + @plum.on(:stream_error) { |stream, ex| @logger.error(ex) } + + reqs = {} + @plum.on(:headers) { |stream, h| + reqs[stream] = { headers: h, data: "".force_encoding(Encoding::BINARY) } + } + + @plum.on(:data) { |stream, d| + reqs[stream][:data] << d # TODO: store to file? + } + + @plum.on(:end_stream) { |stream| + handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + } + end + + def send_body(stream, body) + begin + if body.is_a?(Array) + last = body.size - 1 + body.each_with_index { |part, i| + stream.send_data(part, end_stream: last == i) + } + elsif body.is_a?(IO) + stream.send_data(body, end_stream: true) + else + body.each { |part| stream.send_data(part, end_stream: false) } + stream.send_data(nil, end_stream: true) + end + ensure + body.close if body.respond_to?(:close) + end + end + + def extract_push(r_extheaders) + if pushs = r_extheaders["plum.serverpush"] + pushs.split(";").map { |push| push.split(" ", 2) } + else + [] + end + end + + def handle_request(stream, headers, data) + env = new_env(headers, data) + r_status, r_rawheaders, r_body = @app.call(env) + r_headers, r_extheaders = extract_headers(r_status, r_rawheaders) + r_topushs = extract_push(r_extheaders) + + stream.send_headers(r_headers, end_stream: false) + r_pushstreams = r_topushs.map { |method, path| + preq = { ":authority" => headers.find { |k, v| k == ":authority" }[1], + ":method" => method.to_s.upcase, + ":scheme" => headers.find { |k, v| k == ":scheme" }[1], + ":path" => path } + st = stream.promise(preq) + [st, preq] + } + + send_body(stream, r_body) + + r_pushstreams.each { |st, preq| + penv = new_env(preq, "") + p_status, p_h, p_body = @app.call(penv) + p_headers = extract_headers(p_status, p_h) + st.send_headers(p_headers, end_stream: false) + send_body(st, p_body) + } + end + + def new_env(h, data) + ebase = { + "SCRIPT_NAME" => "", + "rack.version" => ::Rack::VERSION, + "rack.input" => StringIO.new(data), + "rack.errors" => $stderr, + "rack.multithread" => true, + "rack.multiprocess" => false, + "rack.run_once" => false, + "rack.hijack?" => false, + } + + h.each { |k, v| + case k + when ":method" + ebase["REQUEST_METHOD"] = v + when ":path" + cpath_name, cpath_query = v.split("?", 2) + ebase["PATH_INFO"] = cpath_name + ebase["QUERY_STRING"] = cpath_query || "" + when ":authority" + chost, cport = v.split(":", 2) + ebase["SERVER_NAME"] = chost + ebase["SERVER_PORT"] = (cport || 443).to_i + when ":scheme" + ebase["rack.url_scheme"] = v + else + if k.start_with?(":") + # unknown HTTP/2 pseudo-headers + else + if "cookie" == k && ebase["HTTP_COOKIE"] + ebase["HTTP_COOKIE"] << "; " << v + else + ebase["HTTP_" << k.tr("-", "_").upcase!] = v + end + end + end + } + + ebase + end + + def extract_headers(r_status, r_h) + rbase = { + ":status" => r_status, + "server" => "plum/#{::Plum::VERSION}", + } + rext = {} + + r_h.each do |key, v_| + if key.include?(".") + rext[key] = v_ + else + key = key.downcase + + if "set-cookie" == key + rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5 + else + key = key.byteshift(2) if key.start_with?("x-") + rbase[key] = v_.tr("\n", ",") # RFC 7230 7 + end + end + end + + [rbase, rext] + end + end + end +end diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb new file mode 100644 index 0000000..f4ee850 --- /dev/null +++ b/lib/plum/rack/dsl.rb @@ -0,0 +1,44 @@ +module Plum + module Rack + module DSL + class Config + attr_reader :config + + def initialize + @config = ::Plum::Rack::Config::DEFAULT_CONFIG.dup + end + + def log(out) + if out.is_a?(String) + @config[:log] = File.open(out, "a") + else + @config[:log] = out + end + end + + def debug(bool) + @config[:debug] = !!bool + end + + def listener(type, conf) + case type + when :unix + lc = conf.merge(listener: UNIXListener) + when :tcp + lc = conf.merge(listener: TCPListener) + when :tls + lc = conf.merge(listener: TLSListener) + else + raise "Unknown listener type: #{type} (known type: :unix, :http, :https)" + end + + @config[:listeners] << lc + end + + def server_push(bool) + @config[:server_push] = !!bool + end + end + end + end +end diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb new file mode 100644 index 0000000..83a3dff --- /dev/null +++ b/lib/plum/rack/listener.rb @@ -0,0 +1,105 @@ +module Plum + module Rack + class BaseListener + def stop + @server.close + end + + def to_io + raise "not implemented" + end + + def method_missing(name, *args) + @server.__send__(name, *args) + end + end + + class TCPListener < BaseListener + def initialize(lc) + @server = ::TCPServer.new(lc[:hostname], lc[:port]) + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPConnection.new(sock) + end + end + + class TLSListener < BaseListener + def initialize(lc) + cert, key = lc[:certificate], lc[:certificate_key] + unless cert && key + puts "WARNING: using dummy certificate" + cert, key = dummy_key + end + + ctx = OpenSSL::SSL::SSLContext.new + ctx.ssl_version = :TLSv1_2 + ctx.alpn_select_cb = -> protocols { + raise "Client does not support HTTP/2: #{protocols}" unless protocols.include?("h2") + "h2" + } + ctx.tmp_ecdh_callback = -> (sock, ise, keyl) { OpenSSL::PKey::EC.new("prime256v1") } + ctx.cert = OpenSSL::X509::Certificate.new(cert) + ctx.key = OpenSSL::PKey::RSA.new(key) + tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) + @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) + @server.start_immediately = false + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPSConnection.new(sock) + end + + private + # returns: [cert, key] + def dummy_key + puts "WARNING: Generating new dummy certificate..." + + key = OpenSSL::PKey::RSA.new(2048) + cert = OpenSSL::X509::Certificate.new + cert.subject = cert.issuer = OpenSSL::X509::Name.parse("/C=JP/O=Test/OU=Test/CN=example.com") + cert.not_before = Time.now + cert.not_after = Time.now + 363 * 24 * 60 * 60 + cert.public_key = key.public_key + cert.serial = rand((1 << 20) - 1) + cert.version = 2 + + ef = OpenSSL::X509::ExtensionFactory.new + ef.subject_certificate = cert + ef.issuer_certificate = cert + cert.extensions = [ + ef.create_extension("basicConstraints","CA:TRUE", true), + ef.create_extension("subjectKeyIdentifier", "hash"), + ] + cert.add_extension ef.create_extension("authorityKeyIdentifier", "keyid:always,issuer:always") + + cert.sign key, OpenSSL::Digest::SHA1.new + + [cert, key] + end + end + + class UNIXListener < BaseListener + def initialize(path, permission, user, group) + @server = ::UNIXServer.new(path) + # TODO: set permission, user, group + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPSConnection.new(sock) + end + end + end +end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb new file mode 100644 index 0000000..a353db7 --- /dev/null +++ b/lib/plum/rack/server.rb @@ -0,0 +1,67 @@ +module Plum + module Rack + class Server + def initialize(app, config) + @state = :null + @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app + @logger = Logger.new(config[:log] || $stdout).tap { |l| + l.level = config[:debug] ? Logger::DEBUG : Logger::INFO + } + @listeners = config[:listeners].map { |lc| + lc[:listener].new(lc) + } + + @logger.info("Plum #{::Plum::VERSION}") + @logger.info("Config: #{config}") + end + + def start + @state = :running + while @state == :running + break if @listeners.empty? + begin + if ss = IO.select(@listeners, nil, nil, 2.0) + ss[0].each { |svr| + new_con(svr) + } + end + rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + end + + def stop + @state = :stop + @listeners.map(&:stop) + # TODO: gracefully shutdown connections + end + + private + def new_con(svr) + sock = svr.accept + Thread.new { + begin + sock = sock.accept if sock.respond_to?(:accept) + plum = svr.plum(sock) + + #require "lineprof" + #Lineprof.profile(/plum/) { + con = Connection.new(@app, plum, @logger) + con.run + #} + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + sock.close if sock + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + sock.close if sock + end + } + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + end +end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb new file mode 100644 index 0000000..30eed72 --- /dev/null +++ b/lib/rack/handler/plum.rb @@ -0,0 +1,49 @@ +module Rack + module Handler + class Plum + def self.run(app, options = {}) + opts = default_options.merge(options) + + config = ::Plum::Rack::Config.new( + listeners: [ + { + listener: ::Plum::Rack::TLSListener, + hostname: opts[:Host], + port: opts[:Port].to_i + } + ], + debug: !!opts[:Debug] + ) + + @server = ::Plum::Rack::Server.new(app, config) + yield @server if block_given? # TODO + @server.start + end + + def self.shutdown + @server.stop if @server + end + + def self.valid_options + { + "Host=HOST" => "Hostname to listen on (default: #{default_options[:Host]})", + "Port=PORT" => "Port to listen on (default: #{default_options[:Port]})", + "Debug" => "Turn on debug mode (default: #{default_options[:Debug]})", + } + end + + private + def self.default_options + rack_env = ENV["RACK_ENV"] || "development" + dev = rack_env == "development" + default_options = { + Host: dev ? "localhost" : "0.0.0.0", + Port: 8080, + Debug: dev, + } + end + end + + register(:plum, ::Rack::Handler::Plum) + end +end |