diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-06 22:16:28 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-06 22:16:28 +0900 |
commit | c767321a5a02bc8c6baeee10f706a5e8e875f8e7 (patch) | |
tree | 3a75b935906d89830cdad639a14a9ef9ba733778 /lib/plum/stream.rb | |
parent | 47af906641a1764a2b75f9c1fc9919c8547cea14 (diff) | |
download | plum-c767321a5a02bc8c6baeee10f706a5e8e875f8e7.tar.gz |
extract flow control feature into FlowControl module
Diffstat (limited to 'lib/plum/stream.rb')
-rw-r--r-- | lib/plum/stream.rb | 74 |
1 files changed, 15 insertions, 59 deletions
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 98df787..ad0bfe6 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -2,11 +2,12 @@ using Plum::BinaryString module Plum class Stream + include FlowControl include StreamHelper attr_reader :id, :state, :connection attr_reader :weight, :exclusive - attr_accessor :parent, :recv_remaining_window, :send_remaining_window + attr_accessor :parent def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false) @connection = con @@ -14,10 +15,9 @@ module Plum @state = state @continuation = [] @callbacks = Hash.new {|hash, key| hash[key] = [] } - @recv_remaining_window = @connection.local_settings[:initial_window_size] - @send_remaining_window = @connection.remote_settings[:initial_window_size] - @send_buffer = [] + initialize_flow_control(send: @connection.remote_settings[:initial_window_size], + recv: @connection.local_settings[:initial_window_size]) update_dependency(weight: weight, parent: parent, exclusive: exclusive) end @@ -32,6 +32,7 @@ module Plum # @private def process_frame(frame) validate_received_frame(frame) + consume_recv_window(frame) case frame.type when :data @@ -63,43 +64,9 @@ module Plum @state = :closed data = "".force_encoding(Encoding::BINARY) data.push_uint32(error_code) - send Frame.new(type: :rst_stream, - stream_id: id, - payload: data) - end - - # Sends DATA frames remaining unsended due to the flow control. Internal. - # - # @private - def consume_send_buffer - while frame = @send_buffer.first - break if frame.length > @send_remaining_window - @send_buffer.shift - @send_remaining_window -= frame.length - send_immediately frame - end - end - - # Sends frame respecting inner-stream flow control. - # - # @param frame [Frame] The frame to be sent. - def send(frame) - case frame.type - when :data - @send_buffer << frame - callback(:send_deferred, frame) if @send_remaining_window < frame.length - consume_send_buffer - else - send_immediately frame - end - end - - # Sends the frame immediately ignoring inner-stream flow control. - # - # @param frame [Frame] The frame to be sent. - def send_immediately(frame) - callback(:send_frame, frame) - @connection.send(frame) + send_immediately Frame.new(type: :rst_stream, + stream_id: id, + payload: data) end # Registers an event handler to specified event. An event can have multiple handlers. @@ -114,6 +81,11 @@ module Plum @callbacks[name].each {|cb| cb.call(*args) } end + def send_immediately(frame) + callback(:send_frame, frame) + @connection.send(frame) + end + def update_dependency(weight: nil, parent: nil, exclusive: nil) raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self @weight = weight unless weight.nil? @@ -143,11 +115,6 @@ module Plum raise StreamError.new(:stream_closed) end - @recv_remaining_window -= frame.length - if @recv_remaining_window < 0 - raise StreamError.new(:flow_control_error) # MAY - end - if frame.flags.include?(:padded) padding_length = frame.payload.uint8(0) if padding_length >= frame.length @@ -263,19 +230,8 @@ module Plum @state = :closed # MUST NOT send RST_STREAM end - def process_window_update(frame) - if frame.length != 4 - raise Plum::ConnectionError.new(:frame_size_error) - end - r_wsi = frame.payload.uint32 - r = r_wsi >> 31 - wsi = r_wsi & ~(1 << 31) - if wsi == 0 - raise Plum::StreamError.new(:protocol_error) - end - - @send_remaining_window += wsi - consume_send_buffer + def local_error + StreamError end end end |