aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-11-11 14:16:47 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-11-11 14:16:47 +0900
commit3466b54b14cc2d4ce3d1db6af1bc282678bab443 (patch)
treec06525218b657b975849145baf0b5a8f2c0eeb69
parentd2ff97498001cc672bdf00f35e3bcb29465aeddd (diff)
downloadplum-3466b54b14cc2d4ce3d1db6af1bc282678bab443.tar.gz
connection: split large frame in #send_immediately
-rw-r--r--lib/plum/binary_string.rb14
-rw-r--r--lib/plum/connection.rb9
-rw-r--r--lib/plum/frame_utils.rb47
-rw-r--r--lib/plum/server/http_connection.rb5
-rw-r--r--lib/plum/stream.rb1
-rw-r--r--lib/plum/stream_utils.rb21
-rw-r--r--test/plum/test_binary_string.rb6
-rw-r--r--test/plum/test_connection.rb11
-rw-r--r--test/plum/test_frame_utils.rb15
-rw-r--r--test/utils/server.rb7
10 files changed, 80 insertions, 56 deletions
diff --git a/lib/plum/binary_string.rb b/lib/plum/binary_string.rb
index 400c57b..2b50b38 100644
--- a/lib/plum/binary_string.rb
+++ b/lib/plum/binary_string.rb
@@ -70,6 +70,20 @@ module Plum
# I want to write `enum_for(__method__, n)`!
end
end
+
+ # Splits this String into chunks.
+ # @param n [Integer] max chunk bytesize
+ # @return [Array<String>] the slices
+ def chunk(n)
+ res = []
+ pos = 0
+ lim = bytesize
+ while pos < lim
+ res << byteslice(pos, n)
+ pos += n
+ end
+ res
+ end
end
end
end
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb
index f158560..73f6206 100644
--- a/lib/plum/connection.rb
+++ b/lib/plum/connection.rb
@@ -92,7 +92,14 @@ module Plum
def send_immediately(frame)
callback(:send_frame, frame)
- @writer.call(frame.assemble)
+
+ if frame.length <= @remote_settings[:max_frame_size]
+ @writer.call(frame.assemble)
+ else
+ frame.split(@remote_settings[:max_frame_size]) { |splitted|
+ @writer.call(splitted.assemble)
+ }
+ end
end
def validate_received_frame(frame)
diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb
index 20c79de..447c154 100644
--- a/lib/plum/frame_utils.rb
+++ b/lib/plum/frame_utils.rb
@@ -3,33 +3,28 @@ using Plum::BinaryString
module Plum
module FrameUtils
- # Splits the DATA frame into multiple frames if the payload size exceeds max size.
+ # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE.
# @param max [Integer] The maximum size of a frame payload.
- # @return [Array<Frame>] The splitted frames.
- def split_data(max)
- return [self] if self.length <= max
- raise "Frame type must be DATA" unless self.type == :data
-
- fragments = self.payload.each_byteslice(max).to_a
- frames = fragments.map {|fragment| Frame.new(type: :data, flags: [], stream_id: self.stream_id, payload: fragment) }
- frames.first.flags = self.flags - [:end_stream]
- frames.last.flags = self.flags & [:end_stream]
- frames
- end
-
- # Splits the HEADERS or PUSH_PROMISE frame into multiple frames if the payload size exceeds max size.
- # @param max [Integer] The maximum size of a frame payload.
- # @return [Array<Frame>] The splitted frames.
- def split_headers(max)
- return [self] if self.length <= max
- raise "Frame type must be HEADERS or PUSH_PROMISE" unless [:headers, :push_promise].include?(self.type)
-
- fragments = self.payload.each_byteslice(max).to_a
- frames = fragments.map {|fragment| Frame.new(type: :continuation, flags: [], stream_id: self.stream_id, payload: fragment) }
- frames.first.type_value = self.type_value
- frames.first.flags = self.flags - [:end_headers]
- frames.last.flags = self.flags & [:end_headers]
- frames
+ # @yield [Frame] The splitted frames.
+ def split(max)
+ return yield self if @length <= max
+ first, *mid, last = @payload.chunk(max)
+ case type
+ when :data
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1)
+ mid.each { |slice|
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0)
+ }
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1)
+ when :headers, :push_promise
+ yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4)
+ mid.each { |slice|
+ yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0)
+ }
+ yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4)
+ else
+ raise NotImplementedError.new("frame split of frame with type #{type} is not supported")
+ end
end
# Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2).
diff --git a/lib/plum/server/http_connection.rb b/lib/plum/server/http_connection.rb
index 74b7ad8..116fdcb 100644
--- a/lib/plum/server/http_connection.rb
+++ b/lib/plum/server/http_connection.rb
@@ -77,9 +77,8 @@ module Plum
":authority" => @_headers["host"] })
.reject {|n, v| ["connection", "http2-settings", "upgrade", "host"].include?(n) }
- headers_s = Frame.headers(1, encoder.encode(headers), :end_headers).split_headers(max_frame_size) # stream ID is 1
- data_s = Frame.data(1, @_body, :end_stream).split_data(max_frame_size)
- (headers_s + data_s).each {|frag| stream.receive_frame(frag) }
+ stream.receive_frame Frame.headers(1, encoder.encode(headers), :end_headers)
+ stream.receive_frame Frame.data(1, @_body, :end_stream)
end
end
end
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
index 87e228e..04190fc 100644
--- a/lib/plum/stream.rb
+++ b/lib/plum/stream.rb
@@ -58,7 +58,6 @@ module Plum
end
# Closes this stream. Sends RST_STREAM frame to the peer.
- # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame.
def close
@state = :closed
callback(:close)
diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb
index e344528..ff14c7a 100644
--- a/lib/plum/stream_utils.rb
+++ b/lib/plum/stream_utils.rb
@@ -9,10 +9,8 @@ module Plum
def promise(headers)
stream = @connection.reserve_stream(weight: self.weight + 1, parent: self)
encoded = @connection.hpack_encoder.encode(headers)
- original = Frame.push_promise(id, stream.id, encoded, :end_headers)
- original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame|
- send frame
- end
+ frame = Frame.push_promise(id, stream.id, encoded, :end_headers)
+ send frame
stream
end
@@ -22,10 +20,8 @@ module Plum
def send_headers(headers, end_stream:)
max = @connection.remote_settings[:max_frame_size]
encoded = @connection.hpack_encoder.encode(headers)
- original_frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil))
- original_frame.split_headers(max).each do |frame|
- send frame
- end
+ frame = Frame.headers(id, encoded, :end_headers, (end_stream && :end_stream || nil))
+ send frame
@state = :half_closed_local if end_stream
end
@@ -35,14 +31,13 @@ module Plum
def send_data(data, end_stream: true)
max = @connection.remote_settings[:max_frame_size]
if data.is_a?(IO)
- while !data.eof? && fragment = data.readpartial(max)
+ until data.eof?
+ fragment = data.readpartial(max)
send Frame.data(id, fragment, (end_stream && data.eof? && :end_stream))
end
else
- original = Frame.data(id, data, (end_stream && :end_stream))
- original.split_data(max).each do |frame|
- send frame
- end
+ frame = Frame.data(id, data, (end_stream && :end_stream))
+ send frame
end
@state = :half_closed_local if end_stream
end
diff --git a/test/plum/test_binary_string.rb b/test/plum/test_binary_string.rb
index 403248b..266b548 100644
--- a/test/plum/test_binary_string.rb
+++ b/test/plum/test_binary_string.rb
@@ -61,4 +61,10 @@ class BinaryStringTest < Minitest::Test
ret = string.each_byteslice(3)
assert_equal(["123", "456", "78"], ret.to_a)
end
+
+ def test_chunk
+ string = "12345678"
+ ret = string.chunk(3)
+ assert_equal(["123", "456", "78"], ret)
+ end
end
diff --git a/test/plum/test_connection.rb b/test/plum/test_connection.rb
index b1a4803..799c3d0 100644
--- a/test/plum/test_connection.rb
+++ b/test/plum/test_connection.rb
@@ -100,4 +100,15 @@ class ConnectionTest < Minitest::Test
}
}
end
+
+ def test_send_immediately_split
+ io = StringIO.new
+ con = Connection.new(io.method(:write))
+ fs = parse_frames(io) {
+ con.__send__(:send_immediately, Frame.new(type: :data, stream_id: 1, payload: "a"*16385))
+ }
+ assert_equal(2, fs.size)
+ assert_equal(16384, fs.first.length)
+ assert_equal(1, fs.last.length)
+ end
end
diff --git a/test/plum/test_frame_utils.rb b/test/plum/test_frame_utils.rb
index 4564e9a..7ad869f 100644
--- a/test/plum/test_frame_utils.rb
+++ b/test/plum/test_frame_utils.rb
@@ -3,30 +3,29 @@ require "test_helper"
class FrameUtilsTest < Minitest::Test
def test_frame_enough_short
frame = Frame.new(type: :data, stream_id: 1, payload: "123")
- ret = frame.split_data(3)
+ ret = frame.to_enum(:split, 3).to_a
assert_equal(1, ret.size)
assert_equal("123", ret.first.payload)
end
def test_frame_unknown
frame = Frame.new(type: :settings, stream_id: 1, payload: "123")
- assert_raises { frame.split_data(2) }
- assert_raises { frame.split_headers(2) }
+ assert_raises(NotImplementedError) { frame.split(2) }
end
def test_frame_data
frame = Frame.new(type: :data, flags: [:end_stream], stream_id: 1, payload: "12345")
- ret = frame.split_data(3)
- assert_equal(2, ret.size)
- assert_equal("123", ret.first.payload)
+ ret = frame.to_enum(:split, 2).to_a
+ assert_equal(3, ret.size)
+ assert_equal("12", ret.first.payload)
assert_equal([], ret.first.flags)
- assert_equal("45", ret.last.payload)
+ assert_equal("5", ret.last.payload)
assert_equal([:end_stream], ret.last.flags)
end
def test_frame_headers
frame = Frame.new(type: :headers, flags: [:priority, :end_stream, :end_headers], stream_id: 1, payload: "1234567")
- ret = frame.split_headers(3)
+ ret = frame.to_enum(:split, 3).to_a
assert_equal(3, ret.size)
assert_equal("123", ret[0].payload)
assert_equal([:end_stream, :priority], ret[0].flags)
diff --git a/test/utils/server.rb b/test/utils/server.rb
index be2aba6..82ed9ad 100644
--- a/test/utils/server.rb
+++ b/test/utils/server.rb
@@ -39,8 +39,7 @@ module ServerUtils
frames
end
- def capture_frames(con = nil, &blk)
- io = (con || @_con).sock
+ def parse_frames(io, &blk)
pos = io.string.bytesize
blk.call
resp = io.string.byteslice(pos, io.string.bytesize - pos).force_encoding(Encoding::BINARY)
@@ -51,8 +50,8 @@ module ServerUtils
frames
end
- def capture_frame(con = nil, &blk)
- frames = capture_frames(con, &blk)
+ def parse_frame(io, &blk)
+ frames = capture_frames(io, &blk)
assert_equal(1, frames.size, "Supplied block sent no frames or more than 1 frame")
frames.first
end