diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-13 09:03:44 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-13 09:03:44 +0900 |
commit | 963925aed0bca245f390dbdffc6c5308b88bc683 (patch) | |
tree | cbd76d2db899aff26f202cb2314aee5893ea3baf /lib | |
parent | 0e9f859c18d78a3c34d493e9d673e06ab10c311a (diff) | |
parent | 014ff6d424f5ad863a099428d865bb74c857b36e (diff) | |
download | plum-963925aed0bca245f390dbdffc6c5308b88bc683.tar.gz |
Merge branch 'master' of github.com:rhenium/plum
Diffstat (limited to 'lib')
-rw-r--r-- | lib/plum.rb | 5 | ||||
-rw-r--r-- | lib/plum/binary_string.rb | 14 | ||||
-rw-r--r-- | lib/plum/client.rb | 242 | ||||
-rw-r--r-- | lib/plum/client/client_session.rb | 91 | ||||
-rw-r--r-- | lib/plum/client/decoders.rb | 51 | ||||
-rw-r--r-- | lib/plum/client/legacy_client_session.rb | 118 | ||||
-rw-r--r-- | lib/plum/client/response.rb | 40 | ||||
-rw-r--r-- | lib/plum/client/upgrade_client_session.rb | 46 | ||||
-rw-r--r-- | lib/plum/connection.rb | 52 | ||||
-rw-r--r-- | lib/plum/errors.rb | 20 | ||||
-rw-r--r-- | lib/plum/flow_control.rb | 6 | ||||
-rw-r--r-- | lib/plum/frame_factory.rb | 32 | ||||
-rw-r--r-- | lib/plum/frame_utils.rb | 47 | ||||
-rw-r--r-- | lib/plum/hpack/constants.rb | 2 | ||||
-rw-r--r-- | lib/plum/hpack/context.rb | 8 | ||||
-rw-r--r-- | lib/plum/rack/session.rb | 7 | ||||
-rw-r--r-- | lib/plum/server/connection.rb | 2 | ||||
-rw-r--r-- | lib/plum/server/http_connection.rb | 15 | ||||
-rw-r--r-- | lib/plum/server/https_connection.rb | 2 | ||||
-rw-r--r-- | lib/plum/stream.rb | 55 | ||||
-rw-r--r-- | lib/plum/stream_utils.rb | 34 | ||||
-rw-r--r-- | lib/plum/version.rb | 2 | ||||
-rw-r--r-- | lib/rack/handler/plum.rb | 2 |
23 files changed, 607 insertions, 286 deletions
diff --git a/lib/plum.rb b/lib/plum.rb index bcd72d2..95e5588 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -2,6 +2,7 @@ require "openssl" require "socket" require "base64" require "set" +require "zlib" require "plum/version" require "plum/errors" require "plum/binary_string" @@ -24,4 +25,8 @@ require "plum/server/https_connection" require "plum/server/http_connection" require "plum/client" require "plum/client/response" +require "plum/client/decoders" require "plum/client/connection" +require "plum/client/client_session" +require "plum/client/legacy_client_session" +require "plum/client/upgrade_client_session" diff --git a/lib/plum/binary_string.rb b/lib/plum/binary_string.rb index 400c57b..2b50b38 100644 --- a/lib/plum/binary_string.rb +++ b/lib/plum/binary_string.rb @@ -70,6 +70,20 @@ module Plum # I want to write `enum_for(__method__, n)`! end end + + # Splits this String into chunks. + # @param n [Integer] max chunk bytesize + # @return [Array<String>] the slices + def chunk(n) + res = [] + pos = 0 + lim = bytesize + while pos < lim + res << byteslice(pos, n) + pos += n + end + res + end end end end diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 35c4001..8cb9230 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -2,13 +2,18 @@ module Plum class Client DEFAULT_CONFIG = { - tls: true, + http2: true, scheme: "https", + hostname: nil, verify_mode: OpenSSL::SSL::VERIFY_PEER, + ssl_context: nil, + http2_settings: {}, + user_agent: "plum/#{Plum::VERSION}", + auto_decode: true, }.freeze attr_reader :host, :port, :config - attr_reader :socket + attr_reader :socket, :session # Creates a new HTTP client and starts communication. # A shorthand for `Plum::Client.new(args).start(&block)` @@ -26,11 +31,9 @@ module Plum @socket = host else @host = host - @port = port || (config[:tls] ? 443 : 80) + @port = port || (config[:scheme] == "https" ? 443 : 80) end - @config = DEFAULT_CONFIG.merge(config) - @response_handlers = {} - @responses = {} + @config = DEFAULT_CONFIG.merge(hostname: host).merge(config) @started = false end @@ -42,7 +45,7 @@ module Plum if block_given? begin ret = yield(self) - wait + resume return ret ensure close @@ -51,25 +54,21 @@ module Plum self end - # Waits for the asynchronous response(s) to finish. + # Resume communication with the server, until the specified (or all running) requests are complete. # @param response [Response] if specified, waits only for the response - def wait(response = nil) + # @return [Response] if parameter response is specified + def resume(response = nil) if response - _succ while !response.failed? && !response.finished? + @session.succ until response.failed? || response.finished? + response else - _succ while !@responses.empty? + @session.succ until @session.empty? end end - # Waits for the response headers. - # @param response [Response] the incomplete response. - def wait_headers(response) - _succ while !response.failed? && !response.headers - end - - # Closes the connection. + # Closes the connection immediately. def close - @plum.close if @plum + @session.close if @session ensure @socket.close if @socket end @@ -77,169 +76,124 @@ module Plum # Creates a new HTTP request. # @param headers [Hash<String, String>] the request headers # @param body [String] the request body - # @param block [Proc] if specified, calls the block when finished - def request_async(headers, body = nil, &block) - stream = @plum.open_stream - response = Response.new - @responses[stream] = response - - if body - stream.send_headers(headers, end_stream: false) - stream.send_data(body, end_stream: true) - else - stream.send_headers(headers, end_stream: true) - end - - if block_given? - @response_handlers[stream] = block - end - - response - end - - # Creates a new HTTP request and waits for the response - # @param headers [Hash<String, String>] the request headers - # @param body [String] the request body - def request(headers, body = nil) + # @param options [Hash<Symbol, Object>] request options + # @param block [Proc] if passed, it will be called when received response headers. + def request(headers, body, options = {}, &block) raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - - base_headers = { ":method" => nil, - ":path" => nil, - ":authority" => (@config[:hostname] || @host), - ":scheme" => (@config[:scheme] || "https") } - - response = request_async(base_headers.merge(headers), body) - wait(response) - response + @session.request(headers, body, @config.merge(options), &block) end + # @!method get! + # @!method head! + # @!method delete! + # @param path [String] the absolute path to request (translated into :path header) + # @param options [Hash<Symbol, Object>] the request options + # @param block [Proc] if specified, calls the block when finished + # Shorthand method for `Client#resume(Client#request(*args))` + # @!method get # @!method head # @!method delete # @param path [String] the absolute path to request (translated into :path header) - # @param headers [Hash] the request headers - # Shorthand method for `#request` - - # @!method get_async - # @!method head_async - # @!method delete_async - # @param path [String] the absolute path to request (translated into :path header) - # @param headers [Hash] the request headers + # @param options [Hash<Symbol, Object>] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request_async` + # Shorthand method for `#request` %w(GET HEAD DELETE).each { |method| - define_method(:"#{method.downcase}") do |path, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers)) + define_method(:"#{method.downcase}!") do |path, options = {}, &block| + resume _request_helper(method, path, nil, options, &block) end - define_method(:"#{method.downcase}_async") do |path, headers = {}, &block| - request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block) + define_method(:"#{method.downcase}") do |path, options = {}, &block| + _request_helper(method, path, nil, options, &block) end } - # @!method post - # @!method put + # @!method post! + # @!method put! # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body - # @param headers [Hash] the request headers - # Shorthand method for `#request` + # @param options [Hash<Symbol, Object>] the request options + # @param block [Proc] if specified, calls the block when finished + # Shorthand method for `Client#resume(Client#request(*args))` - # @!method post_async - # @!method put_async + # @!method post + # @!method put # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body - # @param headers [Hash] the request headers + # @param options [Hash<Symbol, Object>] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request_async` + # Shorthand method for `#request` %w(POST PUT).each { |method| - define_method(:"#{method.downcase}") do |path, body = nil, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers), body) + define_method(:"#{method.downcase}!") do |path, body, options = {}, &block| + resume _request_helper(method, path, body, options, &block) end - define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block| - request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block) + define_method(:"#{method.downcase}") do |path, body, options = {}, &block| + _request_helper(method, path, body, options, &block) end } private - def _start - @started = true - unless @socket - sock = TCPSocket.open(host, port) - if config[:tls] - ctx = @config[:ssl_context] || new_ssl_ctx - sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) - sock.sync_close = true - sock.connect - end - - @socket = sock + # @return [Boolean] http2 nego? + def _connect + @socket = TCPSocket.open(@host, @port) + + if @config[:scheme] == "https" + ctx = @config[:ssl_context] || new_ssl_ctx + @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx) + @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=) + @socket.sync_close = true + @socket.connect + @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE + + @socket.respond_to?(:alpn_protocol) && @socket.alpn_protocol == "h2" || + @socket.respond_to?(:npn_protocol) && @socket.npn_protocol == "h2" end - - @plum = setup_plum(@socket) end - def setup_plum(sock) - local_settings = { - enable_push: 0, - initial_window_size: (1 << 30) - 1, - } - plum = ClientConnection.new(sock.method(:write), local_settings) - plum.on(:protocol_error) { |ex| - _fail(ex) - raise ex - } - plum.on(:close) { _fail(RuntimeError.new(:closed)) } - plum.on(:stream_error) { |stream, ex| - if res = @responses.delete(stream) - res._fail(ex) unless res.finished? - end - raise ex - } - plum.on(:headers) { |stream, headers| - response = @responses[stream] - response._headers(headers) - if handler = @response_handlers.delete(stream) - handler.call(response) - end - } - plum.on(:data) { |stream, chunk| - response = @responses[stream] - response._chunk(chunk) - } - plum.on(:end_stream) { |stream| - response = @responses.delete(stream) - response._finish - } - plum - end + def _start + @started = true - def _succ - @plum << @socket.readpartial(1024) - end + klass = @config[:http2] ? ClientSession : LegacyClientSession + nego = @socket || _connect - def _fail(ex) - while sr = @responses.shift - stream, res = sr - res._fail(ex) unless res.finished? + if @config[:http2] + if @config[:scheme] == "https" + klass = nego ? ClientSession : LegacyClientSession + else + klass = UpgradeClientSession + end + else + klass = LegacyClientSession end - ensure - close + + @session = klass.new(@socket, @config) end def new_ssl_ctx ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @config[:verify_mode] - if ctx.respond_to?(:hostname=) - ctx.hostname = @config[:hostname] || @host - end - if ctx.respond_to?(:alpn_protocols) - ctx.alpn_protocols = ["h2", "http/1.1"] - end - if ctx.respond_to?(:npn_select_cb) - ctx.alpn_select_cb = -> protocols { - protocols.include?("h2") ? "h2" : protocols.first - } + cert_store = OpenSSL::X509::Store.new + cert_store.set_default_paths + ctx.cert_store = cert_store + if @config[:http2] + ctx.ciphers = "ALL:!" + HTTPSServerConnection::CIPHER_BLACKLIST.join(":!") + if ctx.respond_to?(:alpn_protocols) + ctx.alpn_protocols = ["h2", "http/1.1"] + end + if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN + ctx.npn_select_cb = -> protocols { + protocols.include?("h2") ? "h2" : protocols.first + } + end end ctx end + + def _request_helper(method, path, body, options, &block) + base = { ":method" => method, + ":path" => path, + "user-agent" => @config[:user_agent] } + base.merge!(options[:headers]) if options[:headers] + request(base, body, options, &block) + end end end diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb new file mode 100644 index 0000000..6e9fc56 --- /dev/null +++ b/lib/plum/client/client_session.rb @@ -0,0 +1,91 @@ +# -*- frozen-string-literal: true -*- +module Plum + # HTTP/2 client session. + class ClientSession + HTTP2_DEFAULT_SETTINGS = { + enable_push: 0, # TODO: api? + initial_window_size: 2 ** 30, # TODO + } + + attr_reader :plum + + def initialize(socket, config) + @socket = socket + @config = config + @http2_settings = HTTP2_DEFAULT_SETTINGS.merge(@config[:http2_settings]) + + @plum = setup_plum + @responses = Set.new + end + + def succ + @plum << @socket.readpartial(16384) + rescue => e + fail(e) + end + + def empty? + @responses.empty? + end + + def close + @closed = true + @responses.each(&:_fail) + @responses.clear + @plum.close + end + + def request(headers, body, options, &headers_cb) + headers = { ":method" => nil, + ":path" => nil, + ":authority" => @config[:hostname], + ":scheme" => @config[:scheme] + }.merge(headers) + + response = Response.new(**options) + @responses << response + stream = @plum.open_stream + stream.send_headers(headers, end_stream: !body) + stream.send_data(body, end_stream: true) if body + + stream.on(:headers) { |resp_headers_raw| + response._headers(resp_headers_raw) + headers_cb.call(response) if headers_cb + } + stream.on(:data) { |chunk| + response._chunk(chunk) + check_window(stream) + } + stream.on(:end_stream) { + response._finish + @responses.delete(response) + } + stream.on(:stream_error) { |ex| + response._fail + raise ex + } + response + end + + private + def fail(exception) + close + raise exception + end + + def setup_plum + plum = ClientConnection.new(@socket.method(:write), @http2_settings) + plum.on(:connection_error) { |ex| + fail(ex) + } + plum.window_update(@http2_settings[:initial_window_size]) + plum + end + + def check_window(stream) + ws = @http2_settings[:initial_window_size] + stream.window_update(ws) if stream.recv_remaining_window < (ws / 2) + @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2) + end + end +end diff --git a/lib/plum/client/decoders.rb b/lib/plum/client/decoders.rb new file mode 100644 index 0000000..e6d72e7 --- /dev/null +++ b/lib/plum/client/decoders.rb @@ -0,0 +1,51 @@ +module Plum + module Decoders + class Base + def decode(chunk) + chunk + end + + def finish + end + end + + # `deflate` is not just deflate, wrapped by zlib format (RFC 1950) + class Deflate < Base + def initialize + @inflate = Zlib::Inflate.new(Zlib::MAX_WBITS) + end + + def decode(chunk) + @inflate.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @inflate.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + class GZip < Base + def initialize + @stream = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) + end + + def decode(chunk) + @stream.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @stream.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + DECODERS = { "gzip" => GZip, "deflate" => Deflate }.freeze + end +end diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb new file mode 100644 index 0000000..bc531ac --- /dev/null +++ b/lib/plum/client/legacy_client_session.rb @@ -0,0 +1,118 @@ +# -*- frozen-string-literal: true -*- +module Plum + # HTTP/1.x client session. + class LegacyClientSession + # Creates a new HTTP/1.1 client session + def initialize(socket, config) + require "http/parser" + @socket = socket + @config = config + + @parser = setup_parser + @requests = [] + @response = nil + @headers_callback = nil + end + + def succ + @parser << @socket.readpartial(16384) + rescue => e # including HTTP::Parser::Error + fail(e) + end + + def empty? + !@response + end + + def close + @closed = true + @response._fail if @response + end + + def request(headers, body, options, &headers_cb) + headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] + if body + if headers["content-length"] || headers["transfer-encoding"] + chunked = false + else + chunked = true + headers["transfer-encoding"] = "chunked" + end + end + + response = Response.new(**options) + @requests << [response, headers, body, chunked, headers_cb] + consume_queue + response + end + + private + def fail(exception) + close + raise exception + end + + def consume_queue + return if @response || @requests.empty? + + response, headers, body, chunked, cb = @requests.shift + @response = response + @headers_callback = cb + + @socket << construct_request(headers) + + if body + if chunked + read_object(body) { |chunk| + @socket << chunk.bytesize.to_s(16) << "\r\n" << chunk << "\r\n" + } + else + read_object(body) { |chunk| @socket << chunk } + end + end + end + + def construct_request(headers) + out = String.new + out << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]] + headers.each { |key, value| + next if key.start_with?(":") # HTTP/2 psuedo headers + out << "%s: %s\r\n" % [key, value] + } + out << "\r\n" + end + + def read_object(body) + if body.is_a?(String) + yield body + else # IO + until body.eof? + yield body.readpartial(1024) + end + end + end + + def setup_parser + parser = HTTP::Parser.new + parser.on_headers_complete = proc { + resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h + @response._headers({ ":status" => parser.status_code.to_s }.merge(resp_headers)) + @headers_callback.call(@response) if @headers_callback + } + + parser.on_body = proc { |chunk| + @response._chunk(chunk) + } + + parser.on_message_complete = proc { |env| + @response._finish + @response = nil + @headers_callback = nil + close unless parser.keep_alive? + consume_queue + } + + parser + end + end +end diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index 8270e95..9e50f02 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -6,14 +6,15 @@ module Plum attr_reader :headers # @api private - def initialize + def initialize(auto_decode: true, **options) @body = Queue.new @finished = false @failed = false @body = [] + @auto_decode = auto_decode end - # Return the HTTP status code. + # Returns the HTTP status code. # @return [String] the HTTP status code def status @headers && @headers[":status"] @@ -42,6 +43,7 @@ module Plum # @yield [chunk] A chunk of the response body. def on_chunk(&block) raise "Body already read" if @on_chunk + raise ArgumentError, "block must be given" unless block_given? @on_chunk = block unless @body.empty? @body.each(&block) @@ -49,25 +51,34 @@ module Plum end end + # Set callback that will be called when the response finished. + def on_finish(&block) + raise ArgumentError, "block must be given" unless block_given? + if finished? + yield + else + @on_finish = block + end + end + # Returns the complete response body. Use #each_body instead if the body can be very large. # @return [String] the whole response body def body raise "Body already read" if @on_chunk - if finished? - @body.join - else - raise "Response body is not complete" - end + raise "Response body is not complete" unless finished? + @body.join end # @api private def _headers(raw_headers) # response headers should not have duplicates @headers = raw_headers.to_h.freeze + @decoder = setup_decoder end # @api private - def _chunk(chunk) + def _chunk(encoded) + chunk = @decoder.decode(encoded) if @on_chunk @on_chunk.call(chunk) else @@ -78,11 +89,22 @@ module Plum # @api private def _finish @finished = true + @decoder.finish + @on_finish.call if @on_finish end # @api private - def _fail(ex) + def _fail @failed = true end + + private + def setup_decoder + if @auto_decode + klass = Decoders::DECODERS[@headers["content-encoding"]] + end + klass ||= Decoders::Base + klass.new + end end end diff --git a/lib/plum/client/upgrade_client_session.rb b/lib/plum/client/upgrade_client_session.rb new file mode 100644 index 0000000..c0e0d9e --- /dev/null +++ b/lib/plum/client/upgrade_client_session.rb @@ -0,0 +1,46 @@ +# -*- frozen-string-literal: true -*- +module Plum + # Try upgrade to HTTP/2 + class UpgradeClientSession + def initialize(socket, config) + prepare_session(socket, config) + end + + def succ + @session.succ + end + + def empty? + @session.empty? + end + + def close + @session.close + end + + def request(headers, body, options, &headers_cb) + @session.request(headers, body, options, &headers_cb) + end + + private + def prepare_session(socket, config) + lcs = LegacyClientSession.new(socket, config) + opt_res = lcs.request({ ":method" => "OPTIONS", + ":path" => "*", + "User-Agent" => config[:user_agent], + "Connection" => "Upgrade, HTTP2-Settings", + "Upgrade" => "h2c", + "HTTP2-Settings" => "" }, nil, {}) + lcs.succ until opt_res.finished? + + if opt_res.status == "101" + lcs.close + @session = ClientSession.new(socket, config) + @session.plum.stream(1).set_state(:half_closed_local) + else + @session = lcs + end + end + end +end + diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index 1d47360..73f6206 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -51,26 +51,16 @@ module Plum return if new_data.empty? @buffer << new_data consume_buffer - rescue ConnectionError => e + rescue RemoteConnectionError => e callback(:connection_error, e) goaway(e.http2_error_type) close end alias << receive - private - def consume_buffer - while frame = Frame.parse!(@buffer) - callback(:frame, frame) - receive_frame(frame) - end - end - - def send_immediately(frame) - callback(:send_frame, frame) - @writer.call(frame.assemble) - end - + # Returns a Stream object with the specified ID. + # @param stream_id [Integer] the stream id + # @return [Stream] the stream def stream(stream_id) raise ArgumentError, "stream_id can't be 0" if stream_id == 0 @@ -92,14 +82,34 @@ module Plum stream end + private + def consume_buffer + while frame = Frame.parse!(@buffer) + callback(:frame, frame) + receive_frame(frame) + end + end + + def send_immediately(frame) + callback(:send_frame, frame) + + if frame.length <= @remote_settings[:max_frame_size] + @writer.call(frame.assemble) + else + frame.split(@remote_settings[:max_frame_size]) { |splitted| + @writer.call(splitted.assemble) + } + end + end + def validate_received_frame(frame) if @state == :waiting_settings && frame.type != :settings - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end if @state == :waiting_continuation if frame.type != :continuation || frame.stream_id != @continuation_id - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end if frame.end_headers? @state = :open @@ -127,7 +137,7 @@ module Plum def receive_control_frame(frame) if frame.length > @local_settings[:max_frame_size] - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) end case frame.type @@ -140,7 +150,7 @@ module Plum when :goaway receive_goaway(frame) when :data, :headers, :priority, :rst_stream, :push_promise, :continuation - raise Plum::ConnectionError.new(:protocol_error) + raise Plum::RemoteConnectionError.new(:protocol_error) else # MUST ignore unknown frame type. end @@ -148,11 +158,11 @@ module Plum def receive_settings(frame, send_ack: true) if frame.ack? - raise ConnectionError.new(:frame_size_error) if frame.length != 0 + raise RemoteConnectionError.new(:frame_size_error) if frame.length != 0 callback(:settings_ack) return else - raise ConnectionError.new(:frame_size_error) if frame.length % 6 != 0 + raise RemoteConnectionError.new(:frame_size_error) if frame.length % 6 != 0 end old_remote_settings = @remote_settings.dup @@ -175,7 +185,7 @@ module Plum end def receive_ping(frame) - raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 8 + raise Plum::RemoteConnectionError.new(:frame_size_error) if frame.length != 8 if frame.ack? callback(:ping_ack) diff --git a/lib/plum/errors.rb b/lib/plum/errors.rb index d5cce48..26fade1 100644 --- a/lib/plum/errors.rb +++ b/lib/plum/errors.rb @@ -31,8 +31,13 @@ module Plum ERROR_CODES[@http2_error_type] end end - class ConnectionError < HTTPError; end - class StreamError < HTTPError; end + + class RemoteHTTPError < HTTPError; end + class RemoteConnectionError < RemoteHTTPError; end + class RemoteStreamError < RemoteHTTPError; end + class LocalHTTPError < HTTPError; end + class LocalConnectionError < LocalHTTPError; end + class LocalStreamError < LocalHTTPError; end class LegacyHTTPError < Error attr_reader :headers, :data, :parser @@ -44,7 +49,12 @@ module Plum end end - # Client - class LocalConnectionError < HTTPError; end - class LocalStreamError < HTTPError; end + class DecoderError < Error + attr_reader :inner_error + + def initialize(message, inner_error = nil) + super(message) + @inner_error = inner_error + end + end end diff --git a/lib/plum/flow_control.rb b/lib/plum/flow_control.rb index cfb181d..ccb57dd 100644 --- a/lib/plum/flow_control.rb +++ b/lib/plum/flow_control.rb @@ -65,7 +65,7 @@ module Plum if frame.type == :data @recv_remaining_window -= frame.length if @recv_remaining_window < 0 - local_error = (Connection === self) ? ConnectionError : StreamError + local_error = (Connection === self) ? RemoteConnectionError : RemoteStreamError raise local_error.new(:flow_control_error) end end @@ -82,7 +82,7 @@ module Plum def receive_window_update(frame) if frame.length != 4 - raise Plum::ConnectionError.new(:frame_size_error) + raise Plum::RemoteConnectionError.new(:frame_size_error) end r_wsi = frame.payload.uint32 @@ -90,7 +90,7 @@ module Plum wsi = r_wsi # & ~(1 << 31) if wsi == 0 - local_error = (Connection === self) ? ConnectionError : StreamError + local_error = (Connection === self) ? RemoteConnectionError : RemoteStreamError raise local_error.new(:protocol_error) end diff --git a/lib/plum/frame_factory.rb b/lib/plum/frame_factory.rb index b4c1c01..de5a0b4 100644 --- a/lib/plum/frame_factory.rb +++ b/lib/plum/frame_factory.rb @@ -54,37 +54,45 @@ module Plum # Creates a DATA frame. # @param stream_id [Integer] The stream ID. # @param payload [String] Payload. - # @param flags [Array<Symbol>] Flags. - def data(stream_id, payload, *flags) + # @param end_stream [Boolean] add END_STREAM flag + def data(stream_id, payload, end_stream: false) payload = payload.b if payload && payload.encoding != Encoding::BINARY - Frame.new(type: :data, stream_id: stream_id, flags: flags, payload: payload) + fval = 0 + fval += 1 if end_stream + Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload) end # Creates a HEADERS frame. # @param stream_id [Integer] The stream ID. # @param encoded [String] Headers. - # @param flags [Array<Symbol>] Flags. - def headers(stream_id, encoded, *flags) - Frame.new(type: :headers, stream_id: stream_id, flags: flags, payload: encoded) + # @param end_stream [Boolean] add END_STREAM flag + # @param end_headers [Boolean] add END_HEADERS flag + def headers(stream_id, encoded, end_stream: false, end_headers: false) + fval = 0 + fval += 1 if end_stream + fval += 4 if end_headers + Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded) end # Creates a PUSH_PROMISE frame. # @param stream_id [Integer] The stream ID. # @param new_id [Integer] The stream ID to create. # @param encoded [String] Request headers. - # @param flags [Array<Symbol>] Flags. - def push_promise(stream_id, new_id, encoded, *flags) + # @param end_headers [Boolean] add END_HEADERS flag + def push_promise(stream_id, new_id, encoded, end_headers: false) payload = String.new.push_uint32(new_id) .push(encoded) - Frame.new(type: :push_promise, stream_id: stream_id, flags: flags, payload: payload) + fval = 0 + fval += 4 if end_headers + Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload) end # Creates a CONTINUATION frame. # @param stream_id [Integer] The stream ID. # @param payload [String] Payload. - # @param flags [Array<Symbol>] Flags. - def continuation(stream_id, payload, *flags) - Frame.new(type: :continuation, stream_id: stream_id, flags: flags, payload: payload) + # @param end_headers [Boolean] add END_HEADERS flag + def continuation(stream_id, payload, end_headers: false) + Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers && 4 || 0), payload: payload) end end end diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb index 20c79de..447c154 100644 --- a/lib/plum/frame_utils.rb +++ b/lib/plum/frame_utils.rb @@ -3,33 +3,28 @@ using Plum::BinaryString module Plum module FrameUtils - # Splits the DATA frame into multiple frames if the payload size exceeds max size. + # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE. # @param max [Integer] The maximum size of a frame payload. - # @return [Array<Frame>] The splitted frames. - def split_data(max) - return [self] if self.length <= max - raise "Frame type must be DATA" unless self.type == :data - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :data, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.flags = self.flags - [:end_stream] - frames.last.flags = self.flags & [:end_stream] - frames - end - - # Splits the HEADERS or PUSH_PROMISE frame into multiple frames if the payload size exceeds max size. - # @param max [Integer] The maximum size of a frame payload. - # @return [Array<Frame>] The splitted frames. - def split_headers(max) - return [self] if self.length <= max - raise "Frame type must be HEADERS or PUSH_PROMISE" unless [:headers, :push_promise].include?(self.type) - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :continuation, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.type_value = self.type_value - frames.first.flags = self.flags - [:end_headers] - frames.last.flags = self.flags & [:end_headers] - frames + # @yield [Frame] The splitted frames. + def split(max) + return yield self if @length <= max + first, *mid, last = @payload.chunk(max) + case type + when :data + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1) + mid.each { |slice| + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1) + when :headers, :push_promise + yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4) + mid.each { |slice| + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4) + else + raise NotImplementedError.new("frame split of frame with type #{type} is not supported") + end end # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). diff --git a/lib/plum/hpack/constants.rb b/lib/plum/hpack/constants.rb index 4eac05c..6993735 100644 --- a/lib/plum/hpack/constants.rb +++ b/lib/plum/hpack/constants.rb @@ -67,6 +67,8 @@ module Plum ["www-authenticate", ""], ].freeze + STATIC_TABLE_SIZE = STATIC_TABLE.size + HUFFMAN_TABLE = [ "1111111111000", "11111111111111111011000", diff --git a/lib/plum/hpack/context.rb b/lib/plum/hpack/context.rb index 622fe36..1d7f7d6 100644 --- a/lib/plum/hpack/context.rb +++ b/lib/plum/hpack/context.rb @@ -26,10 +26,10 @@ module Plum def fetch(index) if index == 0 raise HPACKError.new("index can't be 0") - elsif index <= STATIC_TABLE.size - STATIC_TABLE[index - 1] + elsif index <= STATIC_TABLE_SIZE + STATIC_TABLE[index - 1] elsif index <= STATIC_TABLE.size + @dynamic_table.size - @dynamic_table[index - STATIC_TABLE.size - 1] + @dynamic_table[index - STATIC_TABLE_SIZE - 1] else raise HPACKError.new("invalid index: #{index}") end @@ -43,7 +43,7 @@ module Plum si = STATIC_TABLE.index &pr return si + 1 if si di = @dynamic_table.index &pr - return di + STATIC_TABLE.size + 1 if di + return di + STATIC_TABLE_SIZE + 1 if di end def evict diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index 0d7023e..eddb036 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -50,6 +50,7 @@ module Plum @plum.on(:data) { |stream, d| reqs[stream][:data] << d # TODO: store to file? + check_window(stream) } @plum.on(:end_stream) { |stream| @@ -63,6 +64,12 @@ module Plum } end + def check_window(stream) + ws = @plum.local_settings[:initial_window_size] + stream.window_update(ws) if stream.recv_remaining_window < (ws / 2) + @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2) + end + def send_body(stream, body) begin if body.is_a?(IO) diff --git a/lib/plum/server/connection.rb b/lib/plum/server/connection.rb index 450dcf6..a82d4aa 100644 --- a/lib/plum/server/connection.rb +++ b/lib/plum/server/connection.rb @@ -29,7 +29,7 @@ module Plum def negotiate! unless CLIENT_CONNECTION_PREFACE.start_with?(@buffer.byteslice(0, 24)) - raise ConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending. + raise RemoteConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending. end if @buffer.bytesize >= 24 diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index 7111bb4..8f91ca4 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -23,7 +23,7 @@ module Plum private def negotiate! super - rescue ConnectionError + rescue RemoteConnectionError # Upgrade from HTTP/1.1 offset = @_http_parser << @buffer @buffer.byteshift(offset) @@ -59,10 +59,10 @@ module Plum process_first_request } - resp = "HTTP/1.1 101 Switching Protocols\r\n" - "Connection: Upgrade\r\n" - "Upgrade: h2c\r\n" - "Server: plum/#{Plum::VERSION}\r\n" + resp = "HTTP/1.1 101 Switching Protocols\r\n" + + "Connection: Upgrade\r\n" + + "Upgrade: h2c\r\n" + + "Server: plum/#{Plum::VERSION}\r\n" + "\r\n" @sock.write(resp) @@ -77,9 +77,8 @@ module Plum ":authority" => @_headers["host"] }) .reject {|n, v| ["connection", "http2-settings", "upgrade", "host"].include?(n) } - headers_s = Frame.headers(1, encoder.encode(headers), :end_headers).split_headers(max_frame_size) # stream ID is 1 - data_s = Frame.data(1, @_body, :end_stream).split_data(max_frame_size) - (headers_s + data_s).each {|frag| stream.receive_frame(frag) } + stream.receive_frame Frame.headers(1, encoder.encode(headers), end_headers: true) + stream.receive_frame Frame.data(1, @_body, end_stream: true) end end end diff --git a/lib/plum/server/https_connection.rb b/lib/plum/server/https_connection.rb index 09e360f..bac1b4b 100644 --- a/lib/plum/server/https_connection.rb +++ b/lib/plum/server/https_connection.rb @@ -10,7 +10,7 @@ module Plum if @sock.respond_to?(:cipher) # OpenSSL::SSL::SSLSocket-like if CIPHER_BLACKLIST.include?(@sock.cipher.first) # [cipher-suite, ssl-version, keylen, alglen] on(:negotiated) { - raise ConnectionError.new(:inadequate_security) + raise RemoteConnectionError.new(:inadequate_security) } end end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 61c7cd5..04190fc 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -47,20 +47,20 @@ module Plum when :push_promise receive_push_promise(frame) when :ping, :goaway, :settings - raise ConnectionError.new(:protocol_error) # stream_id MUST be 0x00 + raise RemoteConnectionError.new(:protocol_error) # stream_id MUST be 0x00 else # MUST ignore unknown frame end - rescue StreamError => e + rescue RemoteStreamError => e callback(:stream_error, e) - close(e.http2_error_type) + send_immediately Frame.rst_stream(id, e.http2_error_type) + close end # Closes this stream. Sends RST_STREAM frame to the peer. - # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame. - def close(error_type = :no_error) + def close @state = :closed - send_immediately Frame.rst_stream(id, error_type) + callback(:close) end # @api private @@ -70,7 +70,7 @@ module Plum # @api private def update_dependency(weight: nil, parent: nil, exclusive: nil) - raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self + raise RemoteStreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self if weight @weight = weight @@ -102,9 +102,9 @@ module Plum def validate_received_frame(frame) if frame.length > @connection.local_settings[:max_frame_size] if [:headers, :push_promise, :continuation].include?(frame.type) - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) else - raise StreamError.new(:frame_size_error) + raise RemoteStreamError.new(:frame_size_error) end end end @@ -116,13 +116,13 @@ module Plum def receive_data(frame) if @state != :open && @state != :half_closed_local - raise StreamError.new(:stream_closed) + raise RemoteStreamError.new(:stream_closed) end if frame.padded? padding_length = frame.payload.uint8 if padding_length >= frame.length - raise ConnectionError.new(:protocol_error, "padding is too long") + raise RemoteConnectionError.new(:protocol_error, "padding is too long") end callback(:data, frame.payload.byteslice(1, frame.length - padding_length - 1)) else @@ -149,7 +149,7 @@ module Plum end if padding_length > payload.bytesize - raise ConnectionError.new(:protocol_error, "padding is too long") + raise RemoteConnectionError.new(:protocol_error, "padding is too long") end frames.each do |frame| @@ -159,7 +159,7 @@ module Plum begin decoded_headers = @connection.hpack_decoder.decode(payload) rescue => e - raise ConnectionError.new(:compression_error, e) + raise RemoteConnectionError.new(:compression_error, e) end callback(:headers, decoded_headers) @@ -169,15 +169,15 @@ module Plum def receive_headers(frame) if @state == :reserved_local - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif @state == :half_closed_remote - raise StreamError.new(:stream_closed) + raise RemoteStreamError.new(:stream_closed) elsif @state == :closed - raise ConnectionError.new(:stream_closed) + raise RemoteConnectionError.new(:stream_closed) elsif @state == :closed_implicitly - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif @state == :idle && self.id.even? - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end @state = :open @@ -195,10 +195,10 @@ module Plum if promised_stream.state == :closed_implicitly # 5.1.1 An endpoint that receives an unexpected stream identifier MUST respond with a connection error of type PROTOCOL_ERROR. - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif promised_id.odd? # 5.1.1 Streams initiated by the server MUST use even-numbered stream identifiers. - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end end @@ -214,7 +214,7 @@ module Plum def receive_priority(frame) if frame.length != 5 - raise StreamError.new(:frame_size_error) + raise RemoteStreamError.new(:frame_size_error) end receive_priority_payload(frame.payload) end @@ -230,13 +230,18 @@ module Plum def receive_rst_stream(frame) if frame.length != 4 - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) elsif @state == :idle - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end - - callback(:rst_stream, frame) @state = :closed # MUST NOT send RST_STREAM + + error_code = frame.payload.uint32 + if error_code > 0 + raise LocalStreamError.new(HTTPError::ERROR_CODES.key(error_code)) + else + callback(:rst_stream, frame) + end end # override EventEmitter diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index a8d959f..dbb8d96 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -3,28 +3,14 @@ using Plum::BinaryString module Plum module StreamUtils - # Responds to a HTTP request. - # @param headers [Enumerable<String, String>] The response headers. - # @param body [String, IO] The response body. - def respond(headers, body = nil, end_stream: true) # TODO: priority, padding - if body - send_headers(headers, end_stream: false) - send_data(body, end_stream: end_stream) - else - send_headers(headers, end_stream: end_stream) - end - end - # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream. # @param headers [Enumerable<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. # @return [Stream] The stream to send push response. def promise(headers) stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) encoded = @connection.hpack_encoder.encode(headers) - original = Frame.push_promise(id, stream.id, encoded, :end_headers) - original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame| - send frame - end + frame = Frame.push_promise(id, stream.id, encoded, end_headers: true) + send frame stream end @@ -34,10 +20,8 @@ module Plum def send_headers(headers, end_stream:) max = @connection.remote_settings[:max_frame_size] encoded = @connection.hpack_encoder.encode(headers) - original_frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) - original_frame.split_headers(max).each do |frame| - send frame - end + frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream) + send frame @state = :half_closed_local if end_stream end @@ -47,14 +31,12 @@ module Plum def send_data(data, end_stream: true) max = @connection.remote_settings[:max_frame_size] if data.is_a?(IO) - while !data.eof? && fragment = data.readpartial(max) - send Frame.data(id, fragment, (end_stream && data.eof? && :end_stream)) + until data.eof? + fragment = data.readpartial(max) + send Frame.data(id, fragment, end_stream: end_stream && data.eof?) end else - original = Frame.data(id, data, (end_stream && :end_stream)) - original.split_data(max).each do |frame| - send frame - end + send Frame.data(id, data, end_stream: end_stream) end @state = :half_closed_local if end_stream end diff --git a/lib/plum/version.rb b/lib/plum/version.rb index 1e4ed3e..bbf27cf 100644 --- a/lib/plum/version.rb +++ b/lib/plum/version.rb @@ -1,4 +1,4 @@ # -*- frozen-string-literal: true -*- module Plum - VERSION = "0.1.3" + VERSION = "0.2.1" end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index cf34ee4..97108a8 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -1,4 +1,6 @@ # -*- frozen-string-literal: true -*- +require "plum/rack" + module Rack module Handler class Plum |