aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-15 18:02:32 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-15 18:25:08 +0900
commit7a3f86dac67bc3ef9e6db26c5a58cc963790c03b (patch)
treebbd8cab92c997434abf024fc98a730c19a916c84
parenta31c99fd3471aedd18c0534529c521e34a808d15 (diff)
downloadplum-7a3f86dac67bc3ef9e6db26c5a58cc963790c03b.tar.gz
connection: fix stream id management
-rw-r--r--lib/plum/client/connection.rb2
-rw-r--r--lib/plum/connection.rb12
-rw-r--r--lib/plum/connection_utils.rb2
-rw-r--r--lib/plum/rack/session.rb27
-rw-r--r--lib/plum/server/connection.rb2
-rw-r--r--test/utils/server.rb2
6 files changed, 23 insertions, 24 deletions
diff --git a/lib/plum/client/connection.rb b/lib/plum/client/connection.rb
index 26e53b3..c244df9 100644
--- a/lib/plum/client/connection.rb
+++ b/lib/plum/client/connection.rb
@@ -12,7 +12,7 @@ module Plum
# Create a new stream for HTTP request.
def open_stream
- next_id = @max_stream_id + (@max_stream_id.even? ? 1 : 2)
+ next_id = @max_stream_ids[1] + 2
stream(next_id)
end
end
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb
index 73f6206..5469a4d 100644
--- a/lib/plum/connection.rb
+++ b/lib/plum/connection.rb
@@ -33,7 +33,7 @@ module Plum
@hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size])
initialize_flow_control(send: @remote_settings[:initial_window_size],
recv: @local_settings[:initial_window_size])
- @max_stream_id = 0
+ @max_stream_ids = [0, -1] # [even, odd]
end
# Emits :close event. Doesn't actually close socket.
@@ -61,16 +61,16 @@ module Plum
# Returns a Stream object with the specified ID.
# @param stream_id [Integer] the stream id
# @return [Stream] the stream
- def stream(stream_id)
+ def stream(stream_id, update_max_id = true)
raise ArgumentError, "stream_id can't be 0" if stream_id == 0
stream = @streams[stream_id]
if stream
- if stream.state == :idle && stream.id < @max_stream_id
+ if stream.state == :idle && stream_id < @max_stream_ids[stream_id % 2]
stream.set_state(:closed_implicitly)
end
- elsif stream_id > @max_stream_id
- @max_stream_id = stream_id
+ elsif stream_id > @max_stream_ids[stream_id % 2]
+ @max_stream_ids[stream_id % 2] = stream_id if update_max_id
stream = Stream.new(self, stream_id, state: :idle)
callback(:stream, stream)
@streams[stream_id] = stream
@@ -131,7 +131,7 @@ module Plum
if frame.stream_id == 0
receive_control_frame(frame)
else
- stream(frame.stream_id).receive_frame(frame)
+ stream(frame.stream_id, frame.type == :headers).receive_frame(frame)
end
end
diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb
index ecedfe2..ba8acbd 100644
--- a/lib/plum/connection_utils.rb
+++ b/lib/plum/connection_utils.rb
@@ -20,7 +20,7 @@ module Plum
# Sends GOAWAY frame to the peer and closes the connection.
# @param error_type [Symbol] The error type to be contained in the GOAWAY frame.
def goaway(error_type = :no_error)
- last_id = @max_stream_id
+ last_id = @max_stream_ids.max
send_immediately Frame.goaway(last_id, error_type)
end
diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb
index 5e17ea1..3ed6472 100644
--- a/lib/plum/rack/session.rb
+++ b/lib/plum/rack/session.rb
@@ -72,16 +72,12 @@ module Plum
def send_body(stream, body)
begin
- if body.is_a?(IO)
+ if body.respond_to?(:to_str)
stream.send_data(body, end_stream: true)
- elsif body.respond_to?(:size)
- last = body.size - 1
- i = 0
- body.each { |part|
- stream.send_data(part, end_stream: last == i)
- i += 1
- }
- stream.send_data(nil, end_stream: true) if i == 0
+ elsif body.respond_to?(:readpartial) && body.respond_to?(:eof?)
+ until body.eof?
+ stream.send_data(body.readpartial(65536), end_stream: body.eof?)
+ end
else
body.each { |part| stream.send_data(part, end_stream: false) }
stream.send_data(nil, end_stream: true)
@@ -100,7 +96,7 @@ module Plum
method, path = push.split(" ", 2)
{
":authority" => authority,
- ":method" => method.to_s.upcase,
+ ":method" => method.upcase,
":scheme" => scheme,
":path" => path
}
@@ -115,20 +111,23 @@ module Plum
r_status, r_rawheaders, r_body = @app.call(env)
r_headers, r_extheaders = extract_headers(r_status, r_rawheaders)
- stream.send_headers(r_headers, end_stream: false)
+ no_body = r_body.respond_to?(:empty?) && r_body.empty?
+
+ stream.send_headers(r_headers, end_stream: no_body)
push_sts = extract_push(headers, r_extheaders).map { |preq|
[stream.promise(preq), preq]
}
- send_body(stream, r_body)
+ send_body(stream, r_body) unless no_body
push_sts.each { |st, preq|
penv = new_env(preq, "".b)
p_status, p_h, p_body = @app.call(penv)
p_headers, _ = extract_headers(p_status, p_h)
- st.send_headers(p_headers, end_stream: false)
- send_body(st, p_body)
+ pno_body = p_body.respond_to?(:empty?) && p_body.empty?
+ st.send_headers(p_headers, end_stream: pno_body)
+ send_body(st, p_body) unless pno_body
}
@request_thread.delete(stream)
diff --git a/lib/plum/server/connection.rb b/lib/plum/server/connection.rb
index a82d4aa..b23de59 100644
--- a/lib/plum/server/connection.rb
+++ b/lib/plum/server/connection.rb
@@ -11,7 +11,7 @@ module Plum
# Reserves a new stream to server push.
# @param args [Hash] The argument to pass to Stram.new.
def reserve_stream(**args)
- next_id = @max_stream_id + (@max_stream_id.odd? ? 1 : 2)
+ next_id = @max_stream_ids[0] + 2
stream = stream(next_id)
stream.set_state(:reserved_local)
stream.update_dependency(**args)
diff --git a/test/utils/server.rb b/test/utils/server.rb
index 82ed9ad..afb9f57 100644
--- a/test/utils/server.rb
+++ b/test/utils/server.rb
@@ -20,7 +20,7 @@ module ServerUtils
con = open_server_connection
end
- @_stream = con.instance_eval { stream(((@max_stream_id+1)/2)*2+1) }
+ @_stream = con.instance_eval { stream(@max_stream_ids[1] + 2) }
@_stream.set_state(state)
@_stream.update_dependency(**kwargs)
if block_given?