aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/plum/client.rb')
-rw-r--r--lib/plum/client.rb132
1 files changed, 36 insertions, 96 deletions
diff --git a/lib/plum/client.rb b/lib/plum/client.rb
index 526fb69..6e53ade 100644
--- a/lib/plum/client.rb
+++ b/lib/plum/client.rb
@@ -30,8 +30,6 @@ module Plum
@port = port || (config[:tls] ? 443 : 80)
end
@config = DEFAULT_CONFIG.merge(hostname: host).merge(config)
- @response_handlers = {}
- @responses = {}
@started = false
end
@@ -54,23 +52,25 @@ module Plum
# Waits for the asynchronous response(s) to finish.
# @param response [Response] if specified, waits only for the response
+ # @return [Response] if parameter response is specified
def wait(response = nil)
if response
- _succ while !response.failed? && !response.finished?
+ @session.succ until response.failed? || response.finished?
+ response
else
- _succ while !@responses.empty?
+ @session.succ until @session.empty?
end
end
# Waits for the response headers.
# @param response [Response] the incomplete response.
def wait_headers(response)
- _succ while !response.failed? && !response.headers
+ @session.succ while !response.failed? && !response.headers
end
- # Closes the connection.
+ # Closes the connection immediately.
def close
- @plum.close if @plum
+ @session.close if @session
ensure
@socket.close if @socket
end
@@ -78,40 +78,16 @@ module Plum
# Creates a new HTTP request.
# @param headers [Hash<String, String>] the request headers
# @param body [String] the request body
- # @param block [Proc] if specified, calls the block when finished
+ # @param block [Proc] if passed, it will be called when received response headers.
def request_async(headers, body = nil, &block)
- stream = @plum.open_stream
- response = Response.new
- @responses[stream] = response
-
- if body
- stream.send_headers(headers, end_stream: false)
- stream.send_data(body, end_stream: true)
- else
- stream.send_headers(headers, end_stream: true)
- end
-
- if block_given?
- @response_handlers[stream] = block
- end
-
- response
+ @session.request(headers, body, &block)
end
# Creates a new HTTP request and waits for the response
# @param headers [Hash<String, String>] the request headers
# @param body [String] the request body
- def request(headers, body = nil)
- raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"]
-
- base_headers = { ":method" => nil,
- ":path" => nil,
- ":authority" => @config[:hostname],
- ":scheme" => @config[:scheme] }
-
- response = request_async(base_headers.merge(headers), body)
- wait(response)
- response
+ def request(headers, body = nil, &block)
+ wait @session.request(headers, body, &block)
end
# @!method get
@@ -129,8 +105,8 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request_async`
%w(GET HEAD DELETE).each { |method|
- define_method(:"#{method.downcase}") do |path, headers = {}|
- request({ ":method" => method, ":path" => path }.merge(headers))
+ define_method(:"#{method.downcase}") do |path, headers = {}, &block|
+ request({ ":method" => method, ":path" => path }.merge(headers), &block)
end
define_method(:"#{method.downcase}_async") do |path, headers = {}, &block|
request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block)
@@ -151,8 +127,8 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request_async`
%w(POST PUT).each { |method|
- define_method(:"#{method.downcase}") do |path, body = nil, headers = {}|
- request({ ":method" => method, ":path" => path }.merge(headers), body)
+ define_method(:"#{method.downcase}") do |path, body = nil, headers = {}, &block|
+ request({ ":method" => method, ":path" => path }.merge(headers), body, &block)
end
define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block|
request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block)
@@ -162,69 +138,33 @@ module Plum
private
def _start
@started = true
+
+ http2 = true
unless @socket
- sock = TCPSocket.open(host, port)
+ @socket = TCPSocket.open(host, port)
if config[:tls]
ctx = @config[:ssl_context] || new_ssl_ctx
- sock = OpenSSL::SSL::SSLSocket.new(sock, ctx)
- sock.hostname = @config[:hostname] if sock.respond_to?(:hostname=)
- sock.sync_close = true
- sock.connect
- sock.post_connection_check(@config[:hostname])
+ @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx)
+ @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=)
+ @socket.sync_close = true
+ @socket.connect
+ @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
+
+ if @socket.respond_to?(:alpn_protocol)
+ http2 = @socket.alpn_protocol == "h2"
+ elsif sock.respond_to?(:npn_protocol)
+ http2 = @socket.npn_protocol == "h2"
+ else
+ http2 = false
+ end
end
-
- @socket = sock
end
- @plum = setup_plum(@socket)
- end
-
- def setup_plum(sock)
- local_settings = {
- enable_push: 0,
- initial_window_size: (1 << 30) - 1,
- }
- plum = ClientConnection.new(sock.method(:write), local_settings)
- plum.on(:protocol_error) { |ex|
- _fail(ex)
- raise ex
- }
- plum.on(:close) { _fail(RuntimeError.new(:closed)) }
- plum.on(:stream_error) { |stream, ex|
- if res = @responses.delete(stream)
- res._fail(ex) unless res.finished?
- end
- raise ex
- }
- plum.on(:headers) { |stream, headers|
- response = @responses[stream]
- response._headers(headers)
- if handler = @response_handlers.delete(stream)
- handler.call(response)
- end
- }
- plum.on(:data) { |stream, chunk|
- response = @responses[stream]
- response._chunk(chunk)
- }
- plum.on(:end_stream) { |stream|
- response = @responses.delete(stream)
- response._finish
- }
- plum
- end
-
- def _succ
- @plum << @socket.readpartial(1024)
- end
-
- def _fail(ex)
- while sr = @responses.shift
- stream, res = sr
- res._fail(ex) unless res.finished?
+ if http2
+ @session = ClientSession.new(@socket, @config)
+ else
+ @session = LegacyClientSession.new(@socket, @config)
end
- ensure
- close
end
def new_ssl_ctx
@@ -237,7 +177,7 @@ module Plum
if ctx.respond_to?(:alpn_protocols)
ctx.alpn_protocols = ["h2", "http/1.1"]
end
- if ctx.respond_to?(:npn_select_cb)
+ if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN
ctx.npn_select_cb = -> protocols {
protocols.include?("h2") ? "h2" : protocols.first
}