From c767321a5a02bc8c6baeee10f706a5e8e875f8e7 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 6 Aug 2015 22:16:28 +0900 Subject: extract flow control feature into FlowControl module --- lib/plum.rb | 1 + lib/plum/flow_control.rb | 95 ++++++++++++++++++++++++++++++++++++ lib/plum/server_connection.rb | 93 ++++++++--------------------------- lib/plum/server_connection_helper.rb | 10 ---- lib/plum/stream.rb | 74 ++++++---------------------- lib/plum/stream_helper.rb | 9 ---- 6 files changed, 131 insertions(+), 151 deletions(-) create mode 100644 lib/plum/flow_control.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index cbb02b0..d37a22c 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -10,6 +10,7 @@ require "plum/hpack/decoder" require "plum/hpack/encoder" require "plum/frame_helper" require "plum/frame" +require "plum/flow_control" require "plum/stream_helper" require "plum/stream" require "plum/server_connection_helper" diff --git a/lib/plum/flow_control.rb b/lib/plum/flow_control.rb new file mode 100644 index 0000000..b62b39e --- /dev/null +++ b/lib/plum/flow_control.rb @@ -0,0 +1,95 @@ +using Plum::BinaryString + +module Plum + module FlowControl + attr_reader :send_remaining_window, :recv_remaining_window + + # Sends frame respecting inner-stream flow control. + # + # @param frame [Frame] The frame to be sent. + def send(frame) + case frame.type + when :data + @send_buffer << frame + callback(:send_deferred, frame) if @send_remaining_window < frame.length + consume_send_buffer + else + send_immediately frame + end + end + + # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. + # + # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. + def window_update(wsi) + @recv_remaining_window += wsi + payload = "".push_uint32(wsi & ~(1 << 31)) + sid = (Stream === self) ? self.id : 0 + send_immediately Frame.new(type: :window_update, stream_id: sid, payload: payload) + end + + protected + def update_send_initial_window_size(diff) + @send_remaining_window += diff + consume_send_buffer + + if ServerConnection === self + @streams.values.each do |stream| + stream.update_send_initial_window_size(diff) + end + end + end + + def update_recv_initial_window_size(diff) + @recv_remaining_window += diff + if ServerConnection === self + @streams.values.each do |stream| + stream.update_recv_initial_window_size(diff) + end + end + end + + private + def initialize_flow_control(send:, recv:) + @send_buffer = [] + @send_remaining_window = send + @recv_remaining_window = recv + end + + def consume_recv_window(frame) + case frame.type + when :data + @recv_remaining_window -= frame.length + if @recv_remaining_window < 0 + raise local_error.new(:flow_control_error) + end + end + end + + def consume_send_buffer + while frame = @send_buffer.first + break if frame.length > @send_remaining_window + @send_buffer.shift + @send_remaining_window -= frame.length + send_immediately frame + end + end + + def process_window_update(frame) + if frame.length != 4 + raise Plum::ConnectionError.new(:frame_size_error) + end + + r_wsi = frame.payload.uint32 + r = r_wsi >> 31 + wsi = r_wsi & ~(1 << 31) + + if wsi == 0 + raise local_error.new(:protocol_error) + end + + @send_remaining_window += wsi + consume_send_buffer + end + end +end diff --git a/lib/plum/server_connection.rb b/lib/plum/server_connection.rb index 33d4d01..18df556 100644 --- a/lib/plum/server_connection.rb +++ b/lib/plum/server_connection.rb @@ -2,6 +2,7 @@ using Plum::BinaryString module Plum class ServerConnection + include FlowControl include ServerConnectionHelper CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -18,7 +19,6 @@ module Plum attr_reader :hpack_encoder, :hpack_decoder attr_reader :local_settings, :remote_settings attr_reader :state, :socket, :streams - attr_reader :recv_remaining_window, :send_remaining_window def initialize(socket, local_settings = {}) @socket = socket @@ -30,9 +30,8 @@ module Plum @state = :waiting_connetion_preface @hpack_decoder = HPACK::Decoder.new(@local_settings[:header_table_size]) @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size]) - @recv_remaining_window = @local_settings[:initial_window_size] - @send_remaining_window = @remote_settings[:initial_window_size] - @send_buffer = [] + initialize_flow_control(send: @remote_settings[:initial_window_size], + recv: @local_settings[:initial_window_size]) end # Registers an event handler to specified event. An event can have multiple handlers. @@ -42,20 +41,6 @@ module Plum @callbacks[name] << blk end - # Sends frame respecting inner-stream flow control. - # - # @param frame [Frame] The frame to be sent. - def send(frame) - case frame.type - when :data - @send_buffer << frame - callback(:send_deferred, frame) if @send_remaining_window < frame.length - consume_send_buffer - else - send_immediately frame - end - end - # Starts communication with the peer. It blocks until the socket is closed, or reaches EOF. def start settings(@local_settings) @@ -76,10 +61,9 @@ module Plum data.push_uint32((last_id || 0) & ~(1 << 31)) data.push_uint32(error_code) data.push("") # debug message - error = Frame.new(type: :goaway, - stream_id: 0, - payload: data) - send(error) + send_immediately Frame.new(type: :goaway, + stream_id: 0, + payload: data) # TODO: server MAY wait streams @socket.close end @@ -127,6 +111,11 @@ module Plum @callbacks[name].each {|cb| cb.call(*args) } end + def send_immediately(frame) + callback(:send_frame, frame) + @socket.write(frame.assemble) + end + def validate_received_frame(frame) case @state when :waiting_settings @@ -208,7 +197,7 @@ module Plum callback(:remote_settings, @remote_settings, old_remote_settings) - send Frame.new(type: :settings, stream_id: 0x00, flags: [:ack]) + send_immediately Frame.new(type: :settings, stream_id: 0x00, flags: [:ack]) end def update_local_settings(new_settings) @@ -216,44 +205,12 @@ module Plum @local_settings.merge!(new_settings) @hpack_decoder.limit = @local_settings[:header_table_size] - update_recv_window_size(@local_settings[:initial_window_size], old_settings[:initial_window_size]) - end - - def update_recv_window_size(new_val, old_val) - initial_window_diff = new_val - old_val - @streams.values.each do |stream| - stream.recv_remaining_window += initial_window_diff - end - @recv_remaining_window += initial_window_diff + update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size]) end def apply_remote_settings(old_remote_settings) @hpack_encoder.limit = @remote_settings[:header_table_size] - update_send_window_size(@remote_settings[:initial_window_size], old_remote_settings[:initial_window_size]) - end - - def update_send_window_size(new_val, old_val) - initial_window_diff = new_val - old_val - @streams.values.each do |stream| - stream.send_remaining_window += initial_window_diff - stream.consume_send_buffer - end - @send_remaining_window += initial_window_diff - consume_send_buffer - end - - def process_window_update(frame) - raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 4 - - r_wsi = frame.payload.uint32 - r = r_wsi >> 31 - wsi = r_wsi & ~(1 << 31) - if wsi == 0 - raise Plum::ConnectionError.new(:protocol_error) - end - - @send_remaining_window += wsi - consume_send_buffer + update_send_initial_window_size(@remote_settings[:initial_window_size] - old_remote_settings[:initial_window_size]) end def process_ping(frame) @@ -264,10 +221,10 @@ module Plum else on(:ping) opaque_data = frame.payload - send Frame.new(type: :ping, - stream_id: 0, - flags: [:ack], - payload: opaque_data) + send_immediately Frame.new(type: :ping, + stream_id: 0, + flags: [:ack], + payload: opaque_data) end end @@ -282,18 +239,8 @@ module Plum stream end - def consume_send_buffer - while frame = @send_buffer.first - break if frame.length > @send_remaining_window - @send_buffer.shift - @send_remaining_window -= frame.length - send_immediately frame - end - end - - def send_immediately(frame) - callback(:send_frame, frame) - @socket.write(frame.assemble) + def local_error + ConnectionError end end end diff --git a/lib/plum/server_connection_helper.rb b/lib/plum/server_connection_helper.rb index 577f292..d799cf6 100644 --- a/lib/plum/server_connection_helper.rb +++ b/lib/plum/server_connection_helper.rb @@ -2,15 +2,6 @@ using Plum::BinaryString module Plum module ServerConnectionHelper - # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. - # - # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. - def window_update(wsi) - @recv_remaining_window += wsi - payload = "".push_uint32(wsi & ~(1 << 31)) - send Frame.new(type: :window_update, stream_id: 0, payload: payload) - end - # Sends local settings to the peer. # # @param kwargs [Hash] @@ -36,6 +27,5 @@ module Plum stream_id: 0, payload: data) end - end end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 98df787..ad0bfe6 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -2,11 +2,12 @@ using Plum::BinaryString module Plum class Stream + include FlowControl include StreamHelper attr_reader :id, :state, :connection attr_reader :weight, :exclusive - attr_accessor :parent, :recv_remaining_window, :send_remaining_window + attr_accessor :parent def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false) @connection = con @@ -14,10 +15,9 @@ module Plum @state = state @continuation = [] @callbacks = Hash.new {|hash, key| hash[key] = [] } - @recv_remaining_window = @connection.local_settings[:initial_window_size] - @send_remaining_window = @connection.remote_settings[:initial_window_size] - @send_buffer = [] + initialize_flow_control(send: @connection.remote_settings[:initial_window_size], + recv: @connection.local_settings[:initial_window_size]) update_dependency(weight: weight, parent: parent, exclusive: exclusive) end @@ -32,6 +32,7 @@ module Plum # @private def process_frame(frame) validate_received_frame(frame) + consume_recv_window(frame) case frame.type when :data @@ -63,43 +64,9 @@ module Plum @state = :closed data = "".force_encoding(Encoding::BINARY) data.push_uint32(error_code) - send Frame.new(type: :rst_stream, - stream_id: id, - payload: data) - end - - # Sends DATA frames remaining unsended due to the flow control. Internal. - # - # @private - def consume_send_buffer - while frame = @send_buffer.first - break if frame.length > @send_remaining_window - @send_buffer.shift - @send_remaining_window -= frame.length - send_immediately frame - end - end - - # Sends frame respecting inner-stream flow control. - # - # @param frame [Frame] The frame to be sent. - def send(frame) - case frame.type - when :data - @send_buffer << frame - callback(:send_deferred, frame) if @send_remaining_window < frame.length - consume_send_buffer - else - send_immediately frame - end - end - - # Sends the frame immediately ignoring inner-stream flow control. - # - # @param frame [Frame] The frame to be sent. - def send_immediately(frame) - callback(:send_frame, frame) - @connection.send(frame) + send_immediately Frame.new(type: :rst_stream, + stream_id: id, + payload: data) end # Registers an event handler to specified event. An event can have multiple handlers. @@ -114,6 +81,11 @@ module Plum @callbacks[name].each {|cb| cb.call(*args) } end + def send_immediately(frame) + callback(:send_frame, frame) + @connection.send(frame) + end + def update_dependency(weight: nil, parent: nil, exclusive: nil) raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self @weight = weight unless weight.nil? @@ -143,11 +115,6 @@ module Plum raise StreamError.new(:stream_closed) end - @recv_remaining_window -= frame.length - if @recv_remaining_window < 0 - raise StreamError.new(:flow_control_error) # MAY - end - if frame.flags.include?(:padded) padding_length = frame.payload.uint8(0) if padding_length >= frame.length @@ -263,19 +230,8 @@ module Plum @state = :closed # MUST NOT send RST_STREAM end - def process_window_update(frame) - if frame.length != 4 - raise Plum::ConnectionError.new(:frame_size_error) - end - r_wsi = frame.payload.uint32 - r = r_wsi >> 31 - wsi = r_wsi & ~(1 << 31) - if wsi == 0 - raise Plum::StreamError.new(:protocol_error) - end - - @send_remaining_window += wsi - consume_send_buffer + def local_error + StreamError end end end diff --git a/lib/plum/stream_helper.rb b/lib/plum/stream_helper.rb index 6dd48d4..5dd2de9 100644 --- a/lib/plum/stream_helper.rb +++ b/lib/plum/stream_helper.rb @@ -2,15 +2,6 @@ using Plum::BinaryString module Plum module StreamHelper - # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. - # - # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. - def window_update(wsi) - @recv_remaining_window += wsi - payload = "".push_uint32(wsi & ~(1 << 31)) - send Frame.new(type: :window_update, stream_id: id, payload: payload) - end - # Responds to HTTP request. # # @param headers [Hash] The response headers. -- cgit v1.2.3