diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-15 18:02:32 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-15 18:25:08 +0900 |
commit | 7a3f86dac67bc3ef9e6db26c5a58cc963790c03b (patch) | |
tree | bbd8cab92c997434abf024fc98a730c19a916c84 /lib/plum | |
parent | a31c99fd3471aedd18c0534529c521e34a808d15 (diff) | |
download | plum-7a3f86dac67bc3ef9e6db26c5a58cc963790c03b.tar.gz |
connection: fix stream id management
Diffstat (limited to 'lib/plum')
-rw-r--r-- | lib/plum/client/connection.rb | 2 | ||||
-rw-r--r-- | lib/plum/connection.rb | 12 | ||||
-rw-r--r-- | lib/plum/connection_utils.rb | 2 | ||||
-rw-r--r-- | lib/plum/rack/session.rb | 27 | ||||
-rw-r--r-- | lib/plum/server/connection.rb | 2 |
5 files changed, 22 insertions, 23 deletions
diff --git a/lib/plum/client/connection.rb b/lib/plum/client/connection.rb index 26e53b3..c244df9 100644 --- a/lib/plum/client/connection.rb +++ b/lib/plum/client/connection.rb @@ -12,7 +12,7 @@ module Plum # Create a new stream for HTTP request. def open_stream - next_id = @max_stream_id + (@max_stream_id.even? ? 1 : 2) + next_id = @max_stream_ids[1] + 2 stream(next_id) end end diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index 73f6206..5469a4d 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -33,7 +33,7 @@ module Plum @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size]) initialize_flow_control(send: @remote_settings[:initial_window_size], recv: @local_settings[:initial_window_size]) - @max_stream_id = 0 + @max_stream_ids = [0, -1] # [even, odd] end # Emits :close event. Doesn't actually close socket. @@ -61,16 +61,16 @@ module Plum # Returns a Stream object with the specified ID. # @param stream_id [Integer] the stream id # @return [Stream] the stream - def stream(stream_id) + def stream(stream_id, update_max_id = true) raise ArgumentError, "stream_id can't be 0" if stream_id == 0 stream = @streams[stream_id] if stream - if stream.state == :idle && stream.id < @max_stream_id + if stream.state == :idle && stream_id < @max_stream_ids[stream_id % 2] stream.set_state(:closed_implicitly) end - elsif stream_id > @max_stream_id - @max_stream_id = stream_id + elsif stream_id > @max_stream_ids[stream_id % 2] + @max_stream_ids[stream_id % 2] = stream_id if update_max_id stream = Stream.new(self, stream_id, state: :idle) callback(:stream, stream) @streams[stream_id] = stream @@ -131,7 +131,7 @@ module Plum if frame.stream_id == 0 receive_control_frame(frame) else - stream(frame.stream_id).receive_frame(frame) + stream(frame.stream_id, frame.type == :headers).receive_frame(frame) end end diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb index ecedfe2..ba8acbd 100644 --- a/lib/plum/connection_utils.rb +++ b/lib/plum/connection_utils.rb @@ -20,7 +20,7 @@ module Plum # Sends GOAWAY frame to the peer and closes the connection. # @param error_type [Symbol] The error type to be contained in the GOAWAY frame. def goaway(error_type = :no_error) - last_id = @max_stream_id + last_id = @max_stream_ids.max send_immediately Frame.goaway(last_id, error_type) end diff --git a/lib/plum/rack/session.rb b/lib/plum/rack/session.rb index 5e17ea1..3ed6472 100644 --- a/lib/plum/rack/session.rb +++ b/lib/plum/rack/session.rb @@ -72,16 +72,12 @@ module Plum def send_body(stream, body) begin - if body.is_a?(IO) + if body.respond_to?(:to_str) stream.send_data(body, end_stream: true) - elsif body.respond_to?(:size) - last = body.size - 1 - i = 0 - body.each { |part| - stream.send_data(part, end_stream: last == i) - i += 1 - } - stream.send_data(nil, end_stream: true) if i == 0 + elsif body.respond_to?(:readpartial) && body.respond_to?(:eof?) + until body.eof? + stream.send_data(body.readpartial(65536), end_stream: body.eof?) + end else body.each { |part| stream.send_data(part, end_stream: false) } stream.send_data(nil, end_stream: true) @@ -100,7 +96,7 @@ module Plum method, path = push.split(" ", 2) { ":authority" => authority, - ":method" => method.to_s.upcase, + ":method" => method.upcase, ":scheme" => scheme, ":path" => path } @@ -115,20 +111,23 @@ module Plum r_status, r_rawheaders, r_body = @app.call(env) r_headers, r_extheaders = extract_headers(r_status, r_rawheaders) - stream.send_headers(r_headers, end_stream: false) + no_body = r_body.respond_to?(:empty?) && r_body.empty? + + stream.send_headers(r_headers, end_stream: no_body) push_sts = extract_push(headers, r_extheaders).map { |preq| [stream.promise(preq), preq] } - send_body(stream, r_body) + send_body(stream, r_body) unless no_body push_sts.each { |st, preq| penv = new_env(preq, "".b) p_status, p_h, p_body = @app.call(penv) p_headers, _ = extract_headers(p_status, p_h) - st.send_headers(p_headers, end_stream: false) - send_body(st, p_body) + pno_body = p_body.respond_to?(:empty?) && p_body.empty? + st.send_headers(p_headers, end_stream: pno_body) + send_body(st, p_body) unless pno_body } @request_thread.delete(stream) diff --git a/lib/plum/server/connection.rb b/lib/plum/server/connection.rb index a82d4aa..b23de59 100644 --- a/lib/plum/server/connection.rb +++ b/lib/plum/server/connection.rb @@ -11,7 +11,7 @@ module Plum # Reserves a new stream to server push. # @param args [Hash] The argument to pass to Stram.new. def reserve_stream(**args) - next_id = @max_stream_id + (@max_stream_id.odd? ? 1 : 2) + next_id = @max_stream_ids[0] + 2 stream = stream(next_id) stream.set_state(:reserved_local) stream.update_dependency(**args) |