From 6418b22ff8fcaf7c3b7b1c3a81d2b98c23f66ea3 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sat, 7 Nov 2015 16:42:36 +0900 Subject: client: hostname is set in SSLSocket, not in SSLContext --- lib/plum/client.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 35c4001..197b214 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -166,6 +166,9 @@ module Plum if config[:tls] ctx = @config[:ssl_context] || new_ssl_ctx sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) + if sock.respond_to?(:hostname=) + sock.hostname = @config[:hostname] || @host + end sock.sync_close = true sock.connect end @@ -228,9 +231,6 @@ module Plum ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @config[:verify_mode] - if ctx.respond_to?(:hostname=) - ctx.hostname = @config[:hostname] || @host - end if ctx.respond_to?(:alpn_protocols) ctx.alpn_protocols = ["h2", "http/1.1"] end -- cgit v1.2.3 From 6b0f6ef5977f68561d0944b8b8986be496e84e7b Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sat, 7 Nov 2015 17:11:32 +0900 Subject: client: set certificate store to ssl context --- lib/plum/client.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 197b214..5713acb 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -166,11 +166,10 @@ module Plum if config[:tls] ctx = @config[:ssl_context] || new_ssl_ctx sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) - if sock.respond_to?(:hostname=) - sock.hostname = @config[:hostname] || @host - end + sock.hostname = (@config[:hostname] || @host) if sock.respond_to?(:hostname=) sock.sync_close = true sock.connect + sock.post_connection_check(@config[:hostname] || @host) end @socket = sock @@ -231,11 +230,14 @@ module Plum ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @config[:verify_mode] + cert_store = OpenSSL::X509::Store.new + cert_store.set_default_paths + ctx.cert_store = cert_store if ctx.respond_to?(:alpn_protocols) ctx.alpn_protocols = ["h2", "http/1.1"] end if ctx.respond_to?(:npn_select_cb) - ctx.alpn_select_cb = -> protocols { + ctx.npn_select_cb = -> protocols { protocols.include?("h2") ? "h2" : protocols.first } end -- cgit v1.2.3 From b12d3a31530a6dfffb9bfe473ca8be888190ac0d Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sat, 7 Nov 2015 17:35:19 +0900 Subject: client: refactoring --- lib/plum/client.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 5713acb..526fb69 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -5,6 +5,7 @@ module Plum tls: true, scheme: "https", verify_mode: OpenSSL::SSL::VERIFY_PEER, + ssl_context: nil }.freeze attr_reader :host, :port, :config @@ -28,7 +29,7 @@ module Plum @host = host @port = port || (config[:tls] ? 443 : 80) end - @config = DEFAULT_CONFIG.merge(config) + @config = DEFAULT_CONFIG.merge(hostname: host).merge(config) @response_handlers = {} @responses = {} @started = false @@ -105,8 +106,8 @@ module Plum base_headers = { ":method" => nil, ":path" => nil, - ":authority" => (@config[:hostname] || @host), - ":scheme" => (@config[:scheme] || "https") } + ":authority" => @config[:hostname], + ":scheme" => @config[:scheme] } response = request_async(base_headers.merge(headers), body) wait(response) @@ -166,10 +167,10 @@ module Plum if config[:tls] ctx = @config[:ssl_context] || new_ssl_ctx sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) - sock.hostname = (@config[:hostname] || @host) if sock.respond_to?(:hostname=) + sock.hostname = @config[:hostname] if sock.respond_to?(:hostname=) sock.sync_close = true sock.connect - sock.post_connection_check(@config[:hostname] || @host) + sock.post_connection_check(@config[:hostname]) end @socket = sock -- cgit v1.2.3 From 2535477c65a0093aa14324a5c47d8ea12b6da255 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 12:12:45 +0900 Subject: client: split HTTP/2-specific stuffs to ClientSession --- lib/plum.rb | 1 + lib/plum/client.rb | 132 +++++++++++--------------------------- lib/plum/client/client_session.rb | 82 +++++++++++++++++++++++ lib/plum/client/response.rb | 4 +- test/plum/client/test_client.rb | 4 +- test/plum/client/test_response.rb | 14 +++- 6 files changed, 135 insertions(+), 102 deletions(-) create mode 100644 lib/plum/client/client_session.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index bcd72d2..7b8e2ed 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -25,3 +25,4 @@ require "plum/server/http_connection" require "plum/client" require "plum/client/response" require "plum/client/connection" +require "plum/client/client_session" diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 526fb69..6e53ade 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -30,8 +30,6 @@ module Plum @port = port || (config[:tls] ? 443 : 80) end @config = DEFAULT_CONFIG.merge(hostname: host).merge(config) - @response_handlers = {} - @responses = {} @started = false end @@ -54,23 +52,25 @@ module Plum # Waits for the asynchronous response(s) to finish. # @param response [Response] if specified, waits only for the response + # @return [Response] if parameter response is specified def wait(response = nil) if response - _succ while !response.failed? && !response.finished? + @session.succ until response.failed? || response.finished? + response else - _succ while !@responses.empty? + @session.succ until @session.empty? end end # Waits for the response headers. # @param response [Response] the incomplete response. def wait_headers(response) - _succ while !response.failed? && !response.headers + @session.succ while !response.failed? && !response.headers end - # Closes the connection. + # Closes the connection immediately. def close - @plum.close if @plum + @session.close if @session ensure @socket.close if @socket end @@ -78,40 +78,16 @@ module Plum # Creates a new HTTP request. # @param headers [Hash] the request headers # @param body [String] the request body - # @param block [Proc] if specified, calls the block when finished + # @param block [Proc] if passed, it will be called when received response headers. def request_async(headers, body = nil, &block) - stream = @plum.open_stream - response = Response.new - @responses[stream] = response - - if body - stream.send_headers(headers, end_stream: false) - stream.send_data(body, end_stream: true) - else - stream.send_headers(headers, end_stream: true) - end - - if block_given? - @response_handlers[stream] = block - end - - response + @session.request(headers, body, &block) end # Creates a new HTTP request and waits for the response # @param headers [Hash] the request headers # @param body [String] the request body - def request(headers, body = nil) - raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - - base_headers = { ":method" => nil, - ":path" => nil, - ":authority" => @config[:hostname], - ":scheme" => @config[:scheme] } - - response = request_async(base_headers.merge(headers), body) - wait(response) - response + def request(headers, body = nil, &block) + wait @session.request(headers, body, &block) end # @!method get @@ -129,8 +105,8 @@ module Plum # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(GET HEAD DELETE).each { |method| - define_method(:"#{method.downcase}") do |path, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers)) + define_method(:"#{method.downcase}") do |path, headers = {}, &block| + request({ ":method" => method, ":path" => path }.merge(headers), &block) end define_method(:"#{method.downcase}_async") do |path, headers = {}, &block| request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block) @@ -151,8 +127,8 @@ module Plum # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(POST PUT).each { |method| - define_method(:"#{method.downcase}") do |path, body = nil, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers), body) + define_method(:"#{method.downcase}") do |path, body = nil, headers = {}, &block| + request({ ":method" => method, ":path" => path }.merge(headers), body, &block) end define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block| request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block) @@ -162,69 +138,33 @@ module Plum private def _start @started = true + + http2 = true unless @socket - sock = TCPSocket.open(host, port) + @socket = TCPSocket.open(host, port) if config[:tls] ctx = @config[:ssl_context] || new_ssl_ctx - sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) - sock.hostname = @config[:hostname] if sock.respond_to?(:hostname=) - sock.sync_close = true - sock.connect - sock.post_connection_check(@config[:hostname]) + @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx) + @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=) + @socket.sync_close = true + @socket.connect + @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE + + if @socket.respond_to?(:alpn_protocol) + http2 = @socket.alpn_protocol == "h2" + elsif sock.respond_to?(:npn_protocol) + http2 = @socket.npn_protocol == "h2" + else + http2 = false + end end - - @socket = sock end - @plum = setup_plum(@socket) - end - - def setup_plum(sock) - local_settings = { - enable_push: 0, - initial_window_size: (1 << 30) - 1, - } - plum = ClientConnection.new(sock.method(:write), local_settings) - plum.on(:protocol_error) { |ex| - _fail(ex) - raise ex - } - plum.on(:close) { _fail(RuntimeError.new(:closed)) } - plum.on(:stream_error) { |stream, ex| - if res = @responses.delete(stream) - res._fail(ex) unless res.finished? - end - raise ex - } - plum.on(:headers) { |stream, headers| - response = @responses[stream] - response._headers(headers) - if handler = @response_handlers.delete(stream) - handler.call(response) - end - } - plum.on(:data) { |stream, chunk| - response = @responses[stream] - response._chunk(chunk) - } - plum.on(:end_stream) { |stream| - response = @responses.delete(stream) - response._finish - } - plum - end - - def _succ - @plum << @socket.readpartial(1024) - end - - def _fail(ex) - while sr = @responses.shift - stream, res = sr - res._fail(ex) unless res.finished? + if http2 + @session = ClientSession.new(@socket, @config) + else + @session = LegacyClientSession.new(@socket, @config) end - ensure - close end def new_ssl_ctx @@ -237,7 +177,7 @@ module Plum if ctx.respond_to?(:alpn_protocols) ctx.alpn_protocols = ["h2", "http/1.1"] end - if ctx.respond_to?(:npn_select_cb) + if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN ctx.npn_select_cb = -> protocols { protocols.include?("h2") ? "h2" : protocols.first } diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb new file mode 100644 index 0000000..470f100 --- /dev/null +++ b/lib/plum/client/client_session.rb @@ -0,0 +1,82 @@ +# -*- frozen-string-literal: true -*- +module Plum + class ClientSession + HTTP2_DEFAULT_SETTINGS = { + enable_push: 0, # TODO: api? + initial_window_size: (1 << 30) - 1, # TODO: maximum size: disable flow control + } + + def initialize(socket, config) + @socket = socket + @config = config + + @plum = setup_plum + @responses = Set.new + end + + def succ + @plum << @socket.readpartial(1024) + rescue => e + fail(e) + end + + def empty? + @responses.empty? + end + + def close + @closed = true + @responses.each { |response| response._fail } + @responses.clear + @plum.close + end + + def request(headers, body = nil, &headers_cb) + raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] + + @responses << (response = Response.new) + + headers = { ":method" => nil, + ":path" => nil, + ":authority" => @config[:hostname], + ":scheme" => @config[:scheme] + }.merge(headers) + + stream = @plum.open_stream + stream.send_headers(headers, end_stream: !body) + stream.send_data(body, end_stream: true) if body + + stream.on(:headers) { |resp_headers_raw| + response._headers(resp_headers_raw) + headers_cb.call(response) if headers_cb + } + stream.on(:data) { |chunk| + response._chunk(chunk) + } + stream.on(:end_stream) { + response._finish + @responses.delete(response) + } + stream.on(:stream_error) { |ex| + response._fail + raise ex + } + + response + end + + private + def fail(exception) + close + raise exception + end + + def setup_plum + plum = ClientConnection.new(@socket.method(:write), HTTP2_DEFAULT_SETTINGS) + plum.on(:connection_error) { |ex| + fail(ex) + } + plum + end + end +end diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index 8270e95..5c0b2a0 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -13,7 +13,7 @@ module Plum @body = [] end - # Return the HTTP status code. + # Returns the HTTP status code. # @return [String] the HTTP status code def status @headers && @headers[":status"] @@ -81,7 +81,7 @@ module Plum end # @api private - def _fail(ex) + def _fail @failed = true end end diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index f8b64b0..ef93e40 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -7,7 +7,7 @@ class ClientTest < Minitest::Test client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) res1 = client.request({ ":path" => "/", ":method" => "POST", ":scheme" => "https", "header" => "ccc" }, "abc") assert_equal("POSTcccabc", res1.body) - res2 = client.put("/", "aaa", { ":scheme" => "https", "header" => "ccc" }) + res2 = client.put("/", "aaa", { "header" => "ccc" }) assert_equal("PUTcccaaa", res2.body) client.close ensure @@ -93,7 +93,7 @@ class ClientTest < Minitest::Test server_thread = Thread.new { plum = nil begin - Timeout.timeout(3) { + Timeout.timeout(1) { sock = ssl_server.accept plum = HTTPSServerConnection.new(sock) diff --git a/test/plum/client/test_response.rb b/test/plum/client/test_response.rb index eb98e3d..553bf45 100644 --- a/test/plum/client/test_response.rb +++ b/test/plum/client/test_response.rb @@ -11,8 +11,7 @@ class ResponseTest < Minitest::Test def test_fail resp = Response.new - resp._chunk("a") - resp._fail(RuntimeError.new) + resp._fail assert(true, resp.failed?) end @@ -41,6 +40,15 @@ class ResponseTest < Minitest::Test assert_equal("ab", resp.body) end + def test_body_not_finished + resp = Response.new + resp._chunk("a") + resp._chunk("b") + assert_raises { # TODO + resp.body + } + end + def test_on_chunk resp = Response.new res = [] @@ -49,5 +57,7 @@ class ResponseTest < Minitest::Test resp._finish resp.on_chunk { |chunk| res << chunk } assert_equal(["a", "b"], res) + resp._chunk("c") + assert_equal(["a", "b", "c"], res) end end -- cgit v1.2.3 From 4fd79fb69c1f49620d9ec19497fd8a2096fcc44f Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 13:26:52 +0900 Subject: client: add HTTP/1.x client --- lib/plum.rb | 1 + lib/plum/client/legacy_client_session.rb | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 lib/plum/client/legacy_client_session.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index 7b8e2ed..5c754ea 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -26,3 +26,4 @@ require "plum/client" require "plum/client/response" require "plum/client/connection" require "plum/client/client_session" +require "plum/client/legacy_client_session" diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb new file mode 100644 index 0000000..731dfc3 --- /dev/null +++ b/lib/plum/client/legacy_client_session.rb @@ -0,0 +1,88 @@ +# -*- frozen-string-literal: true -*- +module Plum + # HTTP/1.x client session. + class LegacyClientSession + # Creates a new HTTP/1.1 client session + def initialize(socket, config) + require "http/parser" + @socket = socket + @config = config + + @parser = setup_parser + @requests = [] + @response = nil + @headers_callback = nil + end + + def succ + @parser << @socket.readpartial(1024) + rescue => e # including HTTP::Parser::Error + fail(e) + end + + def empty? + !@response + end + + def close + @closed = true + @response._fail if @response + end + + def request(headers, body = nil, &headers_cb) + response = Response.new + @requests << [response, headers, body, headers_cb] + consume_queue + response + end + + private + def fail(exception) + close + raise exception + end + + def consume_queue + return if @response + + response, headers, body, cb = @requests.shift + headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] + @response = response + @headers_callback = cb + + @socket << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]] + headers.each { |key, value| + next if key.start_with?(":") # HTTP/2 psuedo headers + @socket << "%s: %s\r\n" % [key, value] + } + @socket << "\r\n" + + if body + @socket << body + end + end + + def setup_parser + parser = HTTP::Parser.new + parser.on_headers_complete = proc { + resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h + @response._headers({ ":status" => parser.status_code }.merge(resp_headers)) + @headers_callback.call(@response) if @headers_callback + } + + parser.on_body = proc { |chunk| + @response._chunk(chunk) + } + + parser.on_message_complete = proc { |env| + @response._finish + @response = nil + @headers_callback = nil + close unless parser.keep_alive? + consume_queue + } + + parser + end + end +end -- cgit v1.2.3 From cf95e6c3fefcb21e5c4784497544af0143f45200 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 15:47:26 +0900 Subject: client/client_session: fix flow control handling --- lib/plum/client.rb | 8 ++++++-- lib/plum/client/client_session.rb | 25 ++++++++++++++++--------- 2 files changed, 22 insertions(+), 11 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 6e53ade..28f0cdd 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -5,7 +5,9 @@ module Plum tls: true, scheme: "https", verify_mode: OpenSSL::SSL::VERIFY_PEER, - ssl_context: nil + ssl_context: nil, + hostname: nil, + http2_settings: {}, }.freeze attr_reader :host, :port, :config @@ -80,6 +82,7 @@ module Plum # @param body [String] the request body # @param block [Proc] if passed, it will be called when received response headers. def request_async(headers, body = nil, &block) + raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] @session.request(headers, body, &block) end @@ -87,7 +90,7 @@ module Plum # @param headers [Hash] the request headers # @param body [String] the request body def request(headers, body = nil, &block) - wait @session.request(headers, body, &block) + wait request_async(headers, body, &block) end # @!method get @@ -171,6 +174,7 @@ module Plum ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @config[:verify_mode] + ctx.ciphers = "ALL:!" + HTTPSServerConnection::CIPHER_BLACKLIST.join(":!") cert_store = OpenSSL::X509::Store.new cert_store.set_default_paths ctx.cert_store = cert_store diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb index 470f100..c637aa5 100644 --- a/lib/plum/client/client_session.rb +++ b/lib/plum/client/client_session.rb @@ -1,14 +1,16 @@ # -*- frozen-string-literal: true -*- module Plum + # HTTP/2 client session. class ClientSession HTTP2_DEFAULT_SETTINGS = { enable_push: 0, # TODO: api? - initial_window_size: (1 << 30) - 1, # TODO: maximum size: disable flow control + initial_window_size: 2 ** 30, # TODO } def initialize(socket, config) @socket = socket @config = config + @http2_settings = HTTP2_DEFAULT_SETTINGS.merge(@config[:http2_settings]) @plum = setup_plum @responses = Set.new @@ -26,22 +28,20 @@ module Plum def close @closed = true - @responses.each { |response| response._fail } + @responses.each(&:_fail) @responses.clear @plum.close end def request(headers, body = nil, &headers_cb) - raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - - @responses << (response = Response.new) - headers = { ":method" => nil, ":path" => nil, ":authority" => @config[:hostname], ":scheme" => @config[:scheme] - }.merge(headers) + }.merge(headers) + response = Response.new + @responses << response stream = @plum.open_stream stream.send_headers(headers, end_stream: !body) stream.send_data(body, end_stream: true) if body @@ -52,6 +52,7 @@ module Plum } stream.on(:data) { |chunk| response._chunk(chunk) + check_window(stream) } stream.on(:end_stream) { response._finish @@ -61,7 +62,6 @@ module Plum response._fail raise ex } - response end @@ -72,11 +72,18 @@ module Plum end def setup_plum - plum = ClientConnection.new(@socket.method(:write), HTTP2_DEFAULT_SETTINGS) + plum = ClientConnection.new(@socket.method(:write), @http2_settings) plum.on(:connection_error) { |ex| fail(ex) } + plum.window_update(@http2_settings[:initial_window_size]) plum end + + def check_window(stream) + ws = @http2_settings[:initial_window_size] + stream.window_update(ws) if stream.recv_remaining_window < (ws / 2) + @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2) + end end end -- cgit v1.2.3 From 9272be6f787e19a90204faab23f9586e8136ec93 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 16:51:49 +0900 Subject: client/legacy_session: consume_queue: early return if queue is empty --- lib/plum/client/legacy_client_session.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index 731dfc3..bc0c2bb 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -43,7 +43,7 @@ module Plum end def consume_queue - return if @response + return if @response || @requests.empty? response, headers, body, cb = @requests.shift headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] -- cgit v1.2.3 From 5628c38cc54aeef01f7ff7e6c4600d3baea87b87 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 17:58:49 +0900 Subject: client: make #request accept more arguments --- README.md | 4 +-- lib/plum/client.rb | 43 ++++++++++++++++---------- lib/plum/client/client_session.rb | 2 +- lib/plum/client/legacy_client_session.rb | 2 +- test/plum/client/test_client.rb | 6 ++-- test/plum/client/test_legacy_client_session.rb | 8 ++--- 6 files changed, 38 insertions(+), 27 deletions(-) (limited to 'lib') diff --git a/README.md b/README.md index 2abc857..007c7be 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ If the server does't support HTTP/2, `Plum::Client` tries to use HTTP/1.x instea ##### Sequential request ```ruby -client = Plum::Client.start("http2.rhe.jp", 443) -res1 = client.get("/") +client = Plum::Client.start("http2.rhe.jp", 443, user_agent: "nyaan") +res1 = client.get("/", headers: { "accept" => "*/*" }) puts res1.body # => "..." res2 = client.post("/post", "data") puts res2.body # => "..." diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 28f0cdd..64452f3 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -8,6 +8,7 @@ module Plum ssl_context: nil, hostname: nil, http2_settings: {}, + user_agent: "plum/#{Plum::VERSION}", }.freeze attr_reader :host, :port, :config @@ -80,61 +81,63 @@ module Plum # Creates a new HTTP request. # @param headers [Hash] the request headers # @param body [String] the request body + # @param options [Hash] request options # @param block [Proc] if passed, it will be called when received response headers. - def request_async(headers, body = nil, &block) + def request_async(headers, body, options = {}, &block) raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - @session.request(headers, body, &block) + @session.request(headers, body, options, &block) end # Creates a new HTTP request and waits for the response # @param headers [Hash] the request headers # @param body [String] the request body - def request(headers, body = nil, &block) - wait request_async(headers, body, &block) + # @param options [Hash] the request options + def request(headers, body, options = {}, &block) + wait request_async(headers, body, options, &block) end # @!method get # @!method head # @!method delete # @param path [String] the absolute path to request (translated into :path header) - # @param headers [Hash] the request headers + # @param options [Hash] the request options # Shorthand method for `#request` # @!method get_async # @!method head_async # @!method delete_async # @param path [String] the absolute path to request (translated into :path header) - # @param headers [Hash] the request headers + # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(GET HEAD DELETE).each { |method| - define_method(:"#{method.downcase}") do |path, headers = {}, &block| - request({ ":method" => method, ":path" => path }.merge(headers), &block) + define_method(:"#{method.downcase}") do |path, options = {}, &block| + wait _request_helper(method, path, nil, options, &block) end - define_method(:"#{method.downcase}_async") do |path, headers = {}, &block| - request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block) + define_method(:"#{method.downcase}_async") do |path, options = {}, &block| + _request_helper(method, path, nil, options, &block) end } # @!method post # @!method put # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body - # @param headers [Hash] the request headers + # @param options [Hash] the request options # Shorthand method for `#request` # @!method post_async # @!method put_async # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body - # @param headers [Hash] the request headers + # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(POST PUT).each { |method| - define_method(:"#{method.downcase}") do |path, body = nil, headers = {}, &block| - request({ ":method" => method, ":path" => path }.merge(headers), body, &block) + define_method(:"#{method.downcase}") do |path, body, options = {}, &block| + wait _request_helper(method, path, body, options, &block) end - define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block| - request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block) + define_method(:"#{method.downcase}_async") do |path, body, options = {}, &block| + _request_helper(method, path, body, options, &block) end } @@ -188,5 +191,13 @@ module Plum end ctx end + + def _request_helper(method, path, body, options, &block) + base = { ":method" => method, + ":path" => path, + "user-agent" => @config[:user_agent] } + base.merge!(options[:headers]) if options[:headers] + request_async(base, body, options, &block) + end end end diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb index c637aa5..2964769 100644 --- a/lib/plum/client/client_session.rb +++ b/lib/plum/client/client_session.rb @@ -33,7 +33,7 @@ module Plum @plum.close end - def request(headers, body = nil, &headers_cb) + def request(headers, body, options, &headers_cb) headers = { ":method" => nil, ":path" => nil, ":authority" => @config[:hostname], diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index bc0c2bb..2e52be2 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -29,7 +29,7 @@ module Plum @response._fail if @response end - def request(headers, body = nil, &headers_cb) + def request(headers, body, options, &headers_cb) response = Response.new @requests << [response, headers, body, headers_cb] consume_queue diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index ef93e40..1cc0930 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -7,7 +7,7 @@ class ClientTest < Minitest::Test client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) res1 = client.request({ ":path" => "/", ":method" => "POST", ":scheme" => "https", "header" => "ccc" }, "abc") assert_equal("POSTcccabc", res1.body) - res2 = client.put("/", "aaa", { "header" => "ccc" }) + res2 = client.put("/", "aaa", headers: { "header" => "ccc" }) assert_equal("PUTcccaaa", res2.body) client.close ensure @@ -20,12 +20,12 @@ class ClientTest < Minitest::Test server_thread = start_tls_server Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c| client = c - res1 = client.request_async({ ":path" => "/", ":method" => "GET", ":scheme" => "https", "header" => "ccc" }) { |res1| + res1 = client.request_async({ ":path" => "/", ":method" => "GET", ":scheme" => "https", "header" => "ccc" }, nil) { |res1| assert(res1.headers) } assert_nil(res1.headers) - res2 = client.get_async("/", "header" => "ccc") + res2 = client.get_async("/", headers: { "header" => "ccc" }) } assert(res2.headers) assert_equal("GETccc", res2.body) diff --git a/test/plum/client/test_legacy_client_session.rb b/test/plum/client/test_legacy_client_session.rb index 16111ff..2c8f146 100644 --- a/test/plum/client/test_legacy_client_session.rb +++ b/test/plum/client/test_legacy_client_session.rb @@ -6,7 +6,7 @@ class LegacyClientSessionTest < Minitest::Test io = StringIO.new session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG) assert(session.empty?) - res = session.request({}, "aa") + res = session.request({}, "aa", {}) assert(!session.empty?) io.string << "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n" session.succ @@ -16,7 +16,7 @@ class LegacyClientSessionTest < Minitest::Test def test_close_fails_req session = LegacyClientSession.new(StringIO.new, Client::DEFAULT_CONFIG) - res = session.request({}) + res = session.request({}, nil, {}) assert(!res.failed?) session.close assert(res.failed?) @@ -25,7 +25,7 @@ class LegacyClientSessionTest < Minitest::Test def test_fail io = StringIO.new session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG) - res = session.request({}, "aa") + res = session.request({}, "aa", {}) assert_raises { session.succ } @@ -36,7 +36,7 @@ class LegacyClientSessionTest < Minitest::Test def test_request io = StringIO.new session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG.merge(hostname: "aa")) - res = session.request({ ":method" => "GET", ":path" => "/aa" }, "aa") + res = session.request({ ":method" => "GET", ":path" => "/aa" }, "aa", {}) assert("GET /aa HTTP/1.1\r\nhost: aa\r\n\r\naa") io.string << "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\naaa" session.succ until res.finished? -- cgit v1.2.3 From 2b23faa53b254bb20b5f3604f8bd8a42dab4f8d0 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 18:18:33 +0900 Subject: stream_utils: remove #respond --- examples/non_tls_server.rb | 18 +++++++++++------- examples/static_server.rb | 41 +++++++++++++++++++++++------------------ lib/plum/stream_utils.rb | 12 ------------ test/plum/client/test_client.rb | 3 ++- 4 files changed, 36 insertions(+), 38 deletions(-) (limited to 'lib') diff --git a/examples/non_tls_server.rb b/examples/non_tls_server.rb index ae8d692..e7bad41 100644 --- a/examples/non_tls_server.rb +++ b/examples/non_tls_server.rb @@ -77,28 +77,31 @@ loop do EOF - stream.respond({ + stream.send_headers({ ":status": "200", "server": "plum", "content-type": "text/html", "content-length": body.bytesize - }, body) + }, end_stream: false) + stream.send_data(body, end_stream: true) when ["POST", "/post.page"] body = "Posted value is: #{CGI.unescape(data).gsub("<", "<").gsub(">", ">")}
Back to top page" - stream.respond({ + stream.send_headers({ ":status": "200", "server": "plum", "content-type": "text/html", "content-length": body.bytesize - }, body) + }, end_stream: false) + stream.send_data(body, end_stream: true) else body = "Page not found! Back to top page" - stream.respond({ + stream.send_headers({ ":status": "404", "server": "plum", "content-type": "text/html", "content-length": body.bytesize - }, body) + }, end_stream: false) + stream.send_data(body, end_stream: true) end end end @@ -109,12 +112,13 @@ loop do plum << sock.readpartial(1024) end rescue Plum::LegacyHTTPError + data = "Use modern web browser with HTTP/2 support." resp = "HTTP/1.1 505 HTTP Version Not Supported\r\n" "Content-Type: text/plain\r\n" "Content-Length: #{data.bytesize}\r\n" "Server: plum/#{Plum::VERSION}\r\n" "\r\n" - "Use modern web browser with HTTP/2 support." + "#{data}" sock.write(resp) rescue diff --git a/examples/static_server.rb b/examples/static_server.rb index e2a3645..4013f28 100644 --- a/examples/static_server.rb +++ b/examples/static_server.rb @@ -95,52 +95,57 @@ loop do EOF - stream.respond({ + stream.send_headers({ ":status": "200", "server": "plum", "content-type": "text/html", - "content-length": body.size - }, body) + "content-length": body.bytesize + }, end_stream: false) + stream.send_data(body, end_stream: true) when ["GET", "/abc.html"] body = "ABC! Back to top page
" + stream.send_headers({ + ":status": "200", + "server": "plum", + "content-type": "text/html", + "content-length": body.bytesize + }, end_stream: false) i_stream = stream.promise({ ":authority": "localhost:40443", ":method": "GET", ":scheme": "https", ":path": "/image.nyan" }) - stream.respond({ - ":status": "200", - "server": "plum", - "content-type": "text/html", - "content-length": body.size - }, body) + stream.send_data(body, end_stream: true) image = ("iVBORw0KGgoAAAANSUhEUgAAAEAAAABAAgMAAADXB5lNAAAACVBMVEX///93o0jG/4mTMy20AAAA" \ "bklEQVQ4y2NgoAoIRQJkCoSimIdTgJGBBU1ABE1A1AVdBQuaACu6gCALhhZ0axlZCDgMWYAB6ilU" \ "35IoADEMxWyyBDD45AhQCFahM0kXWIVu3sAJrILzyBcgytoFeATABBcXWohhCEC14BCgGAAAX1ZQ" \ "ZtJp0zAAAAAASUVORK5CYII=").unpack("m")[0] - i_stream.respond({ + i_stream.send_headers({ ":status": "200", "server": "plum", "content-type": "image/png", - "content-length": image.size - }, image) + "content-length": image.bytesize + }, end_stream: false) + i_stream.send_data(image, end_stream: true) when ["POST", "/post.page"] body = "Posted value is: #{CGI.unescape(data).gsub("<", "<").gsub(">", ">")}
Back to top page" - stream.respond({ + stream.send_headers({ ":status": "200", "server": "plum", "content-type": "text/html", - "content-length": body.size - }, body) + "content-length": body.bytesize + }, end_stream: false) + stream.send_data(body, end_stream: true) else body = "Page not found! Back to top page" - stream.respond({ + stream.send_headers({ ":status": "404", "server": "plum", "content-type": "text/html", - "content-length": body.size - }, body) + "content-length": body.bytesize + }, end_stream: false) + stream.send_data(body, end_stream: true) end end end diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index a8d959f..e344528 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -3,18 +3,6 @@ using Plum::BinaryString module Plum module StreamUtils - # Responds to a HTTP request. - # @param headers [Enumerable] The response headers. - # @param body [String, IO] The response body. - def respond(headers, body = nil, end_stream: true) # TODO: priority, padding - if body - send_headers(headers, end_stream: false) - send_data(body, end_stream: end_stream) - else - send_headers(headers, end_stream: end_stream) - end - end - # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream. # @param headers [Enumerable] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. # @return [Stream] The stream to send push response. diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index 1cc0930..c9514cf 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -112,7 +112,8 @@ class ClientTest < Minitest::Test stream.send_data("a", end_stream: false) raise ExampleError, "example error" else - stream.respond({ ":status" => 200 }, headers.to_h[":method"] + headers.to_h["header"].to_s + data.to_s) + stream.send_headers({ ":status" => 200 }, end_stream: false) + stream.send_data(headers.to_h[":method"] + headers.to_h["header"].to_s + data.to_s, end_stream: true) end } } yield plum if block_given? -- cgit v1.2.3 From ee159b255fbf77c04d8a68ec573d195e6fbe0791 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 19:56:34 +0900 Subject: doc: update --- README.md | 5 ++++- lib/plum/client.rb | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) (limited to 'lib') diff --git a/README.md b/README.md index 007c7be..b95ce07 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A pure Ruby HTTP/2 implementation. * OpenSSL 1.0.2 or newer (HTTP/2 requires ALPN) * Optional: * [http_parser.rb gem](https://rubygems.org/gems/http_parser.rb) (HTTP/1.x parser; if you use "http" URI scheme) - * [rack gem](https://rubygems.org/gems/rack) if you use Plum as Rack server. + * [rack gem](https://rubygems.org/gems/rack) (if you use Plum as Rack server) ## Usage ### As a HTTP/2 (HTTP/1.x) client library @@ -82,6 +82,9 @@ By default, Plum generates a dummy server certificate if `--cert` and `--key` op ## TODO * **Better API** +* Plum::Client + * Stream Priority support + * PING frame handling ## License MIT License diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 64452f3..cd1315b 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -53,7 +53,7 @@ module Plum self end - # Waits for the asynchronous response(s) to finish. + # Resume communication with the server, until the specified (or all running) requests are complete. # @param response [Response] if specified, waits only for the response # @return [Response] if parameter response is specified def wait(response = nil) @@ -65,7 +65,7 @@ module Plum end end - # Waits for the response headers. + # Resume communication with the server until the response headers are sent. # @param response [Response] the incomplete response. def wait_headers(response) @session.succ while !response.failed? && !response.headers @@ -92,6 +92,7 @@ module Plum # @param headers [Hash] the request headers # @param body [String] the request body # @param options [Hash] the request options + # @param block [Proc] if passed, it will be called when received response headers. def request(headers, body, options = {}, &block) wait request_async(headers, body, options, &block) end -- cgit v1.2.3 From 70418e74c948917d4f6b3f03896abae01c44256a Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 20:37:21 +0900 Subject: client: add :http2 option --- lib/plum/client.rb | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index cd1315b..a4fe243 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -9,6 +9,7 @@ module Plum hostname: nil, http2_settings: {}, user_agent: "plum/#{Plum::VERSION}", + http2: true, }.freeze attr_reader :host, :port, :config @@ -146,7 +147,7 @@ module Plum def _start @started = true - http2 = true + http2 = @config[:http2] unless @socket @socket = TCPSocket.open(host, port) if config[:tls] @@ -159,7 +160,7 @@ module Plum if @socket.respond_to?(:alpn_protocol) http2 = @socket.alpn_protocol == "h2" - elsif sock.respond_to?(:npn_protocol) + elsif @socket.respond_to?(:npn_protocol) # TODO: remove http2 = @socket.npn_protocol == "h2" else http2 = false @@ -178,17 +179,19 @@ module Plum ctx = OpenSSL::SSL::SSLContext.new ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @config[:verify_mode] - ctx.ciphers = "ALL:!" + HTTPSServerConnection::CIPHER_BLACKLIST.join(":!") cert_store = OpenSSL::X509::Store.new cert_store.set_default_paths ctx.cert_store = cert_store - if ctx.respond_to?(:alpn_protocols) - ctx.alpn_protocols = ["h2", "http/1.1"] - end - if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN - ctx.npn_select_cb = -> protocols { - protocols.include?("h2") ? "h2" : protocols.first - } + if @config[:http2] + ctx.ciphers = "ALL:!" + HTTPSServerConnection::CIPHER_BLACKLIST.join(":!") + if ctx.respond_to?(:alpn_protocols) + ctx.alpn_protocols = ["h2", "http/1.1"] + end + if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN + ctx.npn_select_cb = -> protocols { + protocols.include?("h2") ? "h2" : protocols.first + } + end end ctx end -- cgit v1.2.3 From 256e0a5133861568e52c499da8074dd211b650ff Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 22:43:14 +0900 Subject: client/legacy_client_session: set transfer-encoding: chunked if content-length is not specified --- lib/plum/client/legacy_client_session.rb | 46 +++++++++++++++++++----- test/plum/client/test_legacy_client_session.rb | 49 ++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 11 deletions(-) (limited to 'lib') diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index 2e52be2..087c643 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -30,8 +30,18 @@ module Plum end def request(headers, body, options, &headers_cb) + headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] + if body + if headers["content-length"] || headers["transfer-encoding"] + chunked = false + else + chunked = true + headers["transfer-encoding"] = "chunked" + end + end + response = Response.new - @requests << [response, headers, body, headers_cb] + @requests << [response, headers, body, chunked, headers_cb] consume_queue response end @@ -45,20 +55,40 @@ module Plum def consume_queue return if @response || @requests.empty? - response, headers, body, cb = @requests.shift - headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] + response, headers, body, chunked, cb = @requests.shift @response = response @headers_callback = cb - @socket << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]] + @socket << construct_request(headers) + + if body + if chunked + read_object(body) { |chunk| + @socket << chunk.bytesize.to_s(16) << "\r\n" << chunk << "\r\n" + } + else + read_object(body) { |chunk| @socket << chunk } + end + end + end + + def construct_request(headers) + out = String.new + out << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]] headers.each { |key, value| next if key.start_with?(":") # HTTP/2 psuedo headers - @socket << "%s: %s\r\n" % [key, value] + out << "%s: %s\r\n" % [key, value] } - @socket << "\r\n" + out << "\r\n" + end - if body - @socket << body + def read_object(body) + if body.is_a?(String) + yield body + else # IO + until body.eof? + yield body.readpartial(1024) + end end end diff --git a/test/plum/client/test_legacy_client_session.rb b/test/plum/client/test_legacy_client_session.rb index 2c8f146..421a261 100644 --- a/test/plum/client/test_legacy_client_session.rb +++ b/test/plum/client/test_legacy_client_session.rb @@ -37,11 +37,54 @@ class LegacyClientSessionTest < Minitest::Test io = StringIO.new session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG.merge(hostname: "aa")) res = session.request({ ":method" => "GET", ":path" => "/aa" }, "aa", {}) - assert("GET /aa HTTP/1.1\r\nhost: aa\r\n\r\naa") + assert_equal("GET /aa HTTP/1.1\r\nhost: aa\r\ntransfer-encoding: chunked\r\n\r\n2\r\naa\r\n", io.string) io.string << "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\naaa" session.succ until res.finished? assert(res.finished?) - assert("aaa", res.body) - assert({ ":status" => "200", "content-length" => "3" }, res.headers) + assert_equal("aaa", res.body) + assert_equal({ ":status" => "200", "content-length" => "3" }, res.headers) + end + + def test_chunked_chunked_string + io = StringIO.new + session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG.merge(hostname: "hostname")) + res = session.request({ ":method" => "GET", ":path" => "/aa" }, "a" * 1025, {}) + assert_equal(<<-EOR, io.string) +GET /aa HTTP/1.1\r +host: hostname\r +transfer-encoding: chunked\r +\r +401\r +#{"a"*1025}\r + EOR + end + + def test_chunked_chunked_io + io = StringIO.new + session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG.merge(hostname: "hostname")) + res = session.request({ ":method" => "GET", ":path" => "/aa" }, StringIO.new("a" * 1025), {}) + assert_equal(<<-EOR, io.string) +GET /aa HTTP/1.1\r +host: hostname\r +transfer-encoding: chunked\r +\r +400\r +#{"a"*1024}\r +1\r +a\r + EOR + end + + def test_chunked_sized + io = StringIO.new + session = LegacyClientSession.new(io, Client::DEFAULT_CONFIG.merge(hostname: "hostname")) + res = session.request({ ":method" => "GET", ":path" => "/aa", "content-length" => 1025 }, StringIO.new("a" * 1025), {}) + assert_equal((<<-EOR).chomp, io.string) +GET /aa HTTP/1.1\r +content-length: 1025\r +host: hostname\r +\r +#{"a"*1025} + EOR end end -- cgit v1.2.3 From 01bfb2249e0be08968dfbaa755b2c66fcb21b916 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 22:44:15 +0900 Subject: client/legacy_client_session: fix response header: ':status' header's value must be String for consistency --- lib/plum/client/legacy_client_session.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index 087c643..f98e7bf 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -96,7 +96,7 @@ module Plum parser = HTTP::Parser.new parser.on_headers_complete = proc { resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h - @response._headers({ ":status" => parser.status_code }.merge(resp_headers)) + @response._headers({ ":status" => parser.status_code.to_s }.merge(resp_headers)) @headers_callback.call(@response) if @headers_callback } -- cgit v1.2.3 From 9bf2c8a0b1c1bf1010de5dea0c37627f7abd4944 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 17:26:55 +0900 Subject: client: rename #wait to #resume --- lib/plum/client.rb | 29 ++++++++--------------------- test/plum/client/test_client.rb | 28 +++++++++++++--------------- 2 files changed, 21 insertions(+), 36 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index a4fe243..233ad4b 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -45,7 +45,7 @@ module Plum if block_given? begin ret = yield(self) - wait + resume return ret ensure close @@ -57,7 +57,7 @@ module Plum # Resume communication with the server, until the specified (or all running) requests are complete. # @param response [Response] if specified, waits only for the response # @return [Response] if parameter response is specified - def wait(response = nil) + def resume(response = nil) if response @session.succ until response.failed? || response.finished? response @@ -66,12 +66,6 @@ module Plum end end - # Resume communication with the server until the response headers are sent. - # @param response [Response] the incomplete response. - def wait_headers(response) - @session.succ while !response.failed? && !response.headers - end - # Closes the connection immediately. def close @session.close if @session @@ -84,25 +78,17 @@ module Plum # @param body [String] the request body # @param options [Hash] request options # @param block [Proc] if passed, it will be called when received response headers. - def request_async(headers, body, options = {}, &block) + def request(headers, body, options = {}, &block) raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] @session.request(headers, body, options, &block) end - # Creates a new HTTP request and waits for the response - # @param headers [Hash] the request headers - # @param body [String] the request body - # @param options [Hash] the request options - # @param block [Proc] if passed, it will be called when received response headers. - def request(headers, body, options = {}, &block) - wait request_async(headers, body, options, &block) - end - # @!method get # @!method head # @!method delete # @param path [String] the absolute path to request (translated into :path header) # @param options [Hash] the request options + # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request` # @!method get_async @@ -114,7 +100,7 @@ module Plum # Shorthand method for `#request_async` %w(GET HEAD DELETE).each { |method| define_method(:"#{method.downcase}") do |path, options = {}, &block| - wait _request_helper(method, path, nil, options, &block) + resume _request_helper(method, path, nil, options, &block) end define_method(:"#{method.downcase}_async") do |path, options = {}, &block| _request_helper(method, path, nil, options, &block) @@ -125,6 +111,7 @@ module Plum # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body # @param options [Hash] the request options + # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request` # @!method post_async @@ -136,7 +123,7 @@ module Plum # Shorthand method for `#request_async` %w(POST PUT).each { |method| define_method(:"#{method.downcase}") do |path, body, options = {}, &block| - wait _request_helper(method, path, body, options, &block) + resume _request_helper(method, path, body, options, &block) end define_method(:"#{method.downcase}_async") do |path, body, options = {}, &block| _request_helper(method, path, body, options, &block) @@ -201,7 +188,7 @@ module Plum ":path" => path, "user-agent" => @config[:user_agent] } base.merge!(options[:headers]) if options[:headers] - request_async(base, body, options, &block) + request(base, body, options, &block) end end end diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index c9514cf..8806052 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -2,16 +2,14 @@ require "test_helper" using Plum::BinaryString class ClientTest < Minitest::Test - def test_request + def test_request_sync server_thread = start_tls_server client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) - res1 = client.request({ ":path" => "/", ":method" => "POST", ":scheme" => "https", "header" => "ccc" }, "abc") - assert_equal("POSTcccabc", res1.body) - res2 = client.put("/", "aaa", headers: { "header" => "ccc" }) - assert_equal("PUTcccaaa", res2.body) + res1 = client.put("/", "aaa", headers: { "header" => "ccc" }) + assert_equal("PUTcccaaa", res1.body) client.close ensure - server_thread.join + server_thread.join if server_thread end def test_request_async @@ -20,7 +18,7 @@ class ClientTest < Minitest::Test server_thread = start_tls_server Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c| client = c - res1 = client.request_async({ ":path" => "/", ":method" => "GET", ":scheme" => "https", "header" => "ccc" }, nil) { |res1| + res1 = client.request({ ":path" => "/", ":method" => "GET", ":scheme" => "https", "header" => "ccc" }, nil) { |res1| assert(res1.headers) } assert_nil(res1.headers) @@ -30,7 +28,7 @@ class ClientTest < Minitest::Test assert(res2.headers) assert_equal("GETccc", res2.body) ensure - server_thread.join + server_thread.join if server_thread end def test_verify @@ -40,7 +38,7 @@ class ClientTest < Minitest::Test client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_PEER) } ensure - server_thread.join + server_thread.join if server_thread end def test_raise_error_sync @@ -53,19 +51,19 @@ class ClientTest < Minitest::Test } } ensure - server_thread.join + server_thread.join if server_thread end - def test_raise_error_async_seq_wait + def test_raise_error_async_seq_resume server_thread = start_tls_server client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) res = client.get_async("/error_in_data") assert_raises(LocalConnectionError) { - client.wait(res) + client.resume(res) } client.close ensure - server_thread.join + server_thread.join if server_thread end def test_raise_error_async_block @@ -75,10 +73,10 @@ class ClientTest < Minitest::Test Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c| client = c client.get_async("/connection_error") { |res| flunk "success??" } - } # wait + } # resume } ensure - server_thread.join + server_thread.join if server_thread end private -- cgit v1.2.3 From d2dcdf4f6ec478c139eb160e9c58dc95b96a3bbd Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 17:35:41 +0900 Subject: client: rename Client#[HTTP_METHOD]_async to #[HTTP_METHOD] --- README.md | 15 ++++++++------- lib/plum/client.rb | 36 ++++++++++++++++++------------------ test/plum/client/test_client.rb | 11 ++++++----- 3 files changed, 32 insertions(+), 30 deletions(-) (limited to 'lib') diff --git a/README.md b/README.md index cd0eac6..68ae80b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Plum: An HTTP/2 Library for Ruby + Plum: An HTTP/2 Library for Ruby A pure Ruby HTTP/2 server and client implementation. WARNING: Plum is currently under heavy development. You *will* encounter bugs when using it. @@ -54,9 +54,9 @@ If the server does't support HTTP/2, `Plum::Client` tries to use HTTP/1.x instea ##### Sequential request ```ruby client = Plum::Client.start("http2.rhe.jp", 443, user_agent: "nyaan") -res1 = client.get("/", headers: { "accept" => "*/*" }) +res1 = client.get!("/", headers: { "accept" => "*/*" }) puts res1.body # => "..." -res2 = client.post("/post", "data") +res2 = client.post!("/post", "data") puts res2.body # => "..." client.close @@ -66,8 +66,9 @@ client.close ```ruby res1 = res2 = nil Plum::Client.start("rhe.jp", http2_settings: { max_frame_size: 32768 }) { |client| - res1 = client.get_async("/") - res2 = client.post_async("/post", "data") + res1 = client.get("/") + res2 = client.post("/post", "data") + # res1.status == nil ; because it's async request } # wait for response(s) and close p res1.status # => "200" @@ -76,10 +77,10 @@ p res1.status # => "200" ##### Download a large file ```ruby Plum::Client.start("http2.rhe.jp", hostname: "assets.rhe.jp") { |client| - client.get_async("/large") do |res| # called when received response headers + client.get("/large") do |res| # called when received response headers p res.status # => "200" File.open("/tmp/large.file", "wb") { |file| - res.on_chunk do |chunk| + res.on_chunk do |chunk| # called when each chunk of response body arrived file << chunk end } diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 233ad4b..cc255f4 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -83,49 +83,49 @@ module Plum @session.request(headers, body, options, &block) end - # @!method get - # @!method head - # @!method delete + # @!method get! + # @!method head! + # @!method delete! # @param path [String] the absolute path to request (translated into :path header) # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request` + # Shorthand method for `Client#resume(Client#request(*args))` - # @!method get_async - # @!method head_async - # @!method delete_async + # @!method get + # @!method head + # @!method delete # @param path [String] the absolute path to request (translated into :path header) # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request_async` + # Shorthand method for `#request` %w(GET HEAD DELETE).each { |method| - define_method(:"#{method.downcase}") do |path, options = {}, &block| + define_method(:"#{method.downcase}!") do |path, options = {}, &block| resume _request_helper(method, path, nil, options, &block) end - define_method(:"#{method.downcase}_async") do |path, options = {}, &block| + define_method(:"#{method.downcase}") do |path, options = {}, &block| _request_helper(method, path, nil, options, &block) end } - # @!method post - # @!method put + # @!method post! + # @!method put! # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request` + # Shorthand method for `Client#resume(Client#request(*args))` - # @!method post_async - # @!method put_async + # @!method post + # @!method put # @param path [String] the absolute path to request (translated into :path header) # @param body [String] the request body # @param options [Hash] the request options # @param block [Proc] if specified, calls the block when finished - # Shorthand method for `#request_async` + # Shorthand method for `#request` %w(POST PUT).each { |method| - define_method(:"#{method.downcase}") do |path, body, options = {}, &block| + define_method(:"#{method.downcase}!") do |path, body, options = {}, &block| resume _request_helper(method, path, body, options, &block) end - define_method(:"#{method.downcase}_async") do |path, body, options = {}, &block| + define_method(:"#{method.downcase}") do |path, body, options = {}, &block| _request_helper(method, path, body, options, &block) end } diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index 8806052..6d6586a 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -5,7 +5,7 @@ class ClientTest < Minitest::Test def test_request_sync server_thread = start_tls_server client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) - res1 = client.put("/", "aaa", headers: { "header" => "ccc" }) + res1 = client.put!("/", "aaa", headers: { "header" => "ccc" }) assert_equal("PUTcccaaa", res1.body) client.close ensure @@ -23,7 +23,8 @@ class ClientTest < Minitest::Test } assert_nil(res1.headers) - res2 = client.get_async("/", headers: { "header" => "ccc" }) + res2 = client.get("/", headers: { "header" => "ccc" }) + assert_nil(res2.headers) } assert(res2.headers) assert_equal("GETccc", res2.body) @@ -47,7 +48,7 @@ class ClientTest < Minitest::Test Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c| client = c assert_raises(LocalConnectionError) { - client.get("/connection_error") + client.get!("/connection_error") } } ensure @@ -57,7 +58,7 @@ class ClientTest < Minitest::Test def test_raise_error_async_seq_resume server_thread = start_tls_server client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) - res = client.get_async("/error_in_data") + res = client.get("/error_in_data") assert_raises(LocalConnectionError) { client.resume(res) } @@ -72,7 +73,7 @@ class ClientTest < Minitest::Test assert_raises(LocalConnectionError) { Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c| client = c - client.get_async("/connection_error") { |res| flunk "success??" } + client.get("/connection_error") { |res| flunk "success??" } } # resume } ensure -- cgit v1.2.3 From 905abc29d5b72ffd4ea9189f7ab827e3fdd062a9 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 18:39:33 +0900 Subject: client/response: add Response#on_finish --- lib/plum/client/response.rb | 12 ++++++++++++ test/plum/client/test_response.rb | 11 +++++++++++ 2 files changed, 23 insertions(+) (limited to 'lib') diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index 5c0b2a0..e9bd5e1 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -42,6 +42,7 @@ module Plum # @yield [chunk] A chunk of the response body. def on_chunk(&block) raise "Body already read" if @on_chunk + raise ArgumentError, "block must be given" unless block_given? @on_chunk = block unless @body.empty? @body.each(&block) @@ -49,6 +50,16 @@ module Plum end end + # Set callback that will be called when the response finished. + def on_finish(&block) + raise ArgumentError, "block must be given" unless block_given? + if finished? + block.call + else + @on_finish = block + end + end + # Returns the complete response body. Use #each_body instead if the body can be very large. # @return [String] the whole response body def body @@ -78,6 +89,7 @@ module Plum # @api private def _finish @finished = true + @on_finish.call if @on_finish end # @api private diff --git a/test/plum/client/test_response.rb b/test/plum/client/test_response.rb index 553bf45..76d9037 100644 --- a/test/plum/client/test_response.rb +++ b/test/plum/client/test_response.rb @@ -60,4 +60,15 @@ class ResponseTest < Minitest::Test resp._chunk("c") assert_equal(["a", "b", "c"], res) end + + def test_on_finish + resp = Response.new + ran = false + resp.on_finish { ran = true } + resp._finish + assert(ran) + ran = false + resp.on_finish { ran = true } + assert(ran) + end end -- cgit v1.2.3 From 11be104f7ac9e2fe91bd5f82a1bb2489692cdce8 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 19:07:21 +0900 Subject: rename {Connection,Stream}Error to Remote{Connection,Stream}Error and create Local*Error --- lib/plum/connection.rb | 16 ++++----- lib/plum/errors.rb | 12 +++---- lib/plum/flow_control.rb | 6 ++-- lib/plum/server/connection.rb | 2 +- lib/plum/server/http_connection.rb | 2 +- lib/plum/server/https_connection.rb | 2 +- lib/plum/stream.rb | 54 +++++++++++++++++-------------- test/plum/connection/test_handle_frame.rb | 5 ++- test/plum/test_connection.rb | 8 +++++ test/plum/test_error.rb | 3 +- test/plum/test_stream.rb | 28 +++++++++++++--- test/utils/assertions.rb | 18 +++++------ 12 files changed, 96 insertions(+), 60 deletions(-) (limited to 'lib') diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index 1d47360..c31836a 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -51,7 +51,7 @@ module Plum return if new_data.empty? @buffer << new_data consume_buffer - rescue ConnectionError => e + rescue RemoteConnectionError => e callback(:connection_error, e) goaway(e.http2_error_type) close @@ -94,12 +94,12 @@ module Plum def validate_received_frame(frame) if @state == :waiting_settings && frame.type != :settings - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end if @state == :waiting_continuation if frame.type != :continuation || frame.stream_id != @continuation_id - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end if frame.end_headers? @state = :open @@ -127,7 +127,7 @@ module Plum def receive_control_frame(frame) if frame.length > @local_settings[:max_frame_size] - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) end case frame.type @@ -140,7 +140,7 @@ module Plum when :goaway receive_goaway(frame) when :data, :headers, :priority, :rst_stream, :push_promise, :continuation - raise Plum::ConnectionError.new(:protocol_error) + raise Plum::RemoteConnectionError.new(:protocol_error) else # MUST ignore unknown frame type. end @@ -148,11 +148,11 @@ module Plum def receive_settings(frame, send_ack: true) if frame.ack? - raise ConnectionError.new(:frame_size_error) if frame.length != 0 + raise RemoteConnectionError.new(:frame_size_error) if frame.length != 0 callback(:settings_ack) return else - raise ConnectionError.new(:frame_size_error) if frame.length % 6 != 0 + raise RemoteConnectionError.new(:frame_size_error) if frame.length % 6 != 0 end old_remote_settings = @remote_settings.dup @@ -175,7 +175,7 @@ module Plum end def receive_ping(frame) - raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 8 + raise Plum::RemoteConnectionError.new(:frame_size_error) if frame.length != 8 if frame.ack? callback(:ping_ack) diff --git a/lib/plum/errors.rb b/lib/plum/errors.rb index d5cce48..3220c67 100644 --- a/lib/plum/errors.rb +++ b/lib/plum/errors.rb @@ -31,9 +31,6 @@ module Plum ERROR_CODES[@http2_error_type] end end - class ConnectionError < HTTPError; end - class StreamError < HTTPError; end - class LegacyHTTPError < Error attr_reader :headers, :data, :parser @@ -44,7 +41,10 @@ module Plum end end - # Client - class LocalConnectionError < HTTPError; end - class LocalStreamError < HTTPError; end + class RemoteHTTPError < HTTPError; end + class RemoteConnectionError < RemoteHTTPError; end + class RemoteStreamError < RemoteHTTPError; end + class LocalHTTPError < HTTPError; end + class LocalConnectionError < LocalHTTPError; end + class LocalStreamError < LocalHTTPError; end end diff --git a/lib/plum/flow_control.rb b/lib/plum/flow_control.rb index cfb181d..ccb57dd 100644 --- a/lib/plum/flow_control.rb +++ b/lib/plum/flow_control.rb @@ -65,7 +65,7 @@ module Plum if frame.type == :data @recv_remaining_window -= frame.length if @recv_remaining_window < 0 - local_error = (Connection === self) ? ConnectionError : StreamError + local_error = (Connection === self) ? RemoteConnectionError : RemoteStreamError raise local_error.new(:flow_control_error) end end @@ -82,7 +82,7 @@ module Plum def receive_window_update(frame) if frame.length != 4 - raise Plum::ConnectionError.new(:frame_size_error) + raise Plum::RemoteConnectionError.new(:frame_size_error) end r_wsi = frame.payload.uint32 @@ -90,7 +90,7 @@ module Plum wsi = r_wsi # & ~(1 << 31) if wsi == 0 - local_error = (Connection === self) ? ConnectionError : StreamError + local_error = (Connection === self) ? RemoteConnectionError : RemoteStreamError raise local_error.new(:protocol_error) end diff --git a/lib/plum/server/connection.rb b/lib/plum/server/connection.rb index 450dcf6..a82d4aa 100644 --- a/lib/plum/server/connection.rb +++ b/lib/plum/server/connection.rb @@ -29,7 +29,7 @@ module Plum def negotiate! unless CLIENT_CONNECTION_PREFACE.start_with?(@buffer.byteslice(0, 24)) - raise ConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending. + raise RemoteConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending. end if @buffer.bytesize >= 24 diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index 7111bb4..cb49a29 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -23,7 +23,7 @@ module Plum private def negotiate! super - rescue ConnectionError + rescue RemoteConnectionError # Upgrade from HTTP/1.1 offset = @_http_parser << @buffer @buffer.byteshift(offset) diff --git a/lib/plum/server/https_connection.rb b/lib/plum/server/https_connection.rb index 09e360f..bac1b4b 100644 --- a/lib/plum/server/https_connection.rb +++ b/lib/plum/server/https_connection.rb @@ -10,7 +10,7 @@ module Plum if @sock.respond_to?(:cipher) # OpenSSL::SSL::SSLSocket-like if CIPHER_BLACKLIST.include?(@sock.cipher.first) # [cipher-suite, ssl-version, keylen, alglen] on(:negotiated) { - raise ConnectionError.new(:inadequate_security) + raise RemoteConnectionError.new(:inadequate_security) } end end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 61c7cd5..87e228e 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -47,20 +47,21 @@ module Plum when :push_promise receive_push_promise(frame) when :ping, :goaway, :settings - raise ConnectionError.new(:protocol_error) # stream_id MUST be 0x00 + raise RemoteConnectionError.new(:protocol_error) # stream_id MUST be 0x00 else # MUST ignore unknown frame end - rescue StreamError => e + rescue RemoteStreamError => e callback(:stream_error, e) - close(e.http2_error_type) + send_immediately Frame.rst_stream(id, e.http2_error_type) + close end # Closes this stream. Sends RST_STREAM frame to the peer. # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame. - def close(error_type = :no_error) + def close @state = :closed - send_immediately Frame.rst_stream(id, error_type) + callback(:close) end # @api private @@ -70,7 +71,7 @@ module Plum # @api private def update_dependency(weight: nil, parent: nil, exclusive: nil) - raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self + raise RemoteStreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self if weight @weight = weight @@ -102,9 +103,9 @@ module Plum def validate_received_frame(frame) if frame.length > @connection.local_settings[:max_frame_size] if [:headers, :push_promise, :continuation].include?(frame.type) - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) else - raise StreamError.new(:frame_size_error) + raise RemoteStreamError.new(:frame_size_error) end end end @@ -116,13 +117,13 @@ module Plum def receive_data(frame) if @state != :open && @state != :half_closed_local - raise StreamError.new(:stream_closed) + raise RemoteStreamError.new(:stream_closed) end if frame.padded? padding_length = frame.payload.uint8 if padding_length >= frame.length - raise ConnectionError.new(:protocol_error, "padding is too long") + raise RemoteConnectionError.new(:protocol_error, "padding is too long") end callback(:data, frame.payload.byteslice(1, frame.length - padding_length - 1)) else @@ -149,7 +150,7 @@ module Plum end if padding_length > payload.bytesize - raise ConnectionError.new(:protocol_error, "padding is too long") + raise RemoteConnectionError.new(:protocol_error, "padding is too long") end frames.each do |frame| @@ -159,7 +160,7 @@ module Plum begin decoded_headers = @connection.hpack_decoder.decode(payload) rescue => e - raise ConnectionError.new(:compression_error, e) + raise RemoteConnectionError.new(:compression_error, e) end callback(:headers, decoded_headers) @@ -169,15 +170,15 @@ module Plum def receive_headers(frame) if @state == :reserved_local - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif @state == :half_closed_remote - raise StreamError.new(:stream_closed) + raise RemoteStreamError.new(:stream_closed) elsif @state == :closed - raise ConnectionError.new(:stream_closed) + raise RemoteConnectionError.new(:stream_closed) elsif @state == :closed_implicitly - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif @state == :idle && self.id.even? - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end @state = :open @@ -195,10 +196,10 @@ module Plum if promised_stream.state == :closed_implicitly # 5.1.1 An endpoint that receives an unexpected stream identifier MUST respond with a connection error of type PROTOCOL_ERROR. - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) elsif promised_id.odd? # 5.1.1 Streams initiated by the server MUST use even-numbered stream identifiers. - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end end @@ -214,7 +215,7 @@ module Plum def receive_priority(frame) if frame.length != 5 - raise StreamError.new(:frame_size_error) + raise RemoteStreamError.new(:frame_size_error) end receive_priority_payload(frame.payload) end @@ -230,13 +231,18 @@ module Plum def receive_rst_stream(frame) if frame.length != 4 - raise ConnectionError.new(:frame_size_error) + raise RemoteConnectionError.new(:frame_size_error) elsif @state == :idle - raise ConnectionError.new(:protocol_error) + raise RemoteConnectionError.new(:protocol_error) end - - callback(:rst_stream, frame) @state = :closed # MUST NOT send RST_STREAM + + error_code = frame.payload.uint32 + if error_code > 0 + raise LocalStreamError.new(HTTPError::ERROR_CODES.key(error_code)) + else + callback(:rst_stream, frame) + end end # override EventEmitter diff --git a/test/plum/connection/test_handle_frame.rb b/test/plum/connection/test_handle_frame.rb index 6aa7db7..d1cc54e 100644 --- a/test/plum/connection/test_handle_frame.rb +++ b/test/plum/connection/test_handle_frame.rb @@ -62,7 +62,10 @@ class ServerConnectionHandleFrameTest < Minitest::Test def test_server_handle_goaway_reply open_server_connection {|con| assert_no_error { - con << Frame.goaway(1234, :stream_closed).assemble + begin + con << Frame.goaway(1, :stream_closed).assemble + rescue LocalHTTPError + end } assert_equal(:goaway, sent_frames.last.type) } diff --git a/test/plum/test_connection.rb b/test/plum/test_connection.rb index f490a87..b1a4803 100644 --- a/test/plum/test_connection.rb +++ b/test/plum/test_connection.rb @@ -92,4 +92,12 @@ class ConnectionTest < Minitest::Test assert_equal(:open, con.state) } end + + def test_connection_local_error + open_server_connection { |con| + assert_raises(LocalConnectionError) { + con << Frame.goaway(0, :frame_size_error).assemble + } + } + end end diff --git a/test/plum/test_error.rb b/test/plum/test_error.rb index 167538a..8eaa489 100644 --- a/test/plum/test_error.rb +++ b/test/plum/test_error.rb @@ -7,7 +7,6 @@ class ErrorTest < Minitest::Test assert_equal(0x08, e.http2_error_code) } - test.call ConnectionError - test.call StreamError + test.call HTTPError end end diff --git a/test/plum/test_stream.rb b/test/plum/test_stream.rb index 9df9a55..a9de58e 100644 --- a/test/plum/test_stream.rb +++ b/test/plum/test_stream.rb @@ -19,14 +19,34 @@ class StreamTest < Minitest::Test } end - def test_stream_close - open_new_stream(state: :half_closed_local) {|stream| - stream.close(:frame_size_error) + def test_stream_remote_error + open_server_connection { |con| + stream = nil + con.on(:headers) { |s| + stream = s + raise RemoteStreamError.new(:frame_size_error) + } + + assert_stream_error(:frame_size_error) { + con << Frame.headers(1, "", :end_headers).assemble + } last = sent_frames.last assert_equal(:rst_stream, last.type) - assert_equal(StreamError.new(:frame_size_error).http2_error_code, last.payload.uint32) + assert_equal(HTTPError::ERROR_CODES[:frame_size_error], last.payload.uint32) assert_equal(:closed, stream.state) } end + + def test_stream_local_error + open_server_connection { |con| + stream = nil + con.on(:headers) { |s| stream = s } + + con << Frame.headers(1, "", :end_headers).assemble + assert_raises(LocalStreamError) { + con << Frame.rst_stream(1, :frame_size_error).assemble + } + } + end end diff --git a/test/utils/assertions.rb b/test/utils/assertions.rb index 71928ed..746d475 100644 --- a/test/utils/assertions.rb +++ b/test/utils/assertions.rb @@ -1,21 +1,21 @@ module CustomAssertions def assert_connection_error(type, &blk) - assert_http_error(Plum::ConnectionError, type, &blk) + assert_http_error(Plum::RemoteConnectionError, type, &blk) end def assert_stream_error(type, &blk) - assert_http_error(Plum::StreamError, type, &blk) + assert_http_error(Plum::RemoteStreamError, type, &blk) end def assert_no_error(stream: nil, connection: nil, &blk) - Plum::ConnectionError.reset - Plum::StreamError.reset + Plum::RemoteConnectionError.reset + Plum::RemoteStreamError.reset begin blk.call - rescue Plum::HTTPError + rescue Plum::RemoteHTTPError end - assert_nil(Plum::StreamError.last, "No stream error expected but raised: #{Plum::StreamError.last}") - assert_nil(Plum::ConnectionError.last, "No connection error expected but raised: #{Plum::ConnectionError.last}") + assert_nil(Plum::RemoteStreamError.last, "No stream error expected but raised: #{Plum::RemoteStreamError.last}") + assert_nil(Plum::RemoteConnectionError.last, "No connection error expected but raised: #{Plum::RemoteConnectionError.last}") end def assert_frame(frame, **args) @@ -56,5 +56,5 @@ module LastErrorExtension base.reset end end -Plum::ConnectionError.__send__(:prepend, LastErrorExtension) -Plum::StreamError.__send__(:prepend, LastErrorExtension) +Plum::RemoteConnectionError.__send__(:prepend, LastErrorExtension) +Plum::RemoteStreamError.__send__(:prepend, LastErrorExtension) -- cgit v1.2.3 From 40e700d3d85b372e64a1eac3c6661e766cf49423 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 21:19:36 +0900 Subject: client: support HTTP/1.1 upgrade --- README.md | 4 +-- lib/plum.rb | 1 + lib/plum/client.rb | 54 +++++++++++++++++-------------- lib/plum/client/client_session.rb | 4 ++- lib/plum/client/legacy_client_session.rb | 2 +- lib/plum/client/upgrade_client_session.rb | 46 ++++++++++++++++++++++++++ lib/plum/connection.rb | 29 +++++++++-------- lib/plum/rack/session.rb | 2 +- lib/plum/server/http_connection.rb | 8 ++--- 9 files changed, 103 insertions(+), 47 deletions(-) create mode 100644 lib/plum/client/upgrade_client_session.rb (limited to 'lib') diff --git a/README.md b/README.md index c1e81cf..2bf6fe7 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ client.close ##### Parallel request ```ruby res1 = res2 = nil -Plum::Client.start("rhe.jp", http2_settings: { max_frame_size: 32768 }) { |client| +Plum::Client.start("rhe.jp", 443, http2_settings: { max_frame_size: 32768 }) { |client| res1 = client.get("/") res2 = client.post("/post", "data") # res1.status == nil ; because it's async request @@ -76,7 +76,7 @@ p res1.status # => "200" ##### Download a large file ```ruby -Plum::Client.start("http2.rhe.jp", hostname: "assets.rhe.jp") { |client| +Plum::Client.start("http2.rhe.jp", 443, hostname: "assets.rhe.jp") { |client| client.get("/large") do |res| # called when received response headers p res.status # => "200" File.open("/tmp/large.file", "wb") { |file| diff --git a/lib/plum.rb b/lib/plum.rb index 5c754ea..3ae301f 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -27,3 +27,4 @@ require "plum/client/response" require "plum/client/connection" require "plum/client/client_session" require "plum/client/legacy_client_session" +require "plum/client/upgrade_client_session" diff --git a/lib/plum/client.rb b/lib/plum/client.rb index cc255f4..b78e25c 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -2,7 +2,6 @@ module Plum class Client DEFAULT_CONFIG = { - tls: true, scheme: "https", verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_context: nil, @@ -31,7 +30,7 @@ module Plum @socket = host else @host = host - @port = port || (config[:tls] ? 443 : 80) + @port = port || (config[:scheme] == "https" ? 443 : 80) end @config = DEFAULT_CONFIG.merge(hostname: host).merge(config) @started = false @@ -131,35 +130,40 @@ module Plum } private + # @return [Boolean] http2 nego? + def _connect + @socket = TCPSocket.open(@host, @port) + + if @config[:scheme] == "https" + ctx = @config[:ssl_context] || new_ssl_ctx + @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx) + @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=) + @socket.sync_close = true + @socket.connect + @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE + + @socket.respond_to?(:alpn_protocol) && @socket.alpn_protocol == "h2" || + @socket.respond_to?(:npn_protocol) && @socket.npn_protocol == "h2" + end + end + def _start @started = true - http2 = @config[:http2] - unless @socket - @socket = TCPSocket.open(host, port) - if config[:tls] - ctx = @config[:ssl_context] || new_ssl_ctx - @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx) - @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=) - @socket.sync_close = true - @socket.connect - @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE - - if @socket.respond_to?(:alpn_protocol) - http2 = @socket.alpn_protocol == "h2" - elsif @socket.respond_to?(:npn_protocol) # TODO: remove - http2 = @socket.npn_protocol == "h2" - else - http2 = false - end - end - end + klass = @config[:http2] ? ClientSession : LegacyClientSession + nego = @socket || _connect - if http2 - @session = ClientSession.new(@socket, @config) + if @config[:http2] + if @config[:scheme] == "https" + klass = nego ? ClientSession : LegacyClientSession + else + klass = UpgradeClientSession + end else - @session = LegacyClientSession.new(@socket, @config) + klass = LegacyClientSession end + + @session = klass.new(@socket, @config) end def new_ssl_ctx diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb index 2964769..495d099 100644 --- a/lib/plum/client/client_session.rb +++ b/lib/plum/client/client_session.rb @@ -7,6 +7,8 @@ module Plum initial_window_size: 2 ** 30, # TODO } + attr_reader :plum + def initialize(socket, config) @socket = socket @config = config @@ -17,7 +19,7 @@ module Plum end def succ - @plum << @socket.readpartial(1024) + @plum << @socket.readpartial(16384) rescue => e fail(e) end diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index f98e7bf..a30c237 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -15,7 +15,7 @@ module Plum end def succ - @parser << @socket.readpartial(1024) + @parser << @socket.readpartial(16384) rescue => e # including HTTP::Parser::Error fail(e) end diff --git a/lib/plum/client/upgrade_client_session.rb b/lib/plum/client/upgrade_client_session.rb new file mode 100644 index 0000000..c0e0d9e --- /dev/null +++ b/lib/plum/client/upgrade_client_session.rb @@ -0,0 +1,46 @@ +# -*- frozen-string-literal: true -*- +module Plum + # Try upgrade to HTTP/2 + class UpgradeClientSession + def initialize(socket, config) + prepare_session(socket, config) + end + + def succ + @session.succ + end + + def empty? + @session.empty? + end + + def close + @session.close + end + + def request(headers, body, options, &headers_cb) + @session.request(headers, body, options, &headers_cb) + end + + private + def prepare_session(socket, config) + lcs = LegacyClientSession.new(socket, config) + opt_res = lcs.request({ ":method" => "OPTIONS", + ":path" => "*", + "User-Agent" => config[:user_agent], + "Connection" => "Upgrade, HTTP2-Settings", + "Upgrade" => "h2c", + "HTTP2-Settings" => "" }, nil, {}) + lcs.succ until opt_res.finished? + + if opt_res.status == "101" + lcs.close + @session = ClientSession.new(socket, config) + @session.plum.stream(1).set_state(:half_closed_local) + else + @session = lcs + end + end + end +end + diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index c31836a..f158560 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -58,19 +58,9 @@ module Plum end alias << receive - private - def consume_buffer - while frame = Frame.parse!(@buffer) - callback(:frame, frame) - receive_frame(frame) - end - end - - def send_immediately(frame) - callback(:send_frame, frame) - @writer.call(frame.assemble) - end - + # Returns a Stream object with the specified ID. + # @param stream_id [Integer] the stream id + # @return [Stream] the stream def stream(stream_id) raise ArgumentError, "stream_id can't be 0" if stream_id == 0 @@ -92,6 +82,19 @@ module Plum stream end + private + def consume_buffer + while frame = Frame.parse!(@buffer) + callback(:frame, frame) + receive_frame(frame) + end + end + + def send_immediately(frame) + callback(:send_frame, frame) + @writer.call(frame.assemble) + end + def validate_received_frame(frame) if @state == :waiting_settings && frame.type != :settings raise RemoteConnectionError.new(:protocol_error) diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index d0aedc5..d9efb9d 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -26,7 +26,7 @@ module Plum def run begin while !@sock.closed? && !@sock.eof? - @plum << @sock.readpartial(1024) + @plum << @sock.readpartial(16384) end rescue Errno::EPIPE, Errno::ECONNRESET => e rescue StandardError => e diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index cb49a29..74b7ad8 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -59,10 +59,10 @@ module Plum process_first_request } - resp = "HTTP/1.1 101 Switching Protocols\r\n" - "Connection: Upgrade\r\n" - "Upgrade: h2c\r\n" - "Server: plum/#{Plum::VERSION}\r\n" + resp = "HTTP/1.1 101 Switching Protocols\r\n" + + "Connection: Upgrade\r\n" + + "Upgrade: h2c\r\n" + + "Server: plum/#{Plum::VERSION}\r\n" + "\r\n" @sock.write(resp) -- cgit v1.2.3 From 223addb898938dec5cd5196f7ad9f0b3aca17a99 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 22:38:26 +0900 Subject: tests: add test cases for Client --- lib/plum/client.rb | 6 +++--- test/plum/client/test_client.rb | 18 ++++++++++++++++++ test/utils/string_socket.rb | 9 ++++++--- 3 files changed, 27 insertions(+), 6 deletions(-) (limited to 'lib') diff --git a/lib/plum/client.rb b/lib/plum/client.rb index b78e25c..c2a1b83 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -2,17 +2,17 @@ module Plum class Client DEFAULT_CONFIG = { + http2: true, scheme: "https", + hostname: nil, verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_context: nil, - hostname: nil, http2_settings: {}, user_agent: "plum/#{Plum::VERSION}", - http2: true, }.freeze attr_reader :host, :port, :config - attr_reader :socket + attr_reader :socket, :session # Creates a new HTTP client and starts communication. # A shorthand for `Plum::Client.new(args).start(&block)` diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb index 6d6586a..214e30f 100644 --- a/test/plum/client/test_client.rb +++ b/test/plum/client/test_client.rb @@ -80,6 +80,24 @@ class ClientTest < Minitest::Test server_thread.join if server_thread end + def test_session_socket_http2_https + sock = StringSocket.new + client = Client.start(sock, nil, http2: true, scheme: "https") + assert(client.session.class == ClientSession) + end + + def test_session_socket_http2_http + sock = StringSocket.new("HTTP/1.1 100\r\n\r\n") + client = Client.start(sock, nil, http2: true, scheme: "http") + assert(client.session.class == UpgradeClientSession) + end + + def test_session_socket_http1 + sock = StringSocket.new + client = Client.start(sock, nil, http2: false) + assert(client.session.class == LegacyClientSession) + end + private def start_tls_server(&block) ctx = OpenSSL::SSL::SSLContext.new diff --git a/test/utils/string_socket.rb b/test/utils/string_socket.rb index fb59172..0cffa64 100644 --- a/test/utils/string_socket.rb +++ b/test/utils/string_socket.rb @@ -1,12 +1,15 @@ -class StringSocket +class StringSocket < IO + # remove all methods + (IO.instance_methods - Object.instance_methods).each { |symbol| undef_method symbol } + extend Forwardable def_delegators :@rio, :readpartial def_delegators :@wio, :<<, :write attr_reader :rio, :wio - def initialize(str) - @rio = StringIO.new(str) + def initialize(str = nil) + @rio = StringIO.new(str.to_s) @wio = StringIO.new end end -- cgit v1.2.3 From 0dea10636e87686119a0aed72bca7564a50cb9f9 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Mon, 9 Nov 2015 22:47:24 +0900 Subject: bump version to 0.2.0 --- lib/plum/version.rb | 2 +- plum.gemspec | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'lib') diff --git a/lib/plum/version.rb b/lib/plum/version.rb index 1e4ed3e..19b76b4 100644 --- a/lib/plum/version.rb +++ b/lib/plum/version.rb @@ -1,4 +1,4 @@ # -*- frozen-string-literal: true -*- module Plum - VERSION = "0.1.3" + VERSION = "0.2.0" end diff --git a/plum.gemspec b/plum.gemspec index cb1a3b6..9754d7e 100644 --- a/plum.gemspec +++ b/plum.gemspec @@ -9,7 +9,7 @@ Gem::Specification.new do |spec| spec.authors = ["rhenium"] spec.email = ["k@rhe.jp"] - spec.summary = %q{A minimal implementation of HTTP/2 server.} + spec.summary = %q{An HTTP/2 Library for Ruby} spec.description = spec.summary spec.homepage = "https://github.com/rhenium/plum" spec.license = "MIT" @@ -24,7 +24,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "rack" spec.add_development_dependency "rake" spec.add_development_dependency "yard" - spec.add_development_dependency "minitest", "~> 5.7.0" + spec.add_development_dependency "minitest", "~> 5.8.0" spec.add_development_dependency "simplecov" spec.add_development_dependency "codeclimate-test-reporter" spec.add_development_dependency "guard" -- cgit v1.2.3 From 88763b7f38b97ffe0377c5b0699938326ced61c2 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Tue, 10 Nov 2015 20:19:20 +0900 Subject: client: add auto_decode option: decode automatically deflate or gzip encoded response --- lib/plum.rb | 2 ++ lib/plum/client.rb | 3 +- lib/plum/client/client_session.rb | 2 +- lib/plum/client/decoders.rb | 51 ++++++++++++++++++++++++++++++ lib/plum/client/legacy_client_session.rb | 2 +- lib/plum/client/response.rb | 26 ++++++++++----- lib/plum/errors.rb | 22 +++++++++---- test/plum/client/test_decoders.rb | 54 ++++++++++++++++++++++++++++++++ test/plum/client/test_response.rb | 5 +++ 9 files changed, 150 insertions(+), 17 deletions(-) create mode 100644 lib/plum/client/decoders.rb create mode 100644 test/plum/client/test_decoders.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index 3ae301f..95e5588 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -2,6 +2,7 @@ require "openssl" require "socket" require "base64" require "set" +require "zlib" require "plum/version" require "plum/errors" require "plum/binary_string" @@ -24,6 +25,7 @@ require "plum/server/https_connection" require "plum/server/http_connection" require "plum/client" require "plum/client/response" +require "plum/client/decoders" require "plum/client/connection" require "plum/client/client_session" require "plum/client/legacy_client_session" diff --git a/lib/plum/client.rb b/lib/plum/client.rb index c2a1b83..8cb9230 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -9,6 +9,7 @@ module Plum ssl_context: nil, http2_settings: {}, user_agent: "plum/#{Plum::VERSION}", + auto_decode: true, }.freeze attr_reader :host, :port, :config @@ -79,7 +80,7 @@ module Plum # @param block [Proc] if passed, it will be called when received response headers. def request(headers, body, options = {}, &block) raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - @session.request(headers, body, options, &block) + @session.request(headers, body, @config.merge(options), &block) end # @!method get! diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb index 495d099..6e9fc56 100644 --- a/lib/plum/client/client_session.rb +++ b/lib/plum/client/client_session.rb @@ -42,7 +42,7 @@ module Plum ":scheme" => @config[:scheme] }.merge(headers) - response = Response.new + response = Response.new(**options) @responses << response stream = @plum.open_stream stream.send_headers(headers, end_stream: !body) diff --git a/lib/plum/client/decoders.rb b/lib/plum/client/decoders.rb new file mode 100644 index 0000000..e6d72e7 --- /dev/null +++ b/lib/plum/client/decoders.rb @@ -0,0 +1,51 @@ +module Plum + module Decoders + class Base + def decode(chunk) + chunk + end + + def finish + end + end + + # `deflate` is not just deflate, wrapped by zlib format (RFC 1950) + class Deflate < Base + def initialize + @inflate = Zlib::Inflate.new(Zlib::MAX_WBITS) + end + + def decode(chunk) + @inflate.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @inflate.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + class GZip < Base + def initialize + @stream = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) + end + + def decode(chunk) + @stream.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @stream.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + DECODERS = { "gzip" => GZip, "deflate" => Deflate }.freeze + end +end diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb index a30c237..bc531ac 100644 --- a/lib/plum/client/legacy_client_session.rb +++ b/lib/plum/client/legacy_client_session.rb @@ -40,7 +40,7 @@ module Plum end end - response = Response.new + response = Response.new(**options) @requests << [response, headers, body, chunked, headers_cb] consume_queue response diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index e9bd5e1..9e50f02 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -6,11 +6,12 @@ module Plum attr_reader :headers # @api private - def initialize + def initialize(auto_decode: true, **options) @body = Queue.new @finished = false @failed = false @body = [] + @auto_decode = auto_decode end # Returns the HTTP status code. @@ -54,7 +55,7 @@ module Plum def on_finish(&block) raise ArgumentError, "block must be given" unless block_given? if finished? - block.call + yield else @on_finish = block end @@ -64,21 +65,20 @@ module Plum # @return [String] the whole response body def body raise "Body already read" if @on_chunk - if finished? - @body.join - else - raise "Response body is not complete" - end + raise "Response body is not complete" unless finished? + @body.join end # @api private def _headers(raw_headers) # response headers should not have duplicates @headers = raw_headers.to_h.freeze + @decoder = setup_decoder end # @api private - def _chunk(chunk) + def _chunk(encoded) + chunk = @decoder.decode(encoded) if @on_chunk @on_chunk.call(chunk) else @@ -89,6 +89,7 @@ module Plum # @api private def _finish @finished = true + @decoder.finish @on_finish.call if @on_finish end @@ -96,5 +97,14 @@ module Plum def _fail @failed = true end + + private + def setup_decoder + if @auto_decode + klass = Decoders::DECODERS[@headers["content-encoding"]] + end + klass ||= Decoders::Base + klass.new + end end end diff --git a/lib/plum/errors.rb b/lib/plum/errors.rb index 3220c67..26fade1 100644 --- a/lib/plum/errors.rb +++ b/lib/plum/errors.rb @@ -31,6 +31,14 @@ module Plum ERROR_CODES[@http2_error_type] end end + + class RemoteHTTPError < HTTPError; end + class RemoteConnectionError < RemoteHTTPError; end + class RemoteStreamError < RemoteHTTPError; end + class LocalHTTPError < HTTPError; end + class LocalConnectionError < LocalHTTPError; end + class LocalStreamError < LocalHTTPError; end + class LegacyHTTPError < Error attr_reader :headers, :data, :parser @@ -41,10 +49,12 @@ module Plum end end - class RemoteHTTPError < HTTPError; end - class RemoteConnectionError < RemoteHTTPError; end - class RemoteStreamError < RemoteHTTPError; end - class LocalHTTPError < HTTPError; end - class LocalConnectionError < LocalHTTPError; end - class LocalStreamError < LocalHTTPError; end + class DecoderError < Error + attr_reader :inner_error + + def initialize(message, inner_error = nil) + super(message) + @inner_error = inner_error + end + end end diff --git a/test/plum/client/test_decoders.rb b/test/plum/client/test_decoders.rb new file mode 100644 index 0000000..dfd67b1 --- /dev/null +++ b/test/plum/client/test_decoders.rb @@ -0,0 +1,54 @@ +require "test_helper" + +using Plum::BinaryString +class DecodersTest < Minitest::Test + def test_base_decode + decoder = Decoders::Base.new + assert_equal("abc", decoder.decode("abc")) + end + + def test_base_finish + decoder = Decoders::Base.new + decoder.finish + end + + def test_deflate_decode + decoder = Decoders::Deflate.new + assert_equal("hello", decoder.decode("\x78\x9c\xcb\x48\xcd\xc9\xc9\x07\x00\x06\x2c\x02\x15")) + end + + def test_deflate_decode_error + decoder = Decoders::Deflate.new + assert_raises(DecoderError) { + decoder.decode("\x79\x9c\xcb\x48\xcd\xc9\xc9\x07\x00\x06\x2c\x02\x15") + } + end + + def test_deflate_finish_error + decoder = Decoders::Deflate.new + decoder.decode("\x78\x9c\xcb\x48\xcd\xc9\xc9\x07\x00\x06\x2c\x02") + assert_raises(DecoderError) { + decoder.finish + } + end + + def test_gzip_decode + decoder = Decoders::GZip.new + assert_equal("hello", decoder.decode("\x1f\x8b\x08\x00\x1a\x96\xe0\x4c\x00\x03\xcb\x48\xcd\xc9\xc9\x07\x00\x86\xa6\x10\x36\x05\x00\x00\x00")) + end + + def test_gzip_decode_error + decoder = Decoders::GZip.new + assert_raises(DecoderError) { + decoder.decode("\x2f\x8b\x08\x00\x1a\x96\xe0\x4c\x00\x03\xcb\x48\xcd\xc9\xc9\x07\x00\x86\xa6\x10\x36\x05\x00\x00\x00") + } + end + + def test_gzip_finish_error + decoder = Decoders::GZip.new + decoder.decode("\x1f\x8b\x08\x00\x1a\x96") + assert_raises(DecoderError) { + decoder.finish + } + end +end diff --git a/test/plum/client/test_response.rb b/test/plum/client/test_response.rb index 76d9037..511a073 100644 --- a/test/plum/client/test_response.rb +++ b/test/plum/client/test_response.rb @@ -4,6 +4,7 @@ using Plum::BinaryString class ResponseTest < Minitest::Test def test_finished resp = Response.new + resp._headers({}) assert_equal(false, resp.finished?) resp._finish assert_equal(true, resp.finished?) @@ -34,6 +35,7 @@ class ResponseTest < Minitest::Test def test_body resp = Response.new + resp._headers({}) resp._chunk("a") resp._chunk("b") resp._finish @@ -42,6 +44,7 @@ class ResponseTest < Minitest::Test def test_body_not_finished resp = Response.new + resp._headers({}) resp._chunk("a") resp._chunk("b") assert_raises { # TODO @@ -51,6 +54,7 @@ class ResponseTest < Minitest::Test def test_on_chunk resp = Response.new + resp._headers({}) res = [] resp._chunk("a") resp._chunk("b") @@ -63,6 +67,7 @@ class ResponseTest < Minitest::Test def test_on_finish resp = Response.new + resp._headers({}) ran = false resp.on_finish { ran = true } resp._finish -- cgit v1.2.3 From d22caa2e2684d1e875beb64f53eadecb6e561cf9 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Tue, 10 Nov 2015 20:20:52 +0900 Subject: bump version to 0.2.1 --- lib/plum/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/plum/version.rb b/lib/plum/version.rb index 19b76b4..bbf27cf 100644 --- a/lib/plum/version.rb +++ b/lib/plum/version.rb @@ -1,4 +1,4 @@ # -*- frozen-string-literal: true -*- module Plum - VERSION = "0.2.0" + VERSION = "0.2.1" end -- cgit v1.2.3 From 3466b54b14cc2d4ce3d1db6af1bc282678bab443 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Wed, 11 Nov 2015 14:16:47 +0900 Subject: connection: split large frame in #send_immediately --- lib/plum/binary_string.rb | 14 ++++++++++++ lib/plum/connection.rb | 9 +++++++- lib/plum/frame_utils.rb | 47 +++++++++++++++++--------------------- lib/plum/server/http_connection.rb | 5 ++-- lib/plum/stream.rb | 1 - lib/plum/stream_utils.rb | 21 +++++++---------- test/plum/test_binary_string.rb | 6 +++++ test/plum/test_connection.rb | 11 +++++++++ test/plum/test_frame_utils.rb | 15 ++++++------ test/utils/server.rb | 7 +++--- 10 files changed, 80 insertions(+), 56 deletions(-) (limited to 'lib') diff --git a/lib/plum/binary_string.rb b/lib/plum/binary_string.rb index 400c57b..2b50b38 100644 --- a/lib/plum/binary_string.rb +++ b/lib/plum/binary_string.rb @@ -70,6 +70,20 @@ module Plum # I want to write `enum_for(__method__, n)`! end end + + # Splits this String into chunks. + # @param n [Integer] max chunk bytesize + # @return [Array] the slices + def chunk(n) + res = [] + pos = 0 + lim = bytesize + while pos < lim + res << byteslice(pos, n) + pos += n + end + res + end end end end diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index f158560..73f6206 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -92,7 +92,14 @@ module Plum def send_immediately(frame) callback(:send_frame, frame) - @writer.call(frame.assemble) + + if frame.length <= @remote_settings[:max_frame_size] + @writer.call(frame.assemble) + else + frame.split(@remote_settings[:max_frame_size]) { |splitted| + @writer.call(splitted.assemble) + } + end end def validate_received_frame(frame) diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb index 20c79de..447c154 100644 --- a/lib/plum/frame_utils.rb +++ b/lib/plum/frame_utils.rb @@ -3,33 +3,28 @@ using Plum::BinaryString module Plum module FrameUtils - # Splits the DATA frame into multiple frames if the payload size exceeds max size. + # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE. # @param max [Integer] The maximum size of a frame payload. - # @return [Array] The splitted frames. - def split_data(max) - return [self] if self.length <= max - raise "Frame type must be DATA" unless self.type == :data - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :data, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.flags = self.flags - [:end_stream] - frames.last.flags = self.flags & [:end_stream] - frames - end - - # Splits the HEADERS or PUSH_PROMISE frame into multiple frames if the payload size exceeds max size. - # @param max [Integer] The maximum size of a frame payload. - # @return [Array] The splitted frames. - def split_headers(max) - return [self] if self.length <= max - raise "Frame type must be HEADERS or PUSH_PROMISE" unless [:headers, :push_promise].include?(self.type) - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :continuation, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.type_value = self.type_value - frames.first.flags = self.flags - [:end_headers] - frames.last.flags = self.flags & [:end_headers] - frames + # @yield [Frame] The splitted frames. + def split(max) + return yield self if @length <= max + first, *mid, last = @payload.chunk(max) + case type + when :data + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1) + mid.each { |slice| + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1) + when :headers, :push_promise + yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4) + mid.each { |slice| + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4) + else + raise NotImplementedError.new("frame split of frame with type #{type} is not supported") + end end # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index 74b7ad8..116fdcb 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -77,9 +77,8 @@ module Plum ":authority" => @_headers["host"] }) .reject {|n, v| ["connection", "http2-settings", "upgrade", "host"].include?(n) } - headers_s = Frame.headers(1, encoder.encode(headers), :end_headers).split_headers(max_frame_size) # stream ID is 1 - data_s = Frame.data(1, @_body, :end_stream).split_data(max_frame_size) - (headers_s + data_s).each {|frag| stream.receive_frame(frag) } + stream.receive_frame Frame.headers(1, encoder.encode(headers), :end_headers) + stream.receive_frame Frame.data(1, @_body, :end_stream) end end end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 87e228e..04190fc 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -58,7 +58,6 @@ module Plum end # Closes this stream. Sends RST_STREAM frame to the peer. - # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame. def close @state = :closed callback(:close) diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index e344528..ff14c7a 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -9,10 +9,8 @@ module Plum def promise(headers) stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) encoded = @connection.hpack_encoder.encode(headers) - original = Frame.push_promise(id, stream.id, encoded, :end_headers) - original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame| - send frame - end + frame = Frame.push_promise(id, stream.id, encoded, :end_headers) + send frame stream end @@ -22,10 +20,8 @@ module Plum def send_headers(headers, end_stream:) max = @connection.remote_settings[:max_frame_size] encoded = @connection.hpack_encoder.encode(headers) - original_frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) - original_frame.split_headers(max).each do |frame| - send frame - end + frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) + send frame @state = :half_closed_local if end_stream end @@ -35,14 +31,13 @@ module Plum def send_data(data, end_stream: true) max = @connection.remote_settings[:max_frame_size] if data.is_a?(IO) - while !data.eof? && fragment = data.readpartial(max) + until data.eof? + fragment = data.readpartial(max) send Frame.data(id, fragment, (end_stream && data.eof? && :end_stream)) end else - original = Frame.data(id, data, (end_stream && :end_stream)) - original.split_data(max).each do |frame| - send frame - end + frame = Frame.data(id, data, (end_stream && :end_stream)) + send frame end @state = :half_closed_local if end_stream end diff --git a/test/plum/test_binary_string.rb b/test/plum/test_binary_string.rb index 403248b..266b548 100644 --- a/test/plum/test_binary_string.rb +++ b/test/plum/test_binary_string.rb @@ -61,4 +61,10 @@ class BinaryStringTest < Minitest::Test ret = string.each_byteslice(3) assert_equal(["123", "456", "78"], ret.to_a) end + + def test_chunk + string = "12345678" + ret = string.chunk(3) + assert_equal(["123", "456", "78"], ret) + end end diff --git a/test/plum/test_connection.rb b/test/plum/test_connection.rb index b1a4803..799c3d0 100644 --- a/test/plum/test_connection.rb +++ b/test/plum/test_connection.rb @@ -100,4 +100,15 @@ class ConnectionTest < Minitest::Test } } end + + def test_send_immediately_split + io = StringIO.new + con = Connection.new(io.method(:write)) + fs = parse_frames(io) { + con.__send__(:send_immediately, Frame.new(type: :data, stream_id: 1, payload: "a"*16385)) + } + assert_equal(2, fs.size) + assert_equal(16384, fs.first.length) + assert_equal(1, fs.last.length) + end end diff --git a/test/plum/test_frame_utils.rb b/test/plum/test_frame_utils.rb index 4564e9a..7ad869f 100644 --- a/test/plum/test_frame_utils.rb +++ b/test/plum/test_frame_utils.rb @@ -3,30 +3,29 @@ require "test_helper" class FrameUtilsTest < Minitest::Test def test_frame_enough_short frame = Frame.new(type: :data, stream_id: 1, payload: "123") - ret = frame.split_data(3) + ret = frame.to_enum(:split, 3).to_a assert_equal(1, ret.size) assert_equal("123", ret.first.payload) end def test_frame_unknown frame = Frame.new(type: :settings, stream_id: 1, payload: "123") - assert_raises { frame.split_data(2) } - assert_raises { frame.split_headers(2) } + assert_raises(NotImplementedError) { frame.split(2) } end def test_frame_data frame = Frame.new(type: :data, flags: [:end_stream], stream_id: 1, payload: "12345") - ret = frame.split_data(3) - assert_equal(2, ret.size) - assert_equal("123", ret.first.payload) + ret = frame.to_enum(:split, 2).to_a + assert_equal(3, ret.size) + assert_equal("12", ret.first.payload) assert_equal([], ret.first.flags) - assert_equal("45", ret.last.payload) + assert_equal("5", ret.last.payload) assert_equal([:end_stream], ret.last.flags) end def test_frame_headers frame = Frame.new(type: :headers, flags: [:priority, :end_stream, :end_headers], stream_id: 1, payload: "1234567") - ret = frame.split_headers(3) + ret = frame.to_enum(:split, 3).to_a assert_equal(3, ret.size) assert_equal("123", ret[0].payload) assert_equal([:end_stream, :priority], ret[0].flags) diff --git a/test/utils/server.rb b/test/utils/server.rb index be2aba6..82ed9ad 100644 --- a/test/utils/server.rb +++ b/test/utils/server.rb @@ -39,8 +39,7 @@ module ServerUtils frames end - def capture_frames(con = nil, &blk) - io = (con || @_con).sock + def parse_frames(io, &blk) pos = io.string.bytesize blk.call resp = io.string.byteslice(pos, io.string.bytesize - pos).force_encoding(Encoding::BINARY) @@ -51,8 +50,8 @@ module ServerUtils frames end - def capture_frame(con = nil, &blk) - frames = capture_frames(con, &blk) + def parse_frame(io, &blk) + frames = capture_frames(io, &blk) assert_equal(1, frames.size, "Supplied block sent no frames or more than 1 frame") frames.first end -- cgit v1.2.3 From dc0e97de3cda4e104e48ecaaba9181b7d4906d9d Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Wed, 11 Nov 2015 18:48:31 +0900 Subject: frame_factory: specify flags by kwargs --- lib/plum/frame_factory.rb | 32 +++++++++++++++---------- lib/plum/hpack/constants.rb | 2 ++ lib/plum/hpack/context.rb | 8 +++---- lib/plum/server/http_connection.rb | 4 ++-- lib/plum/stream_utils.rb | 9 ++++--- test/plum/client/test_upgrade_client_session.rb | 4 ++-- test/plum/test_connection_utils.rb | 2 +- test/plum/test_flow_control.rb | 12 +++++----- test/plum/test_frame_factory.rb | 6 ++--- test/plum/test_stream.rb | 4 ++-- 10 files changed, 46 insertions(+), 37 deletions(-) (limited to 'lib') diff --git a/lib/plum/frame_factory.rb b/lib/plum/frame_factory.rb index b4c1c01..de5a0b4 100644 --- a/lib/plum/frame_factory.rb +++ b/lib/plum/frame_factory.rb @@ -54,37 +54,45 @@ module Plum # Creates a DATA frame. # @param stream_id [Integer] The stream ID. # @param payload [String] Payload. - # @param flags [Array] Flags. - def data(stream_id, payload, *flags) + # @param end_stream [Boolean] add END_STREAM flag + def data(stream_id, payload, end_stream: false) payload = payload.b if payload && payload.encoding != Encoding::BINARY - Frame.new(type: :data, stream_id: stream_id, flags: flags, payload: payload) + fval = 0 + fval += 1 if end_stream + Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload) end # Creates a HEADERS frame. # @param stream_id [Integer] The stream ID. # @param encoded [String] Headers. - # @param flags [Array] Flags. - def headers(stream_id, encoded, *flags) - Frame.new(type: :headers, stream_id: stream_id, flags: flags, payload: encoded) + # @param end_stream [Boolean] add END_STREAM flag + # @param end_headers [Boolean] add END_HEADERS flag + def headers(stream_id, encoded, end_stream: false, end_headers: false) + fval = 0 + fval += 1 if end_stream + fval += 4 if end_headers + Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded) end # Creates a PUSH_PROMISE frame. # @param stream_id [Integer] The stream ID. # @param new_id [Integer] The stream ID to create. # @param encoded [String] Request headers. - # @param flags [Array] Flags. - def push_promise(stream_id, new_id, encoded, *flags) + # @param end_headers [Boolean] add END_HEADERS flag + def push_promise(stream_id, new_id, encoded, end_headers: false) payload = String.new.push_uint32(new_id) .push(encoded) - Frame.new(type: :push_promise, stream_id: stream_id, flags: flags, payload: payload) + fval = 0 + fval += 4 if end_headers + Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload) end # Creates a CONTINUATION frame. # @param stream_id [Integer] The stream ID. # @param payload [String] Payload. - # @param flags [Array] Flags. - def continuation(stream_id, payload, *flags) - Frame.new(type: :continuation, stream_id: stream_id, flags: flags, payload: payload) + # @param end_headers [Boolean] add END_HEADERS flag + def continuation(stream_id, payload, end_headers: false) + Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers && 4 || 0), payload: payload) end end end diff --git a/lib/plum/hpack/constants.rb b/lib/plum/hpack/constants.rb index 4eac05c..6993735 100644 --- a/lib/plum/hpack/constants.rb +++ b/lib/plum/hpack/constants.rb @@ -67,6 +67,8 @@ module Plum ["www-authenticate", ""], ].freeze + STATIC_TABLE_SIZE = STATIC_TABLE.size + HUFFMAN_TABLE = [ "1111111111000", "11111111111111111011000", diff --git a/lib/plum/hpack/context.rb b/lib/plum/hpack/context.rb index 622fe36..1d7f7d6 100644 --- a/lib/plum/hpack/context.rb +++ b/lib/plum/hpack/context.rb @@ -26,10 +26,10 @@ module Plum def fetch(index) if index == 0 raise HPACKError.new("index can't be 0") - elsif index <= STATIC_TABLE.size - STATIC_TABLE[index - 1] + elsif index <= STATIC_TABLE_SIZE + STATIC_TABLE[index - 1] elsif index <= STATIC_TABLE.size + @dynamic_table.size - @dynamic_table[index - STATIC_TABLE.size - 1] + @dynamic_table[index - STATIC_TABLE_SIZE - 1] else raise HPACKError.new("invalid index: #{index}") end @@ -43,7 +43,7 @@ module Plum si = STATIC_TABLE.index &pr return si + 1 if si di = @dynamic_table.index &pr - return di + STATIC_TABLE.size + 1 if di + return di + STATIC_TABLE_SIZE + 1 if di end def evict diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index 116fdcb..8f91ca4 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -77,8 +77,8 @@ module Plum ":authority" => @_headers["host"] }) .reject {|n, v| ["connection", "http2-settings", "upgrade", "host"].include?(n) } - stream.receive_frame Frame.headers(1, encoder.encode(headers), :end_headers) - stream.receive_frame Frame.data(1, @_body, :end_stream) + stream.receive_frame Frame.headers(1, encoder.encode(headers), end_headers: true) + stream.receive_frame Frame.data(1, @_body, end_stream: true) end end end diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index ff14c7a..dbb8d96 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -9,7 +9,7 @@ module Plum def promise(headers) stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) encoded = @connection.hpack_encoder.encode(headers) - frame = Frame.push_promise(id, stream.id, encoded, :end_headers) + frame = Frame.push_promise(id, stream.id, encoded, end_headers: true) send frame stream end @@ -20,7 +20,7 @@ module Plum def send_headers(headers, end_stream:) max = @connection.remote_settings[:max_frame_size] encoded = @connection.hpack_encoder.encode(headers) - frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) + frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream) send frame @state = :half_closed_local if end_stream end @@ -33,11 +33,10 @@ module Plum if data.is_a?(IO) until data.eof? fragment = data.readpartial(max) - send Frame.data(id, fragment, (end_stream && data.eof? && :end_stream)) + send Frame.data(id, fragment, end_stream: end_stream && data.eof?) end else - frame = Frame.data(id, data, (end_stream && :end_stream)) - send frame + send Frame.data(id, data, end_stream: end_stream) end @state = :half_closed_local if end_stream end diff --git a/test/plum/client/test_upgrade_client_session.rb b/test/plum/client/test_upgrade_client_session.rb index d695142..3cc97b4 100644 --- a/test/plum/client/test_upgrade_client_session.rb +++ b/test/plum/client/test_upgrade_client_session.rb @@ -24,8 +24,8 @@ class UpgradeClientSessionTest < Minitest::Test sock.rio.string << Frame.settings().assemble sock.rio.string << Frame.settings(:ack).assemble res = session.request({ ":method" => "GET", ":path" => "/aa" }, "aa", {}) - sock.rio.string << Frame.headers(3, HPACK::Encoder.new(3).encode(":status" => "200", "content-length" => "3"), :end_headers).assemble - sock.rio.string << Frame.data(3, "aaa", :end_stream).assemble + sock.rio.string << Frame.headers(3, HPACK::Encoder.new(3).encode(":status" => "200", "content-length" => "3"), end_headers: true).assemble + sock.rio.string << Frame.data(3, "aaa", end_stream: true).assemble session.succ until res.finished? assert(res.finished?) assert_equal("aaa", res.body) diff --git a/test/plum/test_connection_utils.rb b/test/plum/test_connection_utils.rb index 0051117..35ab7b8 100644 --- a/test/plum/test_connection_utils.rb +++ b/test/plum/test_connection_utils.rb @@ -16,7 +16,7 @@ class ServerConnectionUtilsTest < Minitest::Test def test_server_goaway open_server_connection {|con| - con << Frame.headers(3, "", :end_stream, :end_headers).assemble + con << Frame.headers(3, "", end_stream: true, end_headers: true).assemble con.goaway(:stream_closed) last = sent_frames.last diff --git a/test/plum/test_flow_control.rb b/test/plum/test_flow_control.rb index 758472d..8133e0e 100644 --- a/test/plum/test_flow_control.rb +++ b/test/plum/test_flow_control.rb @@ -135,17 +135,17 @@ class FlowControlTest < Minitest::Test prepare.call {|con, stream| con.window_update(500) # extend only connection - con << Frame.headers(stream.id, "", :end_headers).assemble + con << Frame.headers(stream.id, "", end_headers: true).assemble assert_stream_error(:flow_control_error) { - con << Frame.data(stream.id, "\x00" * 30, :end_stream).assemble + con << Frame.data(stream.id, "\x00" * 30, end_stream: true).assemble } } prepare.call {|con, stream| stream.window_update(500) # extend only stream - con << Frame.headers(stream.id, "", :end_headers).assemble + con << Frame.headers(stream.id, "", end_headers: true).assemble assert_connection_error(:flow_control_error) { - con << Frame.data(stream.id, "\x00" * 30, :end_stream).assemble + con << Frame.data(stream.id, "\x00" * 30, end_stream: true).assemble } } end @@ -155,8 +155,8 @@ class FlowControlTest < Minitest::Test con = stream.connection con.settings(initial_window_size: 24) stream.window_update(1) - con << Frame.headers(stream.id, "", :end_headers).assemble - con << Frame.data(stream.id, "\x00" * 20, :end_stream).assemble + con << Frame.headers(stream.id, "", end_headers: true).assemble + con << Frame.data(stream.id, "\x00" * 20, end_stream: true).assemble assert_equal(4, con.recv_remaining_window) assert_equal(5, stream.recv_remaining_window) con.settings(initial_window_size: 60) diff --git a/test/plum/test_frame_factory.rb b/test/plum/test_frame_factory.rb index ccaa56c..109fb95 100644 --- a/test/plum/test_frame_factory.rb +++ b/test/plum/test_frame_factory.rb @@ -55,7 +55,7 @@ class FrameFactoryTest < Minitest::Test end def test_continuation - frame = Frame.continuation(123, "abc", :end_headers) + frame = Frame.continuation(123, "abc", end_headers: true) assert_frame(frame, type: :continuation, stream_id: 123, @@ -74,7 +74,7 @@ class FrameFactoryTest < Minitest::Test end def test_headers - frame = Frame.headers(123, "abc", :end_stream) + frame = Frame.headers(123, "abc", end_stream: true) assert_frame(frame, type: :headers, stream_id: 123, @@ -83,7 +83,7 @@ class FrameFactoryTest < Minitest::Test end def test_push_promise - frame = Frame.push_promise(345, 2, "abc", :end_headers) + frame = Frame.push_promise(345, 2, "abc", end_headers: true) assert_frame(frame, type: :push_promise, stream_id: 345, diff --git a/test/plum/test_stream.rb b/test/plum/test_stream.rb index a9de58e..9b06214 100644 --- a/test/plum/test_stream.rb +++ b/test/plum/test_stream.rb @@ -28,7 +28,7 @@ class StreamTest < Minitest::Test } assert_stream_error(:frame_size_error) { - con << Frame.headers(1, "", :end_headers).assemble + con << Frame.headers(1, "", end_headers: true).assemble } last = sent_frames.last @@ -43,7 +43,7 @@ class StreamTest < Minitest::Test stream = nil con.on(:headers) { |s| stream = s } - con << Frame.headers(1, "", :end_headers).assemble + con << Frame.headers(1, "", end_headers: true).assemble assert_raises(LocalStreamError) { con << Frame.rst_stream(1, :frame_size_error).assemble } -- cgit v1.2.3 From 02ea3e09a4318a60c61213122d0bf3e8e9547174 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Wed, 11 Nov 2015 20:05:33 +0900 Subject: rack/session: send WINDOW_UPDATE when receive window became too small --- lib/plum/rack/session.rb | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'lib') diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index d9efb9d..873378e 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -48,6 +48,7 @@ module Plum @plum.on(:data) { |stream, d| reqs[stream][:data] << d # TODO: store to file? + check_window(stream) } @plum.on(:end_stream) { |stream| @@ -55,6 +56,12 @@ module Plum } end + def check_window(stream) + ws = @plum.local_settings[:initial_window_size] + stream.window_update(ws) if stream.recv_remaining_window < (ws / 2) + @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2) + end + def send_body(stream, body) begin if body.is_a?(IO) -- cgit v1.2.3 From 014ff6d424f5ad863a099428d865bb74c857b36e Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 12 Nov 2015 23:21:42 +0900 Subject: rack/handler/plum: require plum/rack --- lib/rack/handler/plum.rb | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib') diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb index cf34ee4..97108a8 100644 --- a/lib/rack/handler/plum.rb +++ b/lib/rack/handler/plum.rb @@ -1,4 +1,6 @@ # -*- frozen-string-literal: true -*- +require "plum/rack" + module Rack module Handler class Plum -- cgit v1.2.3