diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-11 14:16:47 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-11-11 14:16:47 +0900 |
commit | 3466b54b14cc2d4ce3d1db6af1bc282678bab443 (patch) | |
tree | c06525218b657b975849145baf0b5a8f2c0eeb69 | |
parent | d2ff97498001cc672bdf00f35e3bcb29465aeddd (diff) | |
download | plum-3466b54b14cc2d4ce3d1db6af1bc282678bab443.tar.gz |
connection: split large frame in #send_immediately
-rw-r--r-- | lib/plum/binary_string.rb | 14 | ||||
-rw-r--r-- | lib/plum/connection.rb | 9 | ||||
-rw-r--r-- | lib/plum/frame_utils.rb | 47 | ||||
-rw-r--r-- | lib/plum/server/http_connection.rb | 5 | ||||
-rw-r--r-- | lib/plum/stream.rb | 1 | ||||
-rw-r--r-- | lib/plum/stream_utils.rb | 21 | ||||
-rw-r--r-- | test/plum/test_binary_string.rb | 6 | ||||
-rw-r--r-- | test/plum/test_connection.rb | 11 | ||||
-rw-r--r-- | test/plum/test_frame_utils.rb | 15 | ||||
-rw-r--r-- | test/utils/server.rb | 7 |
10 files changed, 80 insertions, 56 deletions
diff --git a/lib/plum/binary_string.rb b/lib/plum/binary_string.rb index 400c57b..2b50b38 100644 --- a/lib/plum/binary_string.rb +++ b/lib/plum/binary_string.rb @@ -70,6 +70,20 @@ module Plum # I want to write `enum_for(__method__, n)`! end end + + # Splits this String into chunks. + # @param n [Integer] max chunk bytesize + # @return [Array<String>] the slices + def chunk(n) + res = [] + pos = 0 + lim = bytesize + while pos < lim + res << byteslice(pos, n) + pos += n + end + res + end end end end diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index f158560..73f6206 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -92,7 +92,14 @@ module Plum def send_immediately(frame) callback(:send_frame, frame) - @writer.call(frame.assemble) + + if frame.length <= @remote_settings[:max_frame_size] + @writer.call(frame.assemble) + else + frame.split(@remote_settings[:max_frame_size]) { |splitted| + @writer.call(splitted.assemble) + } + end end def validate_received_frame(frame) diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb index 20c79de..447c154 100644 --- a/lib/plum/frame_utils.rb +++ b/lib/plum/frame_utils.rb @@ -3,33 +3,28 @@ using Plum::BinaryString module Plum module FrameUtils - # Splits the DATA frame into multiple frames if the payload size exceeds max size. + # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE. # @param max [Integer] The maximum size of a frame payload. - # @return [Array<Frame>] The splitted frames. - def split_data(max) - return [self] if self.length <= max - raise "Frame type must be DATA" unless self.type == :data - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :data, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.flags = self.flags - [:end_stream] - frames.last.flags = self.flags & [:end_stream] - frames - end - - # Splits the HEADERS or PUSH_PROMISE frame into multiple frames if the payload size exceeds max size. - # @param max [Integer] The maximum size of a frame payload. - # @return [Array<Frame>] The splitted frames. - def split_headers(max) - return [self] if self.length <= max - raise "Frame type must be HEADERS or PUSH_PROMISE" unless [:headers, :push_promise].include?(self.type) - - fragments = self.payload.each_byteslice(max).to_a - frames = fragments.map {|fragment| Frame.new(type: :continuation, flags: [], stream_id: self.stream_id, payload: fragment) } - frames.first.type_value = self.type_value - frames.first.flags = self.flags - [:end_headers] - frames.last.flags = self.flags & [:end_headers] - frames + # @yield [Frame] The splitted frames. + def split(max) + return yield self if @length <= max + first, *mid, last = @payload.chunk(max) + case type + when :data + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1) + mid.each { |slice| + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1) + when :headers, :push_promise + yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4) + mid.each { |slice| + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4) + else + raise NotImplementedError.new("frame split of frame with type #{type} is not supported") + end end # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb index 74b7ad8..116fdcb 100644 --- a/lib/plum/server/http_connection.rb +++ b/lib/plum/server/http_connection.rb @@ -77,9 +77,8 @@ module Plum ":authority" => @_headers["host"] }) .reject {|n, v| ["connection", "http2-settings", "upgrade", "host"].include?(n) } - headers_s = Frame.headers(1, encoder.encode(headers), :end_headers).split_headers(max_frame_size) # stream ID is 1 - data_s = Frame.data(1, @_body, :end_stream).split_data(max_frame_size) - (headers_s + data_s).each {|frag| stream.receive_frame(frag) } + stream.receive_frame Frame.headers(1, encoder.encode(headers), :end_headers) + stream.receive_frame Frame.data(1, @_body, :end_stream) end end end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 87e228e..04190fc 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -58,7 +58,6 @@ module Plum end # Closes this stream. Sends RST_STREAM frame to the peer. - # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame. def close @state = :closed callback(:close) diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index e344528..ff14c7a 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -9,10 +9,8 @@ module Plum def promise(headers) stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) encoded = @connection.hpack_encoder.encode(headers) - original = Frame.push_promise(id, stream.id, encoded, :end_headers) - original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame| - send frame - end + frame = Frame.push_promise(id, stream.id, encoded, :end_headers) + send frame stream end @@ -22,10 +20,8 @@ module Plum def send_headers(headers, end_stream:) max = @connection.remote_settings[:max_frame_size] encoded = @connection.hpack_encoder.encode(headers) - original_frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) - original_frame.split_headers(max).each do |frame| - send frame - end + frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil)) + send frame @state = :half_closed_local if end_stream end @@ -35,14 +31,13 @@ module Plum def send_data(data, end_stream: true) max = @connection.remote_settings[:max_frame_size] if data.is_a?(IO) - while !data.eof? && fragment = data.readpartial(max) + until data.eof? + fragment = data.readpartial(max) send Frame.data(id, fragment, (end_stream && data.eof? && :end_stream)) end else - original = Frame.data(id, data, (end_stream && :end_stream)) - original.split_data(max).each do |frame| - send frame - end + frame = Frame.data(id, data, (end_stream && :end_stream)) + send frame end @state = :half_closed_local if end_stream end diff --git a/test/plum/test_binary_string.rb b/test/plum/test_binary_string.rb index 403248b..266b548 100644 --- a/test/plum/test_binary_string.rb +++ b/test/plum/test_binary_string.rb @@ -61,4 +61,10 @@ class BinaryStringTest < Minitest::Test ret = string.each_byteslice(3) assert_equal(["123", "456", "78"], ret.to_a) end + + def test_chunk + string = "12345678" + ret = string.chunk(3) + assert_equal(["123", "456", "78"], ret) + end end diff --git a/test/plum/test_connection.rb b/test/plum/test_connection.rb index b1a4803..799c3d0 100644 --- a/test/plum/test_connection.rb +++ b/test/plum/test_connection.rb @@ -100,4 +100,15 @@ class ConnectionTest < Minitest::Test } } end + + def test_send_immediately_split + io = StringIO.new + con = Connection.new(io.method(:write)) + fs = parse_frames(io) { + con.__send__(:send_immediately, Frame.new(type: :data, stream_id: 1, payload: "a"*16385)) + } + assert_equal(2, fs.size) + assert_equal(16384, fs.first.length) + assert_equal(1, fs.last.length) + end end diff --git a/test/plum/test_frame_utils.rb b/test/plum/test_frame_utils.rb index 4564e9a..7ad869f 100644 --- a/test/plum/test_frame_utils.rb +++ b/test/plum/test_frame_utils.rb @@ -3,30 +3,29 @@ require "test_helper" class FrameUtilsTest < Minitest::Test def test_frame_enough_short frame = Frame.new(type: :data, stream_id: 1, payload: "123") - ret = frame.split_data(3) + ret = frame.to_enum(:split, 3).to_a assert_equal(1, ret.size) assert_equal("123", ret.first.payload) end def test_frame_unknown frame = Frame.new(type: :settings, stream_id: 1, payload: "123") - assert_raises { frame.split_data(2) } - assert_raises { frame.split_headers(2) } + assert_raises(NotImplementedError) { frame.split(2) } end def test_frame_data frame = Frame.new(type: :data, flags: [:end_stream], stream_id: 1, payload: "12345") - ret = frame.split_data(3) - assert_equal(2, ret.size) - assert_equal("123", ret.first.payload) + ret = frame.to_enum(:split, 2).to_a + assert_equal(3, ret.size) + assert_equal("12", ret.first.payload) assert_equal([], ret.first.flags) - assert_equal("45", ret.last.payload) + assert_equal("5", ret.last.payload) assert_equal([:end_stream], ret.last.flags) end def test_frame_headers frame = Frame.new(type: :headers, flags: [:priority, :end_stream, :end_headers], stream_id: 1, payload: "1234567") - ret = frame.split_headers(3) + ret = frame.to_enum(:split, 3).to_a assert_equal(3, ret.size) assert_equal("123", ret[0].payload) assert_equal([:end_stream, :priority], ret[0].flags) diff --git a/test/utils/server.rb b/test/utils/server.rb index be2aba6..82ed9ad 100644 --- a/test/utils/server.rb +++ b/test/utils/server.rb @@ -39,8 +39,7 @@ module ServerUtils frames end - def capture_frames(con = nil, &blk) - io = (con || @_con).sock + def parse_frames(io, &blk) pos = io.string.bytesize blk.call resp = io.string.byteslice(pos, io.string.bytesize - pos).force_encoding(Encoding::BINARY) @@ -51,8 +50,8 @@ module ServerUtils frames end - def capture_frame(con = nil, &blk) - frames = capture_frames(con, &blk) + def parse_frame(io, &blk) + frames = capture_frames(io, &blk) assert_equal(1, frames.size, "Supplied block sent no frames or more than 1 frame") frames.first end |