aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-08-06 22:16:28 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-08-06 22:16:28 +0900
commitc767321a5a02bc8c6baeee10f706a5e8e875f8e7 (patch)
tree3a75b935906d89830cdad639a14a9ef9ba733778 /lib
parent47af906641a1764a2b75f9c1fc9919c8547cea14 (diff)
downloadplum-c767321a5a02bc8c6baeee10f706a5e8e875f8e7.tar.gz
extract flow control feature into FlowControl module
Diffstat (limited to 'lib')
-rw-r--r--lib/plum.rb1
-rw-r--r--lib/plum/flow_control.rb95
-rw-r--r--lib/plum/server_connection.rb93
-rw-r--r--lib/plum/server_connection_helper.rb10
-rw-r--r--lib/plum/stream.rb74
-rw-r--r--lib/plum/stream_helper.rb9
6 files changed, 131 insertions, 151 deletions
diff --git a/lib/plum.rb b/lib/plum.rb
index cbb02b0..d37a22c 100644
--- a/lib/plum.rb
+++ b/lib/plum.rb
@@ -10,6 +10,7 @@ require "plum/hpack/decoder"
require "plum/hpack/encoder"
require "plum/frame_helper"
require "plum/frame"
+require "plum/flow_control"
require "plum/stream_helper"
require "plum/stream"
require "plum/server_connection_helper"
diff --git a/lib/plum/flow_control.rb b/lib/plum/flow_control.rb
new file mode 100644
index 0000000..b62b39e
--- /dev/null
+++ b/lib/plum/flow_control.rb
@@ -0,0 +1,95 @@
+using Plum::BinaryString
+
+module Plum
+ module FlowControl
+ attr_reader :send_remaining_window, :recv_remaining_window
+
+ # Sends frame respecting inner-stream flow control.
+ #
+ # @param frame [Frame] The frame to be sent.
+ def send(frame)
+ case frame.type
+ when :data
+ @send_buffer << frame
+ callback(:send_deferred, frame) if @send_remaining_window < frame.length
+ consume_send_buffer
+ else
+ send_immediately frame
+ end
+ end
+
+ # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer.
+ #
+ # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1.
+ def window_update(wsi)
+ @recv_remaining_window += wsi
+ payload = "".push_uint32(wsi & ~(1 << 31))
+ sid = (Stream === self) ? self.id : 0
+ send_immediately Frame.new(type: :window_update, stream_id: sid, payload: payload)
+ end
+
+ protected
+ def update_send_initial_window_size(diff)
+ @send_remaining_window += diff
+ consume_send_buffer
+
+ if ServerConnection === self
+ @streams.values.each do |stream|
+ stream.update_send_initial_window_size(diff)
+ end
+ end
+ end
+
+ def update_recv_initial_window_size(diff)
+ @recv_remaining_window += diff
+ if ServerConnection === self
+ @streams.values.each do |stream|
+ stream.update_recv_initial_window_size(diff)
+ end
+ end
+ end
+
+ private
+ def initialize_flow_control(send:, recv:)
+ @send_buffer = []
+ @send_remaining_window = send
+ @recv_remaining_window = recv
+ end
+
+ def consume_recv_window(frame)
+ case frame.type
+ when :data
+ @recv_remaining_window -= frame.length
+ if @recv_remaining_window < 0
+ raise local_error.new(:flow_control_error)
+ end
+ end
+ end
+
+ def consume_send_buffer
+ while frame = @send_buffer.first
+ break if frame.length > @send_remaining_window
+ @send_buffer.shift
+ @send_remaining_window -= frame.length
+ send_immediately frame
+ end
+ end
+
+ def process_window_update(frame)
+ if frame.length != 4
+ raise Plum::ConnectionError.new(:frame_size_error)
+ end
+
+ r_wsi = frame.payload.uint32
+ r = r_wsi >> 31
+ wsi = r_wsi & ~(1 << 31)
+
+ if wsi == 0
+ raise local_error.new(:protocol_error)
+ end
+
+ @send_remaining_window += wsi
+ consume_send_buffer
+ end
+ end
+end
diff --git a/lib/plum/server_connection.rb b/lib/plum/server_connection.rb
index 33d4d01..18df556 100644
--- a/lib/plum/server_connection.rb
+++ b/lib/plum/server_connection.rb
@@ -2,6 +2,7 @@ using Plum::BinaryString
module Plum
class ServerConnection
+ include FlowControl
include ServerConnectionHelper
CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
@@ -18,7 +19,6 @@ module Plum
attr_reader :hpack_encoder, :hpack_decoder
attr_reader :local_settings, :remote_settings
attr_reader :state, :socket, :streams
- attr_reader :recv_remaining_window, :send_remaining_window
def initialize(socket, local_settings = {})
@socket = socket
@@ -30,9 +30,8 @@ module Plum
@state = :waiting_connetion_preface
@hpack_decoder = HPACK::Decoder.new(@local_settings[:header_table_size])
@hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size])
- @recv_remaining_window = @local_settings[:initial_window_size]
- @send_remaining_window = @remote_settings[:initial_window_size]
- @send_buffer = []
+ initialize_flow_control(send: @remote_settings[:initial_window_size],
+ recv: @local_settings[:initial_window_size])
end
# Registers an event handler to specified event. An event can have multiple handlers.
@@ -42,20 +41,6 @@ module Plum
@callbacks[name] << blk
end
- # Sends frame respecting inner-stream flow control.
- #
- # @param frame [Frame] The frame to be sent.
- def send(frame)
- case frame.type
- when :data
- @send_buffer << frame
- callback(:send_deferred, frame) if @send_remaining_window < frame.length
- consume_send_buffer
- else
- send_immediately frame
- end
- end
-
# Starts communication with the peer. It blocks until the socket is closed, or reaches EOF.
def start
settings(@local_settings)
@@ -76,10 +61,9 @@ module Plum
data.push_uint32((last_id || 0) & ~(1 << 31))
data.push_uint32(error_code)
data.push("") # debug message
- error = Frame.new(type: :goaway,
- stream_id: 0,
- payload: data)
- send(error)
+ send_immediately Frame.new(type: :goaway,
+ stream_id: 0,
+ payload: data)
# TODO: server MAY wait streams
@socket.close
end
@@ -127,6 +111,11 @@ module Plum
@callbacks[name].each {|cb| cb.call(*args) }
end
+ def send_immediately(frame)
+ callback(:send_frame, frame)
+ @socket.write(frame.assemble)
+ end
+
def validate_received_frame(frame)
case @state
when :waiting_settings
@@ -208,7 +197,7 @@ module Plum
callback(:remote_settings, @remote_settings, old_remote_settings)
- send Frame.new(type: :settings, stream_id: 0x00, flags: [:ack])
+ send_immediately Frame.new(type: :settings, stream_id: 0x00, flags: [:ack])
end
def update_local_settings(new_settings)
@@ -216,44 +205,12 @@ module Plum
@local_settings.merge!(new_settings)
@hpack_decoder.limit = @local_settings[:header_table_size]
- update_recv_window_size(@local_settings[:initial_window_size], old_settings[:initial_window_size])
- end
-
- def update_recv_window_size(new_val, old_val)
- initial_window_diff = new_val - old_val
- @streams.values.each do |stream|
- stream.recv_remaining_window += initial_window_diff
- end
- @recv_remaining_window += initial_window_diff
+ update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size])
end
def apply_remote_settings(old_remote_settings)
@hpack_encoder.limit = @remote_settings[:header_table_size]
- update_send_window_size(@remote_settings[:initial_window_size], old_remote_settings[:initial_window_size])
- end
-
- def update_send_window_size(new_val, old_val)
- initial_window_diff = new_val - old_val
- @streams.values.each do |stream|
- stream.send_remaining_window += initial_window_diff
- stream.consume_send_buffer
- end
- @send_remaining_window += initial_window_diff
- consume_send_buffer
- end
-
- def process_window_update(frame)
- raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 4
-
- r_wsi = frame.payload.uint32
- r = r_wsi >> 31
- wsi = r_wsi & ~(1 << 31)
- if wsi == 0
- raise Plum::ConnectionError.new(:protocol_error)
- end
-
- @send_remaining_window += wsi
- consume_send_buffer
+ update_send_initial_window_size(@remote_settings[:initial_window_size] - old_remote_settings[:initial_window_size])
end
def process_ping(frame)
@@ -264,10 +221,10 @@ module Plum
else
on(:ping)
opaque_data = frame.payload
- send Frame.new(type: :ping,
- stream_id: 0,
- flags: [:ack],
- payload: opaque_data)
+ send_immediately Frame.new(type: :ping,
+ stream_id: 0,
+ flags: [:ack],
+ payload: opaque_data)
end
end
@@ -282,18 +239,8 @@ module Plum
stream
end
- def consume_send_buffer
- while frame = @send_buffer.first
- break if frame.length > @send_remaining_window
- @send_buffer.shift
- @send_remaining_window -= frame.length
- send_immediately frame
- end
- end
-
- def send_immediately(frame)
- callback(:send_frame, frame)
- @socket.write(frame.assemble)
+ def local_error
+ ConnectionError
end
end
end
diff --git a/lib/plum/server_connection_helper.rb b/lib/plum/server_connection_helper.rb
index 577f292..d799cf6 100644
--- a/lib/plum/server_connection_helper.rb
+++ b/lib/plum/server_connection_helper.rb
@@ -2,15 +2,6 @@ using Plum::BinaryString
module Plum
module ServerConnectionHelper
- # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer.
- #
- # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1.
- def window_update(wsi)
- @recv_remaining_window += wsi
- payload = "".push_uint32(wsi & ~(1 << 31))
- send Frame.new(type: :window_update, stream_id: 0, payload: payload)
- end
-
# Sends local settings to the peer.
#
# @param kwargs [Hash<Symbol, Integer>]
@@ -36,6 +27,5 @@ module Plum
stream_id: 0,
payload: data)
end
-
end
end
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
index 98df787..ad0bfe6 100644
--- a/lib/plum/stream.rb
+++ b/lib/plum/stream.rb
@@ -2,11 +2,12 @@ using Plum::BinaryString
module Plum
class Stream
+ include FlowControl
include StreamHelper
attr_reader :id, :state, :connection
attr_reader :weight, :exclusive
- attr_accessor :parent, :recv_remaining_window, :send_remaining_window
+ attr_accessor :parent
def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
@connection = con
@@ -14,10 +15,9 @@ module Plum
@state = state
@continuation = []
@callbacks = Hash.new {|hash, key| hash[key] = [] }
- @recv_remaining_window = @connection.local_settings[:initial_window_size]
- @send_remaining_window = @connection.remote_settings[:initial_window_size]
- @send_buffer = []
+ initialize_flow_control(send: @connection.remote_settings[:initial_window_size],
+ recv: @connection.local_settings[:initial_window_size])
update_dependency(weight: weight, parent: parent, exclusive: exclusive)
end
@@ -32,6 +32,7 @@ module Plum
# @private
def process_frame(frame)
validate_received_frame(frame)
+ consume_recv_window(frame)
case frame.type
when :data
@@ -63,43 +64,9 @@ module Plum
@state = :closed
data = "".force_encoding(Encoding::BINARY)
data.push_uint32(error_code)
- send Frame.new(type: :rst_stream,
- stream_id: id,
- payload: data)
- end
-
- # Sends DATA frames remaining unsended due to the flow control. Internal.
- #
- # @private
- def consume_send_buffer
- while frame = @send_buffer.first
- break if frame.length > @send_remaining_window
- @send_buffer.shift
- @send_remaining_window -= frame.length
- send_immediately frame
- end
- end
-
- # Sends frame respecting inner-stream flow control.
- #
- # @param frame [Frame] The frame to be sent.
- def send(frame)
- case frame.type
- when :data
- @send_buffer << frame
- callback(:send_deferred, frame) if @send_remaining_window < frame.length
- consume_send_buffer
- else
- send_immediately frame
- end
- end
-
- # Sends the frame immediately ignoring inner-stream flow control.
- #
- # @param frame [Frame] The frame to be sent.
- def send_immediately(frame)
- callback(:send_frame, frame)
- @connection.send(frame)
+ send_immediately Frame.new(type: :rst_stream,
+ stream_id: id,
+ payload: data)
end
# Registers an event handler to specified event. An event can have multiple handlers.
@@ -114,6 +81,11 @@ module Plum
@callbacks[name].each {|cb| cb.call(*args) }
end
+ def send_immediately(frame)
+ callback(:send_frame, frame)
+ @connection.send(frame)
+ end
+
def update_dependency(weight: nil, parent: nil, exclusive: nil)
raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self
@weight = weight unless weight.nil?
@@ -143,11 +115,6 @@ module Plum
raise StreamError.new(:stream_closed)
end
- @recv_remaining_window -= frame.length
- if @recv_remaining_window < 0
- raise StreamError.new(:flow_control_error) # MAY
- end
-
if frame.flags.include?(:padded)
padding_length = frame.payload.uint8(0)
if padding_length >= frame.length
@@ -263,19 +230,8 @@ module Plum
@state = :closed # MUST NOT send RST_STREAM
end
- def process_window_update(frame)
- if frame.length != 4
- raise Plum::ConnectionError.new(:frame_size_error)
- end
- r_wsi = frame.payload.uint32
- r = r_wsi >> 31
- wsi = r_wsi & ~(1 << 31)
- if wsi == 0
- raise Plum::StreamError.new(:protocol_error)
- end
-
- @send_remaining_window += wsi
- consume_send_buffer
+ def local_error
+ StreamError
end
end
end
diff --git a/lib/plum/stream_helper.rb b/lib/plum/stream_helper.rb
index 6dd48d4..5dd2de9 100644
--- a/lib/plum/stream_helper.rb
+++ b/lib/plum/stream_helper.rb
@@ -2,15 +2,6 @@ using Plum::BinaryString
module Plum
module StreamHelper
- # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer.
- #
- # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1.
- def window_update(wsi)
- @recv_remaining_window += wsi
- payload = "".push_uint32(wsi & ~(1 << 31))
- send Frame.new(type: :window_update, stream_id: id, payload: payload)
- end
-
# Responds to HTTP request.
#
# @param headers [Hash<String, String>] The response headers.