aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/stream.rb
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-08-06 22:16:28 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-08-06 22:16:28 +0900
commitc767321a5a02bc8c6baeee10f706a5e8e875f8e7 (patch)
tree3a75b935906d89830cdad639a14a9ef9ba733778 /lib/plum/stream.rb
parent47af906641a1764a2b75f9c1fc9919c8547cea14 (diff)
downloadplum-c767321a5a02bc8c6baeee10f706a5e8e875f8e7.tar.gz
extract flow control feature into FlowControl module
Diffstat (limited to 'lib/plum/stream.rb')
-rw-r--r--lib/plum/stream.rb74
1 files changed, 15 insertions, 59 deletions
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
index 98df787..ad0bfe6 100644
--- a/lib/plum/stream.rb
+++ b/lib/plum/stream.rb
@@ -2,11 +2,12 @@ using Plum::BinaryString
module Plum
class Stream
+ include FlowControl
include StreamHelper
attr_reader :id, :state, :connection
attr_reader :weight, :exclusive
- attr_accessor :parent, :recv_remaining_window, :send_remaining_window
+ attr_accessor :parent
def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
@connection = con
@@ -14,10 +15,9 @@ module Plum
@state = state
@continuation = []
@callbacks = Hash.new {|hash, key| hash[key] = [] }
- @recv_remaining_window = @connection.local_settings[:initial_window_size]
- @send_remaining_window = @connection.remote_settings[:initial_window_size]
- @send_buffer = []
+ initialize_flow_control(send: @connection.remote_settings[:initial_window_size],
+ recv: @connection.local_settings[:initial_window_size])
update_dependency(weight: weight, parent: parent, exclusive: exclusive)
end
@@ -32,6 +32,7 @@ module Plum
# @private
def process_frame(frame)
validate_received_frame(frame)
+ consume_recv_window(frame)
case frame.type
when :data
@@ -63,43 +64,9 @@ module Plum
@state = :closed
data = "".force_encoding(Encoding::BINARY)
data.push_uint32(error_code)
- send Frame.new(type: :rst_stream,
- stream_id: id,
- payload: data)
- end
-
- # Sends DATA frames remaining unsended due to the flow control. Internal.
- #
- # @private
- def consume_send_buffer
- while frame = @send_buffer.first
- break if frame.length > @send_remaining_window
- @send_buffer.shift
- @send_remaining_window -= frame.length
- send_immediately frame
- end
- end
-
- # Sends frame respecting inner-stream flow control.
- #
- # @param frame [Frame] The frame to be sent.
- def send(frame)
- case frame.type
- when :data
- @send_buffer << frame
- callback(:send_deferred, frame) if @send_remaining_window < frame.length
- consume_send_buffer
- else
- send_immediately frame
- end
- end
-
- # Sends the frame immediately ignoring inner-stream flow control.
- #
- # @param frame [Frame] The frame to be sent.
- def send_immediately(frame)
- callback(:send_frame, frame)
- @connection.send(frame)
+ send_immediately Frame.new(type: :rst_stream,
+ stream_id: id,
+ payload: data)
end
# Registers an event handler to specified event. An event can have multiple handlers.
@@ -114,6 +81,11 @@ module Plum
@callbacks[name].each {|cb| cb.call(*args) }
end
+ def send_immediately(frame)
+ callback(:send_frame, frame)
+ @connection.send(frame)
+ end
+
def update_dependency(weight: nil, parent: nil, exclusive: nil)
raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self
@weight = weight unless weight.nil?
@@ -143,11 +115,6 @@ module Plum
raise StreamError.new(:stream_closed)
end
- @recv_remaining_window -= frame.length
- if @recv_remaining_window < 0
- raise StreamError.new(:flow_control_error) # MAY
- end
-
if frame.flags.include?(:padded)
padding_length = frame.payload.uint8(0)
if padding_length >= frame.length
@@ -263,19 +230,8 @@ module Plum
@state = :closed # MUST NOT send RST_STREAM
end
- def process_window_update(frame)
- if frame.length != 4
- raise Plum::ConnectionError.new(:frame_size_error)
- end
- r_wsi = frame.payload.uint32
- r = r_wsi >> 31
- wsi = r_wsi & ~(1 << 31)
- if wsi == 0
- raise Plum::StreamError.new(:protocol_error)
- end
-
- @send_remaining_window += wsi
- consume_send_buffer
+ def local_error
+ StreamError
end
end
end