aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/rack
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-10-25 19:05:43 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-10-25 19:05:43 +0900
commit7f176b380f88494123a4a51346ade28c4da51c00 (patch)
tree8293401abe3676d88e3236ea168c22137e79a63e /lib/plum/rack
parent4dc76aeab391e29d0df33f58651ddc8232e262cb (diff)
parent7e998a7ae6826ebf2a50aa763b4d84f0082917ec (diff)
downloadplum-7f176b380f88494123a4a51346ade28c4da51c00.tar.gz
Merge remote-tracking branch 'plum-rack/master'
Diffstat (limited to 'lib/plum/rack')
-rw-r--r--lib/plum/rack/config.rb24
-rw-r--r--lib/plum/rack/connection.rb169
-rw-r--r--lib/plum/rack/dsl.rb44
-rw-r--r--lib/plum/rack/listener.rb105
-rw-r--r--lib/plum/rack/server.rb67
5 files changed, 409 insertions, 0 deletions
diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb
new file mode 100644
index 0000000..2f04886
--- /dev/null
+++ b/lib/plum/rack/config.rb
@@ -0,0 +1,24 @@
+module Plum
+ module Rack
+ class Config
+ DEFAULT_CONFIG = {
+ listeners: [],
+ debug: false,
+ log: nil, # $stdout
+ server_push: true
+ }.freeze
+
+ def initialize(config)
+ @config = DEFAULT_CONFIG.merge(config)
+ end
+
+ def [](key)
+ @config[key]
+ end
+
+ def to_s
+ @config.to_s
+ end
+ end
+ end
+end
diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb
new file mode 100644
index 0000000..45cde79
--- /dev/null
+++ b/lib/plum/rack/connection.rb
@@ -0,0 +1,169 @@
+module Plum
+ module Rack
+ class Connection
+ attr_reader :app, :plum
+
+ def initialize(app, plum, logger)
+ @app = app
+ @plum = plum
+ @logger = logger
+
+ setup_plum
+ end
+
+ def stop
+ @plum.close
+ end
+
+ def run
+ begin
+ @plum.run
+ rescue Errno::EPIPE, Errno::ECONNRESET => e
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ end
+ end
+
+ private
+ def setup_plum
+ @plum.on(:connection_error) { |ex| @logger.error(ex) }
+
+ # @plum.on(:stream) { |stream| @logger.debug("new stream: #{stream}") }
+ @plum.on(:stream_error) { |stream, ex| @logger.error(ex) }
+
+ reqs = {}
+ @plum.on(:headers) { |stream, h|
+ reqs[stream] = { headers: h, data: "".force_encoding(Encoding::BINARY) }
+ }
+
+ @plum.on(:data) { |stream, d|
+ reqs[stream][:data] << d # TODO: store to file?
+ }
+
+ @plum.on(:end_stream) { |stream|
+ handle_request(stream, reqs[stream][:headers], reqs[stream][:data])
+ }
+ end
+
+ def send_body(stream, body)
+ begin
+ if body.is_a?(Array)
+ last = body.size - 1
+ body.each_with_index { |part, i|
+ stream.send_data(part, end_stream: last == i)
+ }
+ elsif body.is_a?(IO)
+ stream.send_data(body, end_stream: true)
+ else
+ body.each { |part| stream.send_data(part, end_stream: false) }
+ stream.send_data(nil, end_stream: true)
+ end
+ ensure
+ body.close if body.respond_to?(:close)
+ end
+ end
+
+ def extract_push(r_extheaders)
+ if pushs = r_extheaders["plum.serverpush"]
+ pushs.split(";").map { |push| push.split(" ", 2) }
+ else
+ []
+ end
+ end
+
+ def handle_request(stream, headers, data)
+ env = new_env(headers, data)
+ r_status, r_rawheaders, r_body = @app.call(env)
+ r_headers, r_extheaders = extract_headers(r_status, r_rawheaders)
+ r_topushs = extract_push(r_extheaders)
+
+ stream.send_headers(r_headers, end_stream: false)
+ r_pushstreams = r_topushs.map { |method, path|
+ preq = { ":authority" => headers.find { |k, v| k == ":authority" }[1],
+ ":method" => method.to_s.upcase,
+ ":scheme" => headers.find { |k, v| k == ":scheme" }[1],
+ ":path" => path }
+ st = stream.promise(preq)
+ [st, preq]
+ }
+
+ send_body(stream, r_body)
+
+ r_pushstreams.each { |st, preq|
+ penv = new_env(preq, "")
+ p_status, p_h, p_body = @app.call(penv)
+ p_headers = extract_headers(p_status, p_h)
+ st.send_headers(p_headers, end_stream: false)
+ send_body(st, p_body)
+ }
+ end
+
+ def new_env(h, data)
+ ebase = {
+ "SCRIPT_NAME" => "",
+ "rack.version" => ::Rack::VERSION,
+ "rack.input" => StringIO.new(data),
+ "rack.errors" => $stderr,
+ "rack.multithread" => true,
+ "rack.multiprocess" => false,
+ "rack.run_once" => false,
+ "rack.hijack?" => false,
+ }
+
+ h.each { |k, v|
+ case k
+ when ":method"
+ ebase["REQUEST_METHOD"] = v
+ when ":path"
+ cpath_name, cpath_query = v.split("?", 2)
+ ebase["PATH_INFO"] = cpath_name
+ ebase["QUERY_STRING"] = cpath_query || ""
+ when ":authority"
+ chost, cport = v.split(":", 2)
+ ebase["SERVER_NAME"] = chost
+ ebase["SERVER_PORT"] = (cport || 443).to_i
+ when ":scheme"
+ ebase["rack.url_scheme"] = v
+ else
+ if k.start_with?(":")
+ # unknown HTTP/2 pseudo-headers
+ else
+ if "cookie" == k && ebase["HTTP_COOKIE"]
+ ebase["HTTP_COOKIE"] << "; " << v
+ else
+ ebase["HTTP_" << k.tr("-", "_").upcase!] = v
+ end
+ end
+ end
+ }
+
+ ebase
+ end
+
+ def extract_headers(r_status, r_h)
+ rbase = {
+ ":status" => r_status,
+ "server" => "plum/#{::Plum::VERSION}",
+ }
+ rext = {}
+
+ r_h.each do |key, v_|
+ if key.include?(".")
+ rext[key] = v_
+ else
+ key = key.downcase
+
+ if "set-cookie" == key
+ rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5
+ else
+ key = key.byteshift(2) if key.start_with?("x-")
+ rbase[key] = v_.tr("\n", ",") # RFC 7230 7
+ end
+ end
+ end
+
+ [rbase, rext]
+ end
+ end
+ end
+end
diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb
new file mode 100644
index 0000000..f4ee850
--- /dev/null
+++ b/lib/plum/rack/dsl.rb
@@ -0,0 +1,44 @@
+module Plum
+ module Rack
+ module DSL
+ class Config
+ attr_reader :config
+
+ def initialize
+ @config = ::Plum::Rack::Config::DEFAULT_CONFIG.dup
+ end
+
+ def log(out)
+ if out.is_a?(String)
+ @config[:log] = File.open(out, "a")
+ else
+ @config[:log] = out
+ end
+ end
+
+ def debug(bool)
+ @config[:debug] = !!bool
+ end
+
+ def listener(type, conf)
+ case type
+ when :unix
+ lc = conf.merge(listener: UNIXListener)
+ when :tcp
+ lc = conf.merge(listener: TCPListener)
+ when :tls
+ lc = conf.merge(listener: TLSListener)
+ else
+ raise "Unknown listener type: #{type} (known type: :unix, :http, :https)"
+ end
+
+ @config[:listeners] << lc
+ end
+
+ def server_push(bool)
+ @config[:server_push] = !!bool
+ end
+ end
+ end
+ end
+end
diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb
new file mode 100644
index 0000000..83a3dff
--- /dev/null
+++ b/lib/plum/rack/listener.rb
@@ -0,0 +1,105 @@
+module Plum
+ module Rack
+ class BaseListener
+ def stop
+ @server.close
+ end
+
+ def to_io
+ raise "not implemented"
+ end
+
+ def method_missing(name, *args)
+ @server.__send__(name, *args)
+ end
+ end
+
+ class TCPListener < BaseListener
+ def initialize(lc)
+ @server = ::TCPServer.new(lc[:hostname], lc[:port])
+ end
+
+ def to_io
+ @server.to_io
+ end
+
+ def plum(sock)
+ ::Plum::HTTPConnection.new(sock)
+ end
+ end
+
+ class TLSListener < BaseListener
+ def initialize(lc)
+ cert, key = lc[:certificate], lc[:certificate_key]
+ unless cert && key
+ puts "WARNING: using dummy certificate"
+ cert, key = dummy_key
+ end
+
+ ctx = OpenSSL::SSL::SSLContext.new
+ ctx.ssl_version = :TLSv1_2
+ ctx.alpn_select_cb = -> protocols {
+ raise "Client does not support HTTP/2: #{protocols}" unless protocols.include?("h2")
+ "h2"
+ }
+ ctx.tmp_ecdh_callback = -> (sock, ise, keyl) { OpenSSL::PKey::EC.new("prime256v1") }
+ ctx.cert = OpenSSL::X509::Certificate.new(cert)
+ ctx.key = OpenSSL::PKey::RSA.new(key)
+ tcp_server = ::TCPServer.new(lc[:hostname], lc[:port])
+ @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)
+ @server.start_immediately = false
+ end
+
+ def to_io
+ @server.to_io
+ end
+
+ def plum(sock)
+ ::Plum::HTTPSConnection.new(sock)
+ end
+
+ private
+ # returns: [cert, key]
+ def dummy_key
+ puts "WARNING: Generating new dummy certificate..."
+
+ key = OpenSSL::PKey::RSA.new(2048)
+ cert = OpenSSL::X509::Certificate.new
+ cert.subject = cert.issuer = OpenSSL::X509::Name.parse("/C=JP/O=Test/OU=Test/CN=example.com")
+ cert.not_before = Time.now
+ cert.not_after = Time.now + 363 * 24 * 60 * 60
+ cert.public_key = key.public_key
+ cert.serial = rand((1 << 20) - 1)
+ cert.version = 2
+
+ ef = OpenSSL::X509::ExtensionFactory.new
+ ef.subject_certificate = cert
+ ef.issuer_certificate = cert
+ cert.extensions = [
+ ef.create_extension("basicConstraints","CA:TRUE", true),
+ ef.create_extension("subjectKeyIdentifier", "hash"),
+ ]
+ cert.add_extension ef.create_extension("authorityKeyIdentifier", "keyid:always,issuer:always")
+
+ cert.sign key, OpenSSL::Digest::SHA1.new
+
+ [cert, key]
+ end
+ end
+
+ class UNIXListener < BaseListener
+ def initialize(path, permission, user, group)
+ @server = ::UNIXServer.new(path)
+ # TODO: set permission, user, group
+ end
+
+ def to_io
+ @server.to_io
+ end
+
+ def plum(sock)
+ ::Plum::HTTPSConnection.new(sock)
+ end
+ end
+ end
+end
diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb
new file mode 100644
index 0000000..a353db7
--- /dev/null
+++ b/lib/plum/rack/server.rb
@@ -0,0 +1,67 @@
+module Plum
+ module Rack
+ class Server
+ def initialize(app, config)
+ @state = :null
+ @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app
+ @logger = Logger.new(config[:log] || $stdout).tap { |l|
+ l.level = config[:debug] ? Logger::DEBUG : Logger::INFO
+ }
+ @listeners = config[:listeners].map { |lc|
+ lc[:listener].new(lc)
+ }
+
+ @logger.info("Plum #{::Plum::VERSION}")
+ @logger.info("Config: #{config}")
+ end
+
+ def start
+ @state = :running
+ while @state == :running
+ break if @listeners.empty?
+ begin
+ if ss = IO.select(@listeners, nil, nil, 2.0)
+ ss[0].each { |svr|
+ new_con(svr)
+ }
+ end
+ rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ end
+ end
+ end
+
+ def stop
+ @state = :stop
+ @listeners.map(&:stop)
+ # TODO: gracefully shutdown connections
+ end
+
+ private
+ def new_con(svr)
+ sock = svr.accept
+ Thread.new {
+ begin
+ sock = sock.accept if sock.respond_to?(:accept)
+ plum = svr.plum(sock)
+
+ #require "lineprof"
+ #Lineprof.profile(/plum/) {
+ con = Connection.new(@app, plum, @logger)
+ con.run
+ #}
+ rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
+ sock.close if sock
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ sock.close if sock
+ end
+ }
+ rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed
+ rescue StandardError => e
+ @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
+ end
+ end
+ end
+end