aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-04 00:57:02 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-04 00:57:02 +0900
commitb33d179b001119155d4dc0bd8dc43a18a29cfaa9 (patch)
tree580869fe3642ce637717386ef324680f0e8669ac
parentdc870b7b2e8da6f96c3e5141d0341d5a29ca38f4 (diff)
downloadplum-b33d179b001119155d4dc0bd8dc43a18a29cfaa9.tar.gz
connection: fix stream id managing
-rw-r--r--lib/plum/client/connection.rb7
-rw-r--r--lib/plum/connection.rb41
-rw-r--r--lib/plum/connection_utils.rb2
-rw-r--r--lib/plum/server/connection.rb6
-rw-r--r--lib/plum/server/http_connection.rb2
-rw-r--r--lib/plum/stream.rb32
-rw-r--r--test/plum/client/test_connection.rb5
-rw-r--r--test/plum/server/test_https_connection.rb4
-rw-r--r--test/utils/server.rb8
9 files changed, 66 insertions, 41 deletions
diff --git a/lib/plum/client/connection.rb b/lib/plum/client/connection.rb
index bab2dc0..8721fb5 100644
--- a/lib/plum/client/connection.rb
+++ b/lib/plum/client/connection.rb
@@ -12,10 +12,9 @@ module Plum
# Create a new stream for HTTP request.
# @param args [Hash] the argument for Stream.new
- def open_stream(**args)
- next_id = @max_odd_stream_id > 0 ? @max_odd_stream_id + 2 : 1
- stream = new_stream(next_id, **args)
- stream
+ def open_stream
+ next_id = @max_stream_id + (@max_stream_id.even? ? 1 : 2)
+ stream(next_id)
end
end
end
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb
index 0dedad3..68ba07d 100644
--- a/lib/plum/connection.rb
+++ b/lib/plum/connection.rb
@@ -23,7 +23,7 @@ module Plum
attr_reader :state, :streams
def initialize(writer, local_settings = {})
- @state = nil
+ @state = :open
@writer = writer
@local_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] }.merge!(local_settings)
@remote_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] }
@@ -33,10 +33,8 @@ module Plum
@hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size])
initialize_flow_control(send: @remote_settings[:initial_window_size],
recv: @local_settings[:initial_window_size])
- @max_odd_stream_id = 0
- @max_even_stream_id = 0
+ @max_stream_id = 0
end
- private :initialize
# Emits :close event. Doesn't actually close socket.
def close
@@ -70,16 +68,24 @@ module Plum
@writer.call(frame.assemble)
end
- def new_stream(stream_id, **args)
- if stream_id.even?
- @max_even_stream_id = stream_id
+ def stream(stream_id)
+ raise ArgumentError, "stream_id can't be 0" if stream_id == 0
+
+ stream = @streams[stream_id]
+ if stream
+ if stream.state == :idle && stream.id < @max_stream_id
+ stream.set_state(:closed_implicitly)
+ end
+ elsif stream_id > @max_stream_id
+ @max_stream_id = stream_id
+ stream = Stream.new(self, stream_id, state: :idle)
+ callback(:stream, stream)
+ @streams[stream_id] = stream
else
- @max_odd_stream_id = stream_id
+ stream = Stream.new(self, stream_id, state: :closed_implicitly)
+ callback(:stream, stream)
end
- stream = Stream.new(self, stream_id, **args)
- @streams[stream_id] = stream
- callback(:stream, stream)
stream
end
@@ -92,14 +98,12 @@ module Plum
if frame.type != :continuation || frame.stream_id != @continuation_id
raise ConnectionError.new(:protocol_error)
end
-
if frame.end_headers?
@state = :open
- @continuation_id = nil
end
end
- if frame.type == :headers
+ if frame.type == :headers || frame.type == :push_promise
if !frame.end_headers?
@state = :waiting_continuation
@continuation_id = frame.stream_id
@@ -114,14 +118,7 @@ module Plum
if frame.stream_id == 0
receive_control_frame(frame)
else
- unless stream = @streams[frame.stream_id]
- if frame.stream_id.even? || @max_odd_stream_id >= frame.stream_id
- raise Plum::ConnectionError.new(:protocol_error)
- end
-
- stream = new_stream(frame.stream_id)
- end
- stream.receive_frame(frame)
+ stream(frame.stream_id).receive_frame(frame)
end
end
diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb
index 8c96267..ecedfe2 100644
--- a/lib/plum/connection_utils.rb
+++ b/lib/plum/connection_utils.rb
@@ -20,7 +20,7 @@ module Plum
# Sends GOAWAY frame to the peer and closes the connection.
# @param error_type [Symbol] The error type to be contained in the GOAWAY frame.
def goaway(error_type = :no_error)
- last_id = @max_odd_stream_id > @max_even_stream_id ? @max_odd_stream_id : @max_even_stream_id
+ last_id = @max_stream_id
send_immediately Frame.goaway(last_id, error_type)
end
diff --git a/lib/plum/server/connection.rb b/lib/plum/server/connection.rb
index c12ef00..450dcf6 100644
--- a/lib/plum/server/connection.rb
+++ b/lib/plum/server/connection.rb
@@ -11,8 +11,10 @@ module Plum
# Reserves a new stream to server push.
# @param args [Hash] The argument to pass to Stram.new.
def reserve_stream(**args)
- next_id = @max_even_stream_id + 2
- stream = new_stream(next_id, state: :reserved_local, **args)
+ next_id = @max_stream_id + (@max_stream_id.odd? ? 1 : 2)
+ stream = stream(next_id)
+ stream.set_state(:reserved_local)
+ stream.update_dependency(**args)
stream
end
diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb
index 0e9462c..7111bb4 100644
--- a/lib/plum/server/http_connection.rb
+++ b/lib/plum/server/http_connection.rb
@@ -70,7 +70,7 @@ module Plum
def process_first_request
encoder = HPACK::Encoder.new(0, indexing: false) # don't pollute connection's HPACK context
- stream = new_stream(1)
+ stream = stream(1)
max_frame_size = local_settings[:max_frame_size]
headers = @_headers.merge({ ":method" => @_http_parser.http_method,
":path" => @_http_parser.request_url,
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
index 63dea66..61c7cd5 100644
--- a/lib/plum/stream.rb
+++ b/lib/plum/stream.rb
@@ -44,7 +44,9 @@ module Plum
receive_window_update(frame)
when :continuation
receive_continuation(frame)
- when :ping, :goaway, :settings, :push_promise
+ when :push_promise
+ receive_push_promise(frame)
+ when :ping, :goaway, :settings
raise ConnectionError.new(:protocol_error) # stream_id MUST be 0x00
else
# MUST ignore unknown frame
@@ -61,11 +63,12 @@ module Plum
send_immediately Frame.rst_stream(id, error_type)
end
- private
- def send_immediately(frame)
- @connection.send(frame)
+ # @api private
+ def set_state(state)
+ @state = state
end
+ # @api private
def update_dependency(weight: nil, parent: nil, exclusive: nil)
raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self
@@ -91,6 +94,11 @@ module Plum
end
end
+ private
+ def send_immediately(frame)
+ @connection.send(frame)
+ end
+
def validate_received_frame(frame)
if frame.length > @connection.local_settings[:max_frame_size]
if [:headers, :push_promise, :continuation].include?(frame.type)
@@ -166,6 +174,10 @@ module Plum
raise StreamError.new(:stream_closed)
elsif @state == :closed
raise ConnectionError.new(:stream_closed)
+ elsif @state == :closed_implicitly
+ raise ConnectionError.new(:protocol_error)
+ elsif @state == :idle && self.id.even?
+ raise ConnectionError.new(:protocol_error)
end
@state = :open
@@ -178,6 +190,18 @@ module Plum
end
end
+ def receive_push_promise(frame)
+ raise NotImplementedError
+
+ if promised_stream.state == :closed_implicitly
+ # 5.1.1 An endpoint that receives an unexpected stream identifier MUST respond with a connection error of type PROTOCOL_ERROR.
+ raise ConnectionError.new(:protocol_error)
+ elsif promised_id.odd?
+ # 5.1.1 Streams initiated by the server MUST use even-numbered stream identifiers.
+ raise ConnectionError.new(:protocol_error)
+ end
+ end
+
def receive_continuation(frame)
# state error mustn't happen: server_connection validates
@continuation << frame
diff --git a/test/plum/client/test_connection.rb b/test/plum/client/test_connection.rb
index 204eb3a..2903cfa 100644
--- a/test/plum/client/test_connection.rb
+++ b/test/plum/client/test_connection.rb
@@ -4,9 +4,8 @@ using Plum::BinaryString
class ClientConnectionTest < Minitest::Test
def test_open_stream
con = open_client_connection
- stream = con.open_stream(weight: 256)
- assert(stream.id % 2 == 1)
- assert_equal(256, stream.weight)
+ stream = con.open_stream
+ assert(stream.id % 2 == 1, "Stream ID is not odd")
assert_equal(:idle, stream.state)
end
end
diff --git a/test/plum/server/test_https_connection.rb b/test/plum/server/test_https_connection.rb
index b93d1b1..d227f73 100644
--- a/test/plum/server/test_https_connection.rb
+++ b/test/plum/server/test_https_connection.rb
@@ -59,6 +59,8 @@ class HTTPSConnectionNegotiationTest < Minitest::Test
}
rescue Timeout::Error
flunk "server timeout"
+ rescue => e
+ flunk e
ensure
tcp_server.close
end
@@ -75,6 +77,8 @@ class HTTPSConnectionNegotiationTest < Minitest::Test
ssl.write Connection::CLIENT_CONNECTION_PREFACE
ssl.write Frame.settings.assemble
sleep
+ rescue => e
+ flunk e
ensure
sock.close
end
diff --git a/test/utils/server.rb b/test/utils/server.rb
index 82eb00f..be2aba6 100644
--- a/test/utils/server.rb
+++ b/test/utils/server.rb
@@ -13,16 +13,16 @@ module ServerUtils
end
end
- def open_new_stream(arg1 = nil, **kwargs)
+ def open_new_stream(arg1 = nil, state: :idle, **kwargs)
if arg1.is_a?(ServerConnection)
con = arg1
else
con = open_server_connection
end
- @_stream = con.instance_eval {
- new_stream((con.streams.keys.last||0/2)*2+1, **kwargs)
- }
+ @_stream = con.instance_eval { stream(((@max_stream_id+1)/2)*2+1) }
+ @_stream.set_state(state)
+ @_stream.update_dependency(**kwargs)
if block_given?
yield @_stream
else