aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/client
diff options
context:
space:
mode:
Diffstat (limited to 'lib/plum/client')
-rw-r--r--lib/plum/client/client_session.rb91
-rw-r--r--lib/plum/client/decoders.rb51
-rw-r--r--lib/plum/client/legacy_client_session.rb118
-rw-r--r--lib/plum/client/response.rb40
-rw-r--r--lib/plum/client/upgrade_client_session.rb46
5 files changed, 337 insertions, 9 deletions
diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb
new file mode 100644
index 0000000..6e9fc56
--- /dev/null
+++ b/lib/plum/client/client_session.rb
@@ -0,0 +1,91 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ # HTTP/2 client session.
+ class ClientSession
+ HTTP2_DEFAULT_SETTINGS = {
+ enable_push: 0, # TODO: api?
+ initial_window_size: 2 ** 30, # TODO
+ }
+
+ attr_reader :plum
+
+ def initialize(socket, config)
+ @socket = socket
+ @config = config
+ @http2_settings = HTTP2_DEFAULT_SETTINGS.merge(@config[:http2_settings])
+
+ @plum = setup_plum
+ @responses = Set.new
+ end
+
+ def succ
+ @plum << @socket.readpartial(16384)
+ rescue => e
+ fail(e)
+ end
+
+ def empty?
+ @responses.empty?
+ end
+
+ def close
+ @closed = true
+ @responses.each(&:_fail)
+ @responses.clear
+ @plum.close
+ end
+
+ def request(headers, body, options, &headers_cb)
+ headers = { ":method" => nil,
+ ":path" => nil,
+ ":authority" => @config[:hostname],
+ ":scheme" => @config[:scheme]
+ }.merge(headers)
+
+ response = Response.new(**options)
+ @responses << response
+ stream = @plum.open_stream
+ stream.send_headers(headers, end_stream: !body)
+ stream.send_data(body, end_stream: true) if body
+
+ stream.on(:headers) { |resp_headers_raw|
+ response._headers(resp_headers_raw)
+ headers_cb.call(response) if headers_cb
+ }
+ stream.on(:data) { |chunk|
+ response._chunk(chunk)
+ check_window(stream)
+ }
+ stream.on(:end_stream) {
+ response._finish
+ @responses.delete(response)
+ }
+ stream.on(:stream_error) { |ex|
+ response._fail
+ raise ex
+ }
+ response
+ end
+
+ private
+ def fail(exception)
+ close
+ raise exception
+ end
+
+ def setup_plum
+ plum = ClientConnection.new(@socket.method(:write), @http2_settings)
+ plum.on(:connection_error) { |ex|
+ fail(ex)
+ }
+ plum.window_update(@http2_settings[:initial_window_size])
+ plum
+ end
+
+ def check_window(stream)
+ ws = @http2_settings[:initial_window_size]
+ stream.window_update(ws) if stream.recv_remaining_window < (ws / 2)
+ @plum.window_update(ws) if @plum.recv_remaining_window < (ws / 2)
+ end
+ end
+end
diff --git a/lib/plum/client/decoders.rb b/lib/plum/client/decoders.rb
new file mode 100644
index 0000000..e6d72e7
--- /dev/null
+++ b/lib/plum/client/decoders.rb
@@ -0,0 +1,51 @@
+module Plum
+ module Decoders
+ class Base
+ def decode(chunk)
+ chunk
+ end
+
+ def finish
+ end
+ end
+
+ # `deflate` is not just deflate, wrapped by zlib format (RFC 1950)
+ class Deflate < Base
+ def initialize
+ @inflate = Zlib::Inflate.new(Zlib::MAX_WBITS)
+ end
+
+ def decode(chunk)
+ @inflate.inflate(chunk)
+ rescue Zlib::Error => e
+ raise DecoderError.new("failed to decode chunk", e)
+ end
+
+ def finish
+ @inflate.finish
+ rescue Zlib::Error => e
+ raise DecoderError.new("failed to finalize", e)
+ end
+ end
+
+ class GZip < Base
+ def initialize
+ @stream = Zlib::Inflate.new(Zlib::MAX_WBITS + 16)
+ end
+
+ def decode(chunk)
+ @stream.inflate(chunk)
+ rescue Zlib::Error => e
+ raise DecoderError.new("failed to decode chunk", e)
+ end
+
+ def finish
+ @stream.finish
+ rescue Zlib::Error => e
+ raise DecoderError.new("failed to finalize", e)
+ end
+ end
+
+ DECODERS = { "gzip" => GZip, "deflate" => Deflate }.freeze
+ end
+end
diff --git a/lib/plum/client/legacy_client_session.rb b/lib/plum/client/legacy_client_session.rb
new file mode 100644
index 0000000..bc531ac
--- /dev/null
+++ b/lib/plum/client/legacy_client_session.rb
@@ -0,0 +1,118 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ # HTTP/1.x client session.
+ class LegacyClientSession
+ # Creates a new HTTP/1.1 client session
+ def initialize(socket, config)
+ require "http/parser"
+ @socket = socket
+ @config = config
+
+ @parser = setup_parser
+ @requests = []
+ @response = nil
+ @headers_callback = nil
+ end
+
+ def succ
+ @parser << @socket.readpartial(16384)
+ rescue => e # including HTTP::Parser::Error
+ fail(e)
+ end
+
+ def empty?
+ !@response
+ end
+
+ def close
+ @closed = true
+ @response._fail if @response
+ end
+
+ def request(headers, body, options, &headers_cb)
+ headers["host"] = headers[":authority"] || headers["host"] || @config[:hostname]
+ if body
+ if headers["content-length"] || headers["transfer-encoding"]
+ chunked = false
+ else
+ chunked = true
+ headers["transfer-encoding"] = "chunked"
+ end
+ end
+
+ response = Response.new(**options)
+ @requests << [response, headers, body, chunked, headers_cb]
+ consume_queue
+ response
+ end
+
+ private
+ def fail(exception)
+ close
+ raise exception
+ end
+
+ def consume_queue
+ return if @response || @requests.empty?
+
+ response, headers, body, chunked, cb = @requests.shift
+ @response = response
+ @headers_callback = cb
+
+ @socket << construct_request(headers)
+
+ if body
+ if chunked
+ read_object(body) { |chunk|
+ @socket << chunk.bytesize.to_s(16) << "\r\n" << chunk << "\r\n"
+ }
+ else
+ read_object(body) { |chunk| @socket << chunk }
+ end
+ end
+ end
+
+ def construct_request(headers)
+ out = String.new
+ out << "%s %s HTTP/1.1\r\n" % [headers[":method"], headers[":path"]]
+ headers.each { |key, value|
+ next if key.start_with?(":") # HTTP/2 psuedo headers
+ out << "%s: %s\r\n" % [key, value]
+ }
+ out << "\r\n"
+ end
+
+ def read_object(body)
+ if body.is_a?(String)
+ yield body
+ else # IO
+ until body.eof?
+ yield body.readpartial(1024)
+ end
+ end
+ end
+
+ def setup_parser
+ parser = HTTP::Parser.new
+ parser.on_headers_complete = proc {
+ resp_headers = parser.headers.map { |key, value| [key.downcase, value] }.to_h
+ @response._headers({ ":status" => parser.status_code.to_s }.merge(resp_headers))
+ @headers_callback.call(@response) if @headers_callback
+ }
+
+ parser.on_body = proc { |chunk|
+ @response._chunk(chunk)
+ }
+
+ parser.on_message_complete = proc { |env|
+ @response._finish
+ @response = nil
+ @headers_callback = nil
+ close unless parser.keep_alive?
+ consume_queue
+ }
+
+ parser
+ end
+ end
+end
diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb
index 8270e95..9e50f02 100644
--- a/lib/plum/client/response.rb
+++ b/lib/plum/client/response.rb
@@ -6,14 +6,15 @@ module Plum
attr_reader :headers
# @api private
- def initialize
+ def initialize(auto_decode: true, **options)
@body = Queue.new
@finished = false
@failed = false
@body = []
+ @auto_decode = auto_decode
end
- # Return the HTTP status code.
+ # Returns the HTTP status code.
# @return [String] the HTTP status code
def status
@headers && @headers[":status"]
@@ -42,6 +43,7 @@ module Plum
# @yield [chunk] A chunk of the response body.
def on_chunk(&block)
raise "Body already read" if @on_chunk
+ raise ArgumentError, "block must be given" unless block_given?
@on_chunk = block
unless @body.empty?
@body.each(&block)
@@ -49,25 +51,34 @@ module Plum
end
end
+ # Set callback that will be called when the response finished.
+ def on_finish(&block)
+ raise ArgumentError, "block must be given" unless block_given?
+ if finished?
+ yield
+ else
+ @on_finish = block
+ end
+ end
+
# Returns the complete response body. Use #each_body instead if the body can be very large.
# @return [String] the whole response body
def body
raise "Body already read" if @on_chunk
- if finished?
- @body.join
- else
- raise "Response body is not complete"
- end
+ raise "Response body is not complete" unless finished?
+ @body.join
end
# @api private
def _headers(raw_headers)
# response headers should not have duplicates
@headers = raw_headers.to_h.freeze
+ @decoder = setup_decoder
end
# @api private
- def _chunk(chunk)
+ def _chunk(encoded)
+ chunk = @decoder.decode(encoded)
if @on_chunk
@on_chunk.call(chunk)
else
@@ -78,11 +89,22 @@ module Plum
# @api private
def _finish
@finished = true
+ @decoder.finish
+ @on_finish.call if @on_finish
end
# @api private
- def _fail(ex)
+ def _fail
@failed = true
end
+
+ private
+ def setup_decoder
+ if @auto_decode
+ klass = Decoders::DECODERS[@headers["content-encoding"]]
+ end
+ klass ||= Decoders::Base
+ klass.new
+ end
end
end
diff --git a/lib/plum/client/upgrade_client_session.rb b/lib/plum/client/upgrade_client_session.rb
new file mode 100644
index 0000000..c0e0d9e
--- /dev/null
+++ b/lib/plum/client/upgrade_client_session.rb
@@ -0,0 +1,46 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ # Try upgrade to HTTP/2
+ class UpgradeClientSession
+ def initialize(socket, config)
+ prepare_session(socket, config)
+ end
+
+ def succ
+ @session.succ
+ end
+
+ def empty?
+ @session.empty?
+ end
+
+ def close
+ @session.close
+ end
+
+ def request(headers, body, options, &headers_cb)
+ @session.request(headers, body, options, &headers_cb)
+ end
+
+ private
+ def prepare_session(socket, config)
+ lcs = LegacyClientSession.new(socket, config)
+ opt_res = lcs.request({ ":method" => "OPTIONS",
+ ":path" => "*",
+ "User-Agent" => config[:user_agent],
+ "Connection" => "Upgrade, HTTP2-Settings",
+ "Upgrade" => "h2c",
+ "HTTP2-Settings" => "" }, nil, {})
+ lcs.succ until opt_res.finished?
+
+ if opt_res.status == "101"
+ lcs.close
+ @session = ClientSession.new(socket, config)
+ @session.plum.stream(1).set_state(:half_closed_local)
+ else
+ @session = lcs
+ end
+ end
+ end
+end
+