aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-08 12:12:45 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-08 12:12:45 +0900
commit2535477c65a0093aa14324a5c47d8ea12b6da255 (patch)
treeb3a57592cd4c909444967fd8268826cbd1d6cdd5
parentb12d3a31530a6dfffb9bfe473ca8be888190ac0d (diff)
downloadplum-2535477c65a0093aa14324a5c47d8ea12b6da255.tar.gz
client: split HTTP/2-specific stuffs to ClientSession
-rw-r--r--lib/plum.rb1
-rw-r--r--lib/plum/client.rb132
-rw-r--r--lib/plum/client/client_session.rb82
-rw-r--r--lib/plum/client/response.rb4
-rw-r--r--test/plum/client/test_client.rb4
-rw-r--r--test/plum/client/test_response.rb14
6 files changed, 135 insertions, 102 deletions
diff --git a/lib/plum.rb b/lib/plum.rb
index bcd72d2..7b8e2ed 100644
--- a/lib/plum.rb
+++ b/lib/plum.rb
@@ -25,3 +25,4 @@ require "plum/server/http_connection"
require "plum/client"
require "plum/client/response"
require "plum/client/connection"
+require "plum/client/client_session"
diff --git a/lib/plum/client.rb b/lib/plum/client.rb
index 526fb69..6e53ade 100644
--- a/lib/plum/client.rb
+++ b/lib/plum/client.rb
@@ -30,8 +30,6 @@ module Plum
@port = port || (config[:tls] ? 443 : 80)
end
@config = DEFAULT_CONFIG.merge(hostname: host).merge(config)
- @response_handlers = {}
- @responses = {}
@started = false
end
@@ -54,23 +52,25 @@ module Plum
# Waits for the asynchronous response(s) to finish.
# @param response [Response] if specified, waits only for the response
+ # @return [Response] if parameter response is specified
def wait(response = nil)
if response
- _succ while !response.failed? && !response.finished?
+ @session.succ until response.failed? || response.finished?
+ response
else
- _succ while !@responses.empty?
+ @session.succ until @session.empty?
end
end
# Waits for the response headers.
# @param response [Response] the incomplete response.
def wait_headers(response)
- _succ while !response.failed? && !response.headers
+ @session.succ while !response.failed? && !response.headers
end
- # Closes the connection.
+ # Closes the connection immediately.
def close
- @plum.close if @plum
+ @session.close if @session
ensure
@socket.close if @socket
end
@@ -78,40 +78,16 @@ module Plum
# Creates a new HTTP request.
# @param headers [Hash<String, String>] the request headers
# @param body [String] the request body
- # @param block [Proc] if specified, calls the block when finished
+ # @param block [Proc] if passed, it will be called when received response headers.
def request_async(headers, body = nil, &block)
- stream = @plum.open_stream
- response = Response.new
- @responses[stream] = response
-
- if body
- stream.send_headers(headers, end_stream: false)
- stream.send_data(body, end_stream: true)
- else
- stream.send_headers(headers, end_stream: true)
- end
-
- if block_given?
- @response_handlers[stream] = block
- end
-
- response
+ @session.request(headers, body, &block)
end
# Creates a new HTTP request and waits for the response
# @param headers [Hash<String, String>] the request headers
# @param body [String] the request body
- def request(headers, body = nil)
- raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"]
-
- base_headers = { ":method" => nil,
- ":path" => nil,
- ":authority" => @config[:hostname],
- ":scheme" => @config[:scheme] }
-
- response = request_async(base_headers.merge(headers), body)
- wait(response)
- response
+ def request(headers, body = nil, &block)
+ wait @session.request(headers, body, &block)
end
# @!method get
@@ -129,8 +105,8 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request_async`
%w(GET HEAD DELETE).each { |method|
- define_method(:"#{method.downcase}") do |path, headers = {}|
- request({ ":method" => method, ":path" => path }.merge(headers))
+ define_method(:"#{method.downcase}") do |path, headers = {}, &block|
+ request({ ":method" => method, ":path" => path }.merge(headers), &block)
end
define_method(:"#{method.downcase}_async") do |path, headers = {}, &block|
request_async({ ":method" => method, ":path" => path }.merge(headers), nil, &block)
@@ -151,8 +127,8 @@ module Plum
# @param block [Proc] if specified, calls the block when finished
# Shorthand method for `#request_async`
%w(POST PUT).each { |method|
- define_method(:"#{method.downcase}") do |path, body = nil, headers = {}|
- request({ ":method" => method, ":path" => path }.merge(headers), body)
+ define_method(:"#{method.downcase}") do |path, body = nil, headers = {}, &block|
+ request({ ":method" => method, ":path" => path }.merge(headers), body, &block)
end
define_method(:"#{method.downcase}_async") do |path, body = nil, headers = {}, &block|
request_async({ ":method" => method, ":path" => path }.merge(headers), body, &block)
@@ -162,69 +138,33 @@ module Plum
private
def _start
@started = true
+
+ http2 = true
unless @socket
- sock = TCPSocket.open(host, port)
+ @socket = TCPSocket.open(host, port)
if config[:tls]
ctx = @config[:ssl_context] || new_ssl_ctx
- sock = OpenSSL::SSL::SSLSocket.new(sock, ctx)
- sock.hostname = @config[:hostname] if sock.respond_to?(:hostname=)
- sock.sync_close = true
- sock.connect
- sock.post_connection_check(@config[:hostname])
+ @socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx)
+ @socket.hostname = @config[:hostname] if @socket.respond_to?(:hostname=)
+ @socket.sync_close = true
+ @socket.connect
+ @socket.post_connection_check(@config[:hostname]) if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
+
+ if @socket.respond_to?(:alpn_protocol)
+ http2 = @socket.alpn_protocol == "h2"
+ elsif sock.respond_to?(:npn_protocol)
+ http2 = @socket.npn_protocol == "h2"
+ else
+ http2 = false
+ end
end
-
- @socket = sock
end
- @plum = setup_plum(@socket)
- end
-
- def setup_plum(sock)
- local_settings = {
- enable_push: 0,
- initial_window_size: (1 << 30) - 1,
- }
- plum = ClientConnection.new(sock.method(:write), local_settings)
- plum.on(:protocol_error) { |ex|
- _fail(ex)
- raise ex
- }
- plum.on(:close) { _fail(RuntimeError.new(:closed)) }
- plum.on(:stream_error) { |stream, ex|
- if res = @responses.delete(stream)
- res._fail(ex) unless res.finished?
- end
- raise ex
- }
- plum.on(:headers) { |stream, headers|
- response = @responses[stream]
- response._headers(headers)
- if handler = @response_handlers.delete(stream)
- handler.call(response)
- end
- }
- plum.on(:data) { |stream, chunk|
- response = @responses[stream]
- response._chunk(chunk)
- }
- plum.on(:end_stream) { |stream|
- response = @responses.delete(stream)
- response._finish
- }
- plum
- end
-
- def _succ
- @plum << @socket.readpartial(1024)
- end
-
- def _fail(ex)
- while sr = @responses.shift
- stream, res = sr
- res._fail(ex) unless res.finished?
+ if http2
+ @session = ClientSession.new(@socket, @config)
+ else
+ @session = LegacyClientSession.new(@socket, @config)
end
- ensure
- close
end
def new_ssl_ctx
@@ -237,7 +177,7 @@ module Plum
if ctx.respond_to?(:alpn_protocols)
ctx.alpn_protocols = ["h2", "http/1.1"]
end
- if ctx.respond_to?(:npn_select_cb)
+ if ctx.respond_to?(:npn_select_cb) # TODO: RFC 7540 does not define protocol negotiation with NPN
ctx.npn_select_cb = -> protocols {
protocols.include?("h2") ? "h2" : protocols.first
}
diff --git a/lib/plum/client/client_session.rb b/lib/plum/client/client_session.rb
new file mode 100644
index 0000000..470f100
--- /dev/null
+++ b/lib/plum/client/client_session.rb
@@ -0,0 +1,82 @@
+# -*- frozen-string-literal: true -*-
+module Plum
+ class ClientSession
+ HTTP2_DEFAULT_SETTINGS = {
+ enable_push: 0, # TODO: api?
+ initial_window_size: (1 << 30) - 1, # TODO: maximum size: disable flow control
+ }
+
+ def initialize(socket, config)
+ @socket = socket
+ @config = config
+
+ @plum = setup_plum
+ @responses = Set.new
+ end
+
+ def succ
+ @plum << @socket.readpartial(1024)
+ rescue => e
+ fail(e)
+ end
+
+ def empty?
+ @responses.empty?
+ end
+
+ def close
+ @closed = true
+ @responses.each { |response| response._fail }
+ @responses.clear
+ @plum.close
+ end
+
+ def request(headers, body = nil, &headers_cb)
+ raise ArgumentError, ":method and :path headers are required" unless headers[":method"] && headers[":path"]
+
+ @responses << (response = Response.new)
+
+ headers = { ":method" => nil,
+ ":path" => nil,
+ ":authority" => @config[:hostname],
+ ":scheme" => @config[:scheme]
+ }.merge(headers)
+
+ stream = @plum.open_stream
+ stream.send_headers(headers, end_stream: !body)
+ stream.send_data(body, end_stream: true) if body
+
+ stream.on(:headers) { |resp_headers_raw|
+ response._headers(resp_headers_raw)
+ headers_cb.call(response) if headers_cb
+ }
+ stream.on(:data) { |chunk|
+ response._chunk(chunk)
+ }
+ stream.on(:end_stream) {
+ response._finish
+ @responses.delete(response)
+ }
+ stream.on(:stream_error) { |ex|
+ response._fail
+ raise ex
+ }
+
+ response
+ end
+
+ private
+ def fail(exception)
+ close
+ raise exception
+ end
+
+ def setup_plum
+ plum = ClientConnection.new(@socket.method(:write), HTTP2_DEFAULT_SETTINGS)
+ plum.on(:connection_error) { |ex|
+ fail(ex)
+ }
+ plum
+ end
+ end
+end
diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb
index 8270e95..5c0b2a0 100644
--- a/lib/plum/client/response.rb
+++ b/lib/plum/client/response.rb
@@ -13,7 +13,7 @@ module Plum
@body = []
end
- # Return the HTTP status code.
+ # Returns the HTTP status code.
# @return [String] the HTTP status code
def status
@headers && @headers[":status"]
@@ -81,7 +81,7 @@ module Plum
end
# @api private
- def _fail(ex)
+ def _fail
@failed = true
end
end
diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb
index f8b64b0..ef93e40 100644
--- a/test/plum/client/test_client.rb
+++ b/test/plum/client/test_client.rb
@@ -7,7 +7,7 @@ class ClientTest < Minitest::Test
client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE)
res1 = client.request({ ":path" => "/", ":method" => "POST", ":scheme" => "https", "header" => "ccc" }, "abc")
assert_equal("POSTcccabc", res1.body)
- res2 = client.put("/", "aaa", { ":scheme" => "https", "header" => "ccc" })
+ res2 = client.put("/", "aaa", { "header" => "ccc" })
assert_equal("PUTcccaaa", res2.body)
client.close
ensure
@@ -93,7 +93,7 @@ class ClientTest < Minitest::Test
server_thread = Thread.new {
plum = nil
begin
- Timeout.timeout(3) {
+ Timeout.timeout(1) {
sock = ssl_server.accept
plum = HTTPSServerConnection.new(sock)
diff --git a/test/plum/client/test_response.rb b/test/plum/client/test_response.rb
index eb98e3d..553bf45 100644
--- a/test/plum/client/test_response.rb
+++ b/test/plum/client/test_response.rb
@@ -11,8 +11,7 @@ class ResponseTest < Minitest::Test
def test_fail
resp = Response.new
- resp._chunk("a")
- resp._fail(RuntimeError.new)
+ resp._fail
assert(true, resp.failed?)
end
@@ -41,6 +40,15 @@ class ResponseTest < Minitest::Test
assert_equal("ab", resp.body)
end
+ def test_body_not_finished
+ resp = Response.new
+ resp._chunk("a")
+ resp._chunk("b")
+ assert_raises { # TODO
+ resp.body
+ }
+ end
+
def test_on_chunk
resp = Response.new
res = []
@@ -49,5 +57,7 @@ class ResponseTest < Minitest::Test
resp._finish
resp.on_chunk { |chunk| res << chunk }
assert_equal(["a", "b"], res)
+ resp._chunk("c")
+ assert_equal(["a", "b", "c"], res)
end
end