diff options
Diffstat (limited to 'lib/plum/client')
-rw-r--r-- | lib/plum/client/client_session.rb | 91 | ||||
-rw-r--r-- | lib/plum/client/decoders.rb | 51 | ||||
-rw-r--r-- | lib/plum/client/legacy_client_session.rb | 118 | ||||
-rw-r--r-- | lib/plum/client/response.rb | 40 | ||||
-rw-r--r-- | lib/plum/client/upgrade_client_session.rb | 46 |
5 files changed, 337 insertions, 9 deletions
diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb new file mode 100644 index 0000000..6e9fc56 --- /dev/null +++ b/lib/plum/client/client_session.rb @@ -0,0 +1,91 @@ +# -*- frozen-string-literal: true -*- +module Plum + # HTTP/2 client session. + class ClientSession + HTTP2_DEFAULT_SETTINGS = { + enable_push: 0, # TODO: api? + initial_window_size: 2 ** 30, # TODO + } + + attr_reader :plum + + def initialize(socket, config) + @socket = socket + @config = config + @http2_settings = HTTP2_DEFAULT_SETTINGS.merge(@config[:http2_settings]) + + @plum = setup_plum + @responses = Set.new + end + + def succ + @plum << @socket.readpartial(16384) + rescue => e + fail(e) + end + + def empty? + @responses.empty? + end + + def close + @closed = true + @responses.each(&:_fail) + @responses.clear + @plum.close + end + + def request(headers, body, options, &headers_cb) + headers = { ":method" => nil, + ":path" => nil, + ":authority" => @config[:hostname], + ":scheme" => @config[:scheme] + }.merge(headers) + + response = Response.new(**options) + @responses << response + stream = @plum.open_stream + stream.send_headers(headers, end_stream: !body) + stream.send_data(body, end_stream: true) if body + + stream.on(:headers) { |resp_headers_raw| + response._headers(resp_headers_raw) + headers_cb.call(response) if headers_cb + } + stream.on(:data) { |chunk| + response._chunk(chunk) + check_window(stream) + } + stream.on(:end_stream) { + response._finish + @responses.delete(response) + } + stream.on(:stream_error) { |ex| + response._fail + raise ex + } + response + end + + private + def fail(exception) + close + raise exception + end + + def setup_plum + plum = ClientConnection.new(@socket.method(:write), @http2_settings) + plum.on(:connection_error) { |ex| + fail(ex) + } + plum.window_update(@http2_settings[:initial_window_size]) + plum + end + + def check_window(stream) + ws = @http2_settings[:initial_window_size] + stream.window_update(ws) if stream.recv_remaining_window < (ws / 2) + @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2) + end + end +end diff --git a/lib/plum/client/decoders.rb b/lib/plum/client/decoders.rb new file mode 100644 index 0000000..e6d72e7 --- /dev/null +++ b/lib/plum/client/decoders.rb @@ -0,0 +1,51 @@ +module Plum + module Decoders + class Base + def decode(chunk) + chunk + end + + def finish + end + end + + # `deflate` is not just deflate, wrapped by zlib format (RFC 1950) + class Deflate < Base + def initialize + @inflate = Zlib::Inflate.new(Zlib::MAX_WBITS) + end + + def decode(chunk) + @inflate.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @inflate.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + class GZip < Base + def initialize + @stream = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) + end + + def decode(chunk) + @stream.inflate(chunk) + rescue Zlib::Error => e + raise DecoderError.new("failed to decode chunk", e) + end + + def finish + @stream.finish + rescue Zlib::Error => e + raise DecoderError.new("failed to finalize", e) + end + end + + DECODERS = { "gzip" => GZip, "deflate" => Deflate }.freeze + end +end diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb new file mode 100644 index 0000000..bc531ac --- /dev/null +++ b/lib/plum/client/legacy_client_session.rb @@ -0,0 +1,118 @@ +# -*- frozen-string-literal: true -*- +module Plum + # HTTP/1.x client session. + class LegacyClientSession + # Creates a new HTTP/1.1 client session + def initialize(socket, config) + require "http/parser" + @socket = socket + @config = config + + @parser = setup_parser + @requests = [] + @response = nil + @headers_callback = nil + end + + def succ + @parser << @socket.readpartial(16384) + rescue => e # including HTTP::Parser::Error + fail(e) + end + + def empty? + !@response + end + + def close + @closed = true + @response._fail if @response + end + + def request(headers, body, options, &headers_cb) + headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname] + if body + if headers["content-length"] || headers["transfer-encoding"] + chunked = false + else + chunked = true + headers["transfer-encoding"] = "chunked" + end + end + + response = Response.new(**options) + @requests << [response, headers, body, chunked, headers_cb] + consume_queue + response + end + + private + def fail(exception) + close + raise exception + end + + def consume_queue + return if @response || @requests.empty? + + response, headers, body, chunked, cb = @requests.shift + @response = response + @headers_callback = cb + + @socket << construct_request(headers) + + if body + if chunked + read_object(body) { |chunk| + @socket << chunk.bytesize.to_s(16) << "\r\n" << chunk << "\r\n" + } + else + read_object(body) { |chunk| @socket << chunk } + end + end + end + + def construct_request(headers) + out = String.new + out << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]] + headers.each { |key, value| + next if key.start_with?(":") # HTTP/2 psuedo headers + out << "%s: %s\r\n" % [key, value] + } + out << "\r\n" + end + + def read_object(body) + if body.is_a?(String) + yield body + else # IO + until body.eof? + yield body.readpartial(1024) + end + end + end + + def setup_parser + parser = HTTP::Parser.new + parser.on_headers_complete = proc { + resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h + @response._headers({ ":status" => parser.status_code.to_s }.merge(resp_headers)) + @headers_callback.call(@response) if @headers_callback + } + + parser.on_body = proc { |chunk| + @response._chunk(chunk) + } + + parser.on_message_complete = proc { |env| + @response._finish + @response = nil + @headers_callback = nil + close unless parser.keep_alive? + consume_queue + } + + parser + end + end +end diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb index 8270e95..9e50f02 100644 --- a/lib/plum/client/response.rb +++ b/lib/plum/client/response.rb @@ -6,14 +6,15 @@ module Plum attr_reader :headers # @api private - def initialize + def initialize(auto_decode: true, **options) @body = Queue.new @finished = false @failed = false @body = [] + @auto_decode = auto_decode end - # Return the HTTP status code. + # Returns the HTTP status code. # @return [String] the HTTP status code def status @headers && @headers[":status"] @@ -42,6 +43,7 @@ module Plum # @yield [chunk] A chunk of the response body. def on_chunk(&block) raise "Body already read" if @on_chunk + raise ArgumentError, "block must be given" unless block_given? @on_chunk = block unless @body.empty? @body.each(&block) @@ -49,25 +51,34 @@ module Plum end end + # Set callback that will be called when the response finished. + def on_finish(&block) + raise ArgumentError, "block must be given" unless block_given? + if finished? + yield + else + @on_finish = block + end + end + # Returns the complete response body. Use #each_body instead if the body can be very large. # @return [String] the whole response body def body raise "Body already read" if @on_chunk - if finished? - @body.join - else - raise "Response body is not complete" - end + raise "Response body is not complete" unless finished? + @body.join end # @api private def _headers(raw_headers) # response headers should not have duplicates @headers = raw_headers.to_h.freeze + @decoder = setup_decoder end # @api private - def _chunk(chunk) + def _chunk(encoded) + chunk = @decoder.decode(encoded) if @on_chunk @on_chunk.call(chunk) else @@ -78,11 +89,22 @@ module Plum # @api private def _finish @finished = true + @decoder.finish + @on_finish.call if @on_finish end # @api private - def _fail(ex) + def _fail @failed = true end + + private + def setup_decoder + if @auto_decode + klass = Decoders::DECODERS[@headers["content-encoding"]] + end + klass ||= Decoders::Base + klass.new + end end end diff --git a/lib/plum/client/upgrade_client_session.rb b/lib/plum/client/upgrade_client_session.rb new file mode 100644 index 0000000..c0e0d9e --- /dev/null +++ b/lib/plum/client/upgrade_client_session.rb @@ -0,0 +1,46 @@ +# -*- frozen-string-literal: true -*- +module Plum + # Try upgrade to HTTP/2 + class UpgradeClientSession + def initialize(socket, config) + prepare_session(socket, config) + end + + def succ + @session.succ + end + + def empty? + @session.empty? + end + + def close + @session.close + end + + def request(headers, body, options, &headers_cb) + @session.request(headers, body, options, &headers_cb) + end + + private + def prepare_session(socket, config) + lcs = LegacyClientSession.new(socket, config) + opt_res = lcs.request({ ":method" => "OPTIONS", + ":path" => "*", + "User-Agent" => config[:user_agent], + "Connection" => "Upgrade, HTTP2-Settings", + "Upgrade" => "h2c", + "HTTP2-Settings" => "" }, nil, {}) + lcs.succ until opt_res.finished? + + if opt_res.status == "101" + lcs.close + @session = ClientSession.new(socket, config) + @session.plum.stream(1).set_state(:half_closed_local) + else + @session = lcs + end + end + end +end + |