From 2535477c65a0093aa14324a5c47d8ea12b6da255 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 Nov 2015 12:12:45 +0900 Subject: client: split HTTP/2-specific stuffs to ClientSession --- lib/plum.rb | 1 + lib/plum/client.rb | 132 +++++++++++--------------------------- lib/plum/client/client_session.rb | 82 +++++++++++++++++++++++ lib/plum/client/response.rb | 4 +- 4 files changed, 121 insertions(+), 98 deletions(-) create mode 100644 lib/plum/client/client_session.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index bcd72d2..7b8e2ed 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -25,3 +25,4 @@ require "plum/server/http_connection" require "plum/client" require "plum/client/response" require "plum/client/connection" +require "plum/client/client_session" diff --git a/lib/plum/client.rb b/lib/plum/client.rb index 526fb69..6e53ade 100644 --- a/lib/plum/client.rb +++ b/lib/plum/client.rb @@ -30,8 +30,6 @@ module Plum @port = port || (config[:tls] ? 443 : 80) end @config = DEFAULT_CONFIG.merge(hostname: host).merge(config) - @response_handlers = {} - @responses = {} @started = false end @@ -54,23 +52,25 @@ module Plum # Waits for the asynchronous response(s) to finish. # @param response [Response] if specified, waits only for the response + # @return [Response] if parameter response is specified def wait(response = nil) if response - _succ while !response.failed? && !response.finished? + @session.succ until response.failed? || response.finished? + response else - _succ while !@responses.empty? + @session.succ until @session.empty? end end # Waits for the response headers. # @param response [Response] the incomplete response. def wait_headers(response) - _succ while !response.failed? && !response.headers + @session.succ while !response.failed? && !response.headers end - # Closes the connection. + # Closes the connection immediately. def close - @plum.close if @plum + @session.close if @session ensure @socket.close if @socket end @@ -78,40 +78,16 @@ module Plum # Creates a new HTTP request. # @param headers [Hash] the request headers # @param body [String] the request body - # @param block [Proc] if specified, calls the block when finished + # @param block [Proc] if passed, it will be called when received response headers. def request_async(headers, body = nil, &block) - stream = @plum.open_stream - response = Response.new - @responses[stream] = response - - if body - stream.send_headers(headers, end_stream: false) - stream.send_data(body, end_stream: true) - else - stream.send_headers(headers, end_stream: true) - end - - if block_given? - @response_handlers[stream] = block - end - - response + @session.request(headers, body, &block) end # Creates a new HTTP request and waits for the response # @param headers [Hash] the request headers # @param body [String] the request body - def request(headers, body = nil) - raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] - - base_headers = { ":method" => nil, - ":path" => nil, - ":authority" => @config[:hostname], - ":scheme" => @config[:scheme] } - - response = request_async(base_headers.merge(headers), body) - wait(response) - response + def request(headers, body = nil, &block) + wait @session.request(headers, body, &block) end # @!method get @@ -129,8 +105,8 @@ module Plum # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(GET HEAD DELETE).each { |method| - define_method(:"#{method.downcase}") do |path, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers)) + define_method(:"#{method.downcase}") do |path, headers = {}, &block| + request({ ":method" => method, ":path" => path }.merge(headers), &block) end define_method(:"#{method.downcase}_async") do |path, headers = {}, &block| request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block) @@ -151,8 +127,8 @@ module Plum # @param block [Proc] if specified, calls the block when finished # Shorthand method for `#request_async` %w(POST PUT).each { |method| - define_method(:"#{method.downcase}") do |path, body = nil, headers = {}| - request({ ":method" => method, ":path" => path }.merge(headers), body) + define_method(:"#{method.downcase}") do |path, body = nil, headers = {}, &block| + request({ ":method" => method, ":path" => path }.merge(headers), body, &block) end define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block| request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block) @@ -162,69 +138,33 @@ module Plum private def _start @started = true + + http2 = true unless @socket - sock = TCPSocket.open(host, port) + @socket = TCPSocket.open(host, port) if config[:tls] ctx = @config[:ssl_context] || new_ssl_ctx - sock = OpenSSL::SSL::SSLSocket.new(sock, ctx) - sock.hostname = @config[:hostname] if sock.respond_to?(:hostname=) - sock.sync_close = true - sock.connect - sock.post_connection_check(@config[:hostname]) + @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx) + @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=) + @socket.sync_close = true + @socket.connect + @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE + + if @socket.respond_to?(:alpn_protocol) + http2 = @socket.alpn_protocol == "h2" + elsif sock.respond_to?(:npn_protocol) + http2 = @socket.npn_protocol == "h2" + else + http2 = false + end end - - @socket = sock end - @plum = setup_plum(@socket) - end - - def setup_plum(sock) - local_settings = { - enable_push: 0, - initial_window_size: (1 << 30) - 1, - } - plum = ClientConnection.new(sock.method(:write), local_settings) - plum.on(:protocol_error) { |ex| - _fail(ex) - raise ex - } - plum.on(:close) { _fail(RuntimeError.new(:closed)) } - plum.on(:stream_error) { |stream, ex| - if res = @responses.delete(stream) - res._fail(ex) unless res.finished? - end - raise ex - } - plum.on(:headers) { |stream, headers| - response = @responses[stream] - response._headers(headers) - if handler = @response_handlers.delete(stream) - handler.call(response) - end - } - plum.on(:data) { |stream, chunk| - response = @responses[stream] - response._chunk(chunk) - } - plum.on(:end_stream) { |stream| - response = @responses.delete(stream) - response._finish - } - plum - end - - def _succ - @plum << @socket.readpartial(1024) - end - - def _fail(ex) - while sr = @responses.shift - stream, res = sr - res._fail(ex) unless res.finished? + if http2 + @session = ClientSession.new(@socket, @config) + else + @session = LegacyClientSession.new(@socket, @config) end - ensure - close end def new_ssl_ctx @@ -237,7 +177,7 @@ module Plum if ctx.respond_to?(:alpn_protocols) ctx.alpn_protocols = ["h2", "http/1.1"] end - if ctx.respond_to?(:npn_select_cb) + if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN ctx.npn_select_cb = -> protocols { protocols.include?("h2") ? "h2" : protocols.first } diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb new file mode 100644 index 0000000..470f100 --- /dev/null +++ b/lib/plum/client/client_session.rb @@ -0,0 +1,82 @@ +# -*- frozen-string-literal: true -*- +module Plum + class ClientSession + HTTP2_DEFAULT_SETTINGS = { + enable_push: 0, # TODO: api? + initial_window_size: (1 << 30) - 1, # TODO: maximum size: disable flow control + } + + def initialize(socket, config) + @socket = socket + @config = config + + @plum = setup_plum + @responses = Set.new + end + + def succ + @plum << @socket.readpartial(1024) + rescue => e + fail(e) + end + + def empty? + @responses.empty? + end + + def close + @closed = true + @responses.each { |response| response._fail } + @responses.clear + @plum.close + end + + def request(headers, body = nil, &headers_cb) + raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"] + + @responses << (response = Response.new) + + headers = { ":method" => nil, + ":path" => nil, + ":authority" => @config[:hostname], + ":scheme" => @config[:scheme] + }.merge(headers) + + stream = @plum.open_stream + stream.send_headers(headers, end_stream: !body) + stream.send_data(body, end_stream: true) if body + + stream.on(:headers) { |resp_headers_raw| + response._headers(resp_headers_raw) + headers_cb.call(response) if headers_cb + } + stream.on(:data) { |chunk| + response._chunk(chunk) + } + stream.on(:end_stream) { + response._finish + @responses.delete(response) + } + stream.on(:stream_error) { |ex| + response._fail + raise ex + } + + response + end + + private + def fail(exception) + close + raise exception + end + + def setup_plum + plum = ClientConnection.new(@socket.method(:write), HTTP2_DEFAULT_SETTINGS) + plum.on(:connection_error) { |ex| + fail(ex) + } + plum + end + end +end diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index 8270e95..5c0b2a0 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -13,7 +13,7 @@ module Plum @body = [] end - # Return the HTTP status code. + # Returns the HTTP status code. # @return [String] the HTTP status code def status @headers && @headers[":status"] @@ -81,7 +81,7 @@ module Plum end # @api private - def _fail(ex) + def _fail @failed = true end end -- cgit v1.2.3