aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/plum/client.rb35
-rw-r--r--lib/plum/client/response.rb10
-rw-r--r--lib/plum/connection.rb19
-rw-r--r--lib/plum/errors.rb4
-rw-r--r--test/plum/client/test_client.rb83
-rw-r--r--test/plum/client/test_response.rb19
-rw-r--r--test/plum/test_connection.rb2
-rw-r--r--test/test_helper.rb1
8 files changed, 141 insertions, 32 deletions
diff --git a/lib/plum/client.rb b/lib/plum/client.rb
index fa5bdd8..3749482 100644
--- a/lib/plum/client.rb
+++ b/lib/plum/client.rb
@@ -48,19 +48,23 @@ module Plum
# @param response [Response] if specified, waits only for the response
def wait(response = nil)
if response
- _succ while !response.finished?
+ _succ while !response.failed? && !response.finished?
else
_succ while !@responses.empty?
end
end
+ # Waits for the response headers.
+ # @param response [Response] the incomplete response.
+ def wait_headers(response)
+ _succ while !response.failed? && !response.headers
+ end
+
# Closes the connection.
def close
- begin
- @plum.close if @plum
- ensure
- @socket.close if @socket
- end
+ @plum.close if @plum && @plum.state != :closed
+ ensure
+ @socket.close if @socket
end
# Creates a new HTTP request.
@@ -169,18 +173,22 @@ module Plum
}
plum = ClientConnection.new(sock.method(:write), local_settings)
plum.on(:protocol_error) { |ex|
- _fail
+ _fail(ex)
raise ex
}
+ plum.on(:close) { _fail(RuntimeError.new(:closed)) }
plum.on(:stream_error) { |stream, ex|
if res = @responses.delete(stream)
- res._fail unless res.finished?
+ res._fail(ex) unless res.finished?
end
raise ex
}
plum.on(:headers) { |stream, headers|
response = @responses[stream]
response._headers(headers)
+ if handler = @response_handlers.delete(stream)
+ handler.call(response)
+ end
}
plum.on(:data) { |stream, chunk|
response = @responses[stream]
@@ -189,21 +197,18 @@ module Plum
plum.on(:end_stream) { |stream|
response = @responses.delete(stream)
response._finish
- if handler = @response_handlers.delete(stream)
- handler.call(response)
- end
}
plum
end
def _succ
- _fail if @socket.closed? || @socket.eof?
@plum << @socket.readpartial(1024)
end
- def _fail
- while res = @responses.pop
- res._fail unless res.finished?
+ def _fail(ex)
+ while sr = @responses.shift
+ stream, res = sr
+ res._fail(ex) unless res.finished?
end
ensure
close
diff --git a/lib/plum/client/response.rb b/lib/plum/client/response.rb
index 6afa4f9..ee23971 100644
--- a/lib/plum/client/response.rb
+++ b/lib/plum/client/response.rb
@@ -43,9 +43,9 @@ module Plum
def each_chunk(&block)
raise "Body already read" if @body_read
@body_read = true
- while chunk = @body.pop
- if chunk == :failed
- raise EOFError
+ while !(finished? && @body.empty?) && chunk = @body.pop
+ if Exception === chunk
+ raise chunk
else
yield chunk
end
@@ -78,9 +78,9 @@ module Plum
end
# @api private
- def _fail
+ def _fail(ex)
@failed = true
- @body << :failed
+ @body << ex
end
end
end
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb
index 68ba07d..f365861 100644
--- a/lib/plum/connection.rb
+++ b/lib/plum/connection.rb
@@ -38,6 +38,7 @@ module Plum
# Emits :close event. Doesn't actually close socket.
def close
+ @state = :closed
# TODO: server MAY wait streams
callback(:close)
end
@@ -45,6 +46,7 @@ module Plum
# Receives the specified data and process.
# @param new_data [String] The data received from the peer.
def receive(new_data)
+ return if @state == :closed
return if new_data.empty?
@buffer << new_data
consume_buffer
@@ -135,9 +137,7 @@ module Plum
when :ping
receive_ping(frame)
when :goaway
- callback(:goaway, frame)
- goaway
- close
+ receive_goaway(frame)
when :data, :headers, :priority, :rst_stream, :push_promise, :continuation
raise Plum::ConnectionError.new(:protocol_error)
else
@@ -184,5 +184,18 @@ module Plum
send_immediately Frame.ping(:ack, opaque_data)
end
end
+
+ def receive_goaway(frame)
+ callback(:goaway, frame)
+ goaway
+ close
+
+ last_id = frame.payload.uint32(0)
+ error_code = frame.payload.uint32(4)
+ message = frame.payload.byteslice(8, frame.length - 8)
+ if error_code > 0
+ raise LocalConnectionError.new(HTTPError::ERROR_CODES.key(error_code), message)
+ end
+ end
end
end
diff --git a/lib/plum/errors.rb b/lib/plum/errors.rb
index 9df4668..d5cce48 100644
--- a/lib/plum/errors.rb
+++ b/lib/plum/errors.rb
@@ -43,4 +43,8 @@ module Plum
@parser = parser
end
end
+
+ # Client
+ class LocalConnectionError < HTTPError; end
+ class LocalStreamError < HTTPError; end
end
diff --git a/test/plum/client/test_client.rb b/test/plum/client/test_client.rb
index 32f9728..66479a1 100644
--- a/test/plum/client/test_client.rb
+++ b/test/plum/client/test_client.rb
@@ -15,9 +15,11 @@ class ClientTest < Minitest::Test
end
def test_request_async
- server_thread = start_tls_server
res2 = nil
- Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |client|
+ client = nil
+ server_thread = start_tls_server
+ Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c|
+ client = c
res1 = client.request_async({ ":path" => "/", ":method" => "GET", ":scheme" => "https", "header" => "ccc" }) { |res1|
assert(res1.headers)
}
@@ -32,9 +34,65 @@ class ClientTest < Minitest::Test
end
def test_verify
+ client = nil
server_thread = start_tls_server
assert_raises(OpenSSL::SSL::SSLError) {
- Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_PEER)
+ client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_PEER)
+ }
+ ensure
+ server_thread.join
+ end
+
+ def test_raise_error_sync
+ client = nil
+ server_thread = start_tls_server
+ Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c|
+ client = c
+ assert_raises(LocalConnectionError) {
+ client.get("/connection_error")
+ }
+ }
+ ensure
+ server_thread.join
+ end
+
+ def test_raise_error_async_seq_wait
+ server_thread = start_tls_server
+ client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE)
+ res = client.get_async("/error_in_data")
+ assert_raises(LocalConnectionError) {
+ client.wait(res)
+ }
+ client.close
+ ensure
+ server_thread.join
+ end
+
+ def test_raise_error_async_seq_wait_headers
+ server_thread = start_tls_server
+ client = Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE)
+ res = client.get_async("/error_in_data")
+ client.wait_headers(res)
+ data = String.new
+ tt = Thread.new { client.wait }
+ assert_raises(LocalConnectionError) {
+ res.each_chunk { |c| data << c }
+ }
+ tt.join
+ client.close
+ assert_equal("a", data)
+ ensure
+ server_thread.join
+ end
+
+ def test_raise_error_async_block
+ client = nil
+ server_thread = start_tls_server
+ assert_raises(LocalConnectionError) {
+ Client.start("127.0.0.1", LISTEN_PORT, https: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) { |c|
+ client = c
+ client.get_async("/connection_error") { |res| flunk "success??" }
+ } # wait
}
ensure
server_thread.join
@@ -50,6 +108,7 @@ class ClientTest < Minitest::Test
ssl_server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)
server_thread = Thread.new {
+ plum = nil
begin
Timeout.timeout(3) {
sock = ssl_server.accept
@@ -58,12 +117,20 @@ class ClientTest < Minitest::Test
plum.on(:stream) { |stream|
headers = data = nil
stream.on(:headers) { |h|
- headers = h }
+ headers = h.to_h }
stream.on(:data) { |d|
data = d }
stream.on(:end_stream) {
- stream.respond({ ":status" => 200 }, headers.to_h[":method"] + headers.to_h["header"] + data.to_s) }
- }
+ case headers[":path"]
+ when "/connection_error"
+ plum.goaway(:protocol_error)
+ when "/error_in_data"
+ stream.send_headers({ ":status" => 200 }, end_stream: false)
+ stream.send_data("a", end_stream: false)
+ raise ExampleError, "example error"
+ else
+ stream.respond({ ":status" => 200 }, headers.to_h[":method"] + headers.to_h["header"].to_s + data.to_s)
+ end } }
yield plum if block_given?
@@ -74,8 +141,8 @@ class ClientTest < Minitest::Test
rescue OpenSSL::SSL::SSLError
rescue Timeout::Error
flunk "server timeout"
- rescue => e
- flunk e
+ rescue ExampleError => e
+ plum.goaway(:internal_error) if plum
ensure
tcp_server.close
end
diff --git a/test/plum/client/test_response.rb b/test/plum/client/test_response.rb
index 003e8ff..16b9be3 100644
--- a/test/plum/client/test_response.rb
+++ b/test/plum/client/test_response.rb
@@ -9,6 +9,25 @@ class ResponseTest < Minitest::Test
assert_equal(true, resp.finished?)
end
+ def test_fail
+ resp = Response.new
+ ret = ""
+ run = false
+ t = Thread.new {
+ assert_raises {
+ run = true
+ resp.each_chunk { |chunk| ret << chunk } } }
+ resp._chunk("a")
+ resp._fail
+ timeout(3) {
+ t.join }
+ assert(run)
+ assert(true, resp.failed?)
+ rescue Timeout::Error
+ t.kill
+ flunk "timeout"
+ end
+
def test_status
resp = Response.new
resp._headers([
diff --git a/test/plum/test_connection.rb b/test/plum/test_connection.rb
index 6ab63ed..f490a87 100644
--- a/test/plum/test_connection.rb
+++ b/test/plum/test_connection.rb
@@ -88,7 +88,7 @@ class ConnectionTest < Minitest::Test
}
prepare.call {|con|
assert_equal(:waiting_continuation, con.state)
- con << Frame.new(type: :continuation, flags: [:end_headers], stream_id: 3, payload: "hello").assemble
+ con << Frame.new(type: :continuation, flags: [:end_headers], stream_id: 3, payload: "").assemble
assert_equal(:open, con.state)
}
end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index 2e3271b..20f76ad 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -28,3 +28,4 @@ end
LISTEN_PORT = ENV["PLUM_LISTEN_PORT"] || 40444
TLS_CERT = OpenSSL::X509::Certificate.new File.read(File.expand_path("../server.crt", __FILE__))
TLS_KEY = OpenSSL::PKey::RSA.new File.read(File.expand_path("../server.key", __FILE__))
+ExampleError = Class.new(RuntimeError)