aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-05-08 17:01:44 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-05-08 17:01:44 +0900
commit26ec6b88a8e078499835851a0310eba486475b7b (patch)
treed36913473ab45a5cdebf172697e21f32dca074f0
parent60e7ce6f29bc40515074df712670bf61094549dd (diff)
downloadplum-26ec6b88a8e078499835851a0310eba486475b7b.tar.gz
merge *_{utils,factory}
This improves readability a bit.
-rw-r--r--lib/plum.rb4
-rw-r--r--lib/plum/connection.rb32
-rw-r--r--lib/plum/connection_utils.rb42
-rw-r--r--lib/plum/frame.rb130
-rw-r--r--lib/plum/frame_factory.rb97
-rw-r--r--lib/plum/frame_utils.rb44
-rw-r--r--lib/plum/stream.rb38
-rw-r--r--lib/plum/stream_utils.rb44
8 files changed, 195 insertions, 236 deletions
diff --git a/lib/plum.rb b/lib/plum.rb
index 9452d8f..3220bbc 100644
--- a/lib/plum.rb
+++ b/lib/plum.rb
@@ -12,13 +12,9 @@ require "plum/hpack/huffman"
require "plum/hpack/context"
require "plum/hpack/decoder"
require "plum/hpack/encoder"
-require "plum/frame_utils"
-require "plum/frame_factory"
require "plum/frame"
require "plum/flow_control"
-require "plum/connection_utils"
require "plum/connection"
-require "plum/stream_utils"
require "plum/stream"
require "plum/server/connection"
require "plum/server/ssl_socket_connection"
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb
index 0ba5989..1eac16e 100644
--- a/lib/plum/connection.rb
+++ b/lib/plum/connection.rb
@@ -6,7 +6,6 @@ module Plum
class Connection
include EventEmitter
include FlowControl
- include ConnectionUtils
CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
@@ -83,6 +82,37 @@ module Plum
stream
end
+ # Sends local settings to the peer.
+ # @param new_settings [Hash<Symbol, Integer>]
+ def settings(**new_settings)
+ send_immediately Frame.settings(**new_settings)
+
+ old_settings = @local_settings.dup
+ @local_settings.merge!(new_settings)
+
+ @hpack_decoder.limit = @local_settings[:header_table_size]
+ update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size])
+ end
+
+ # Sends a PING frame to the peer.
+ # @param data [String] Must be 8 octets.
+ # @raise [ArgumentError] If the data is not 8 octets.
+ def ping(data = "plum\x00\x00\x00\x00")
+ send_immediately Frame.ping(data)
+ end
+
+ # Sends GOAWAY frame to the peer and closes the connection.
+ # @param error_type [Symbol] The error type to be contained in the GOAWAY frame.
+ def goaway(error_type = :no_error, message = "")
+ last_id = @max_stream_ids.max
+ send_immediately Frame.goaway(last_id, error_type, message)
+ end
+
+ # Returns whether peer enables server push or not
+ def push_enabled?
+ @remote_settings[:enable_push] == 1
+ end
+
private
def consume_buffer
while frame = Frame.parse!(@buffer)
diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb
deleted file mode 100644
index b8329b8..0000000
--- a/lib/plum/connection_utils.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-# frozen-string-literal: true
-
-using Plum::BinaryString
-
-module Plum
- module ConnectionUtils
- # Sends local settings to the peer.
- # @param kwargs [Hash<Symbol, Integer>]
- def settings(**kwargs)
- send_immediately Frame.settings(**kwargs)
- update_local_settings(kwargs)
- end
-
- # Sends a PING frame to the peer.
- # @param data [String] Must be 8 octets.
- # @raise [ArgumentError] If the data is not 8 octets.
- def ping(data = "plum\x00\x00\x00\x00")
- send_immediately Frame.ping(data)
- end
-
- # Sends GOAWAY frame to the peer and closes the connection.
- # @param error_type [Symbol] The error type to be contained in the GOAWAY frame.
- def goaway(error_type = :no_error, message = "")
- last_id = @max_stream_ids.max
- send_immediately Frame.goaway(last_id, error_type, message)
- end
-
- # Returns whether peer enables server push or not
- def push_enabled?
- @remote_settings[:enable_push] == 1
- end
-
- private
- def update_local_settings(new_settings)
- old_settings = @local_settings.dup
- @local_settings.merge!(new_settings)
-
- @hpack_decoder.limit = @local_settings[:header_table_size]
- update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size])
- end
- end
-end
diff --git a/lib/plum/frame.rb b/lib/plum/frame.rb
index bf61629..57b3061 100644
--- a/lib/plum/frame.rb
+++ b/lib/plum/frame.rb
@@ -4,9 +4,6 @@ using Plum::BinaryString
module Plum
class Frame
- extend FrameFactory
- include FrameUtils
-
FRAME_TYPES = {
data: 0x00,
headers: 0x01,
@@ -153,6 +150,43 @@ module Plum
"#<Plum::Frame:0x%04x} length=%d, type=%p, flags=%p, stream_id=0x%04x, payload=%p>" % [__id__, length, type, flags, stream_id, payload]
end
+ # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE.
+ # @param max [Integer] The maximum size of a frame payload.
+ # @yield [Frame] The splitted frames.
+ def split(max)
+ return yield self if @length <= max
+ first, *mid, last = @payload.chunk(max)
+ case type
+ when :data
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1)
+ mid.each { |slice|
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0)
+ }
+ yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1)
+ when :headers, :push_promise
+ yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4)
+ mid.each { |slice|
+ yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0)
+ }
+ yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4)
+ else
+ raise NotImplementedError.new("frame split of frame with type #{type} is not supported")
+ end
+ end
+
+ # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2).
+ # @return [Hash<Symbol, Integer>] The parsed strings.
+ def parse_settings
+ settings = {}
+ payload.each_byteslice(6) do |param|
+ id = param.uint16
+ name = Frame::SETTINGS_TYPE.key(id)
+ # ignore unknown settings type
+ settings[name] = param.uint32(2) if name
+ end
+ settings
+ end
+
# Parses a frame from given buffer. It changes given buffer.
# @param buffer [String] The buffer stored the data received from peer. Encoding must be Encoding::BINARY.
# @return [Frame, nil] The parsed frame or nil if the buffer is imcomplete.
@@ -171,5 +205,95 @@ module Plum
stream_id: stream_id,
payload: cur.byteslice(9, length)).freeze
end
+
+ # Creates a RST_STREAM frame.
+ # @param stream_id [Integer] The stream ID.
+ # @param error_type [Symbol] The error type defined in RFC 7540 Section 7.
+ def self.rst_stream(stream_id, error_type)
+ payload = String.new.push_uint32(HTTPError::ERROR_CODES[error_type])
+ Frame.new(type: :rst_stream, stream_id: stream_id, payload: payload)
+ end
+
+ # Creates a GOAWAY frame.
+ # @param last_id [Integer] The biggest processed stream ID.
+ # @param error_type [Symbol] The error type defined in RFC 7540 Section 7.
+ # @param message [String] Additional debug data.
+ # @see RFC 7540 Section 6.8
+ def self.goaway(last_id, error_type, message = "")
+ payload = String.new.push_uint32(last_id)
+ .push_uint32(HTTPError::ERROR_CODES[error_type])
+ .push(message)
+ Frame.new(type: :goaway, stream_id: 0, payload: payload)
+ end
+
+ # Creates a SETTINGS frame.
+ # @param ack [Symbol] Pass :ack to create an ACK frame.
+ # @param args [Hash<Symbol, Integer>] The settings values to send.
+ def self.settings(ack = nil, **args)
+ payload = String.new
+ args.each { |key, value|
+ id = Frame::SETTINGS_TYPE[key] or raise ArgumentError.new("invalid settings type")
+ payload.push_uint16(id)
+ payload.push_uint32(value)
+ }
+ Frame.new(type: :settings, stream_id: 0, flags: [ack], payload: payload)
+ end
+
+ # Creates a PING frame.
+ # @overload ping(ack, payload)
+ # @param ack [Symbol] Pass :ack to create an ACK frame.
+ # @param payload [String] 8 bytes length data to send.
+ # @overload ping(payload = "plum\x00\x00\x00\x00")
+ # @param payload [String] 8 bytes length data to send.
+ def self.ping(arg1 = "plum\x00\x00\x00\x00".b, arg2 = nil)
+ if !arg2
+ raise ArgumentError.new("data must be 8 octets") if arg1.bytesize != 8
+ arg1 = arg1.b if arg1.encoding != Encoding::BINARY
+ Frame.new(type: :ping, stream_id: 0, payload: arg1)
+ else
+ Frame.new(type: :ping, stream_id: 0, flags: [:ack], payload: arg2)
+ end
+ end
+
+ # Creates a DATA frame.
+ # @param stream_id [Integer] The stream ID.
+ # @param payload [String] Payload.
+ # @param end_stream [Boolean] add END_STREAM flag
+ def self.data(stream_id, payload = "", end_stream: false)
+ payload = payload.b if payload&.encoding != Encoding::BINARY
+ fval = end_stream ? 1 : 0
+ Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload)
+ end
+
+ # Creates a HEADERS frame.
+ # @param stream_id [Integer] The stream ID.
+ # @param encoded [String] Headers.
+ # @param end_stream [Boolean] add END_STREAM flag
+ # @param end_headers [Boolean] add END_HEADERS flag
+ def self.headers(stream_id, encoded, end_stream: false, end_headers: false)
+ fval = end_stream ? 1 : 0
+ fval += 4 if end_headers
+ Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded)
+ end
+
+ # Creates a PUSH_PROMISE frame.
+ # @param stream_id [Integer] The stream ID.
+ # @param new_id [Integer] The stream ID to create.
+ # @param encoded [String] Request headers.
+ # @param end_headers [Boolean] add END_HEADERS flag
+ def self.push_promise(stream_id, new_id, encoded, end_headers: false)
+ payload = String.new.push_uint32(new_id)
+ .push(encoded)
+ fval = end_headers ? 4 : 0
+ Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload)
+ end
+
+ # Creates a CONTINUATION frame.
+ # @param stream_id [Integer] The stream ID.
+ # @param payload [String] Payload.
+ # @param end_headers [Boolean] add END_HEADERS flag
+ def self.continuation(stream_id, payload, end_headers: false)
+ Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers ? 4 : 0), payload: payload)
+ end
end
end
diff --git a/lib/plum/frame_factory.rb b/lib/plum/frame_factory.rb
deleted file mode 100644
index 514d99d..0000000
--- a/lib/plum/frame_factory.rb
+++ /dev/null
@@ -1,97 +0,0 @@
-# frozen-string-literal: true
-
-using Plum::BinaryString
-
-module Plum
- module FrameFactory
- # Creates a RST_STREAM frame.
- # @param stream_id [Integer] The stream ID.
- # @param error_type [Symbol] The error type defined in RFC 7540 Section 7.
- def rst_stream(stream_id, error_type)
- payload = String.new.push_uint32(HTTPError::ERROR_CODES[error_type])
- Frame.new(type: :rst_stream, stream_id: stream_id, payload: payload)
- end
-
- # Creates a GOAWAY frame.
- # @param last_id [Integer] The biggest processed stream ID.
- # @param error_type [Symbol] The error type defined in RFC 7540 Section 7.
- # @param message [String] Additional debug data.
- # @see RFC 7540 Section 6.8
- def goaway(last_id, error_type, message = "")
- payload = String.new.push_uint32(last_id)
- .push_uint32(HTTPError::ERROR_CODES[error_type])
- .push(message)
- Frame.new(type: :goaway, stream_id: 0, payload: payload)
- end
-
- # Creates a SETTINGS frame.
- # @param ack [Symbol] Pass :ack to create an ACK frame.
- # @param args [Hash<Symbol, Integer>] The settings values to send.
- def settings(ack = nil, **args)
- payload = String.new
- args.each { |key, value|
- id = Frame::SETTINGS_TYPE[key] or raise ArgumentError.new("invalid settings type")
- payload.push_uint16(id)
- payload.push_uint32(value)
- }
- Frame.new(type: :settings, stream_id: 0, flags: [ack], payload: payload)
- end
-
- # Creates a PING frame.
- # @overload ping(ack, payload)
- # @param ack [Symbol] Pass :ack to create an ACK frame.
- # @param payload [String] 8 bytes length data to send.
- # @overload ping(payload = "plum\x00\x00\x00\x00")
- # @param payload [String] 8 bytes length data to send.
- def ping(arg1 = "plum\x00\x00\x00\x00".b, arg2 = nil)
- if !arg2
- raise ArgumentError.new("data must be 8 octets") if arg1.bytesize != 8
- arg1 = arg1.b if arg1.encoding != Encoding::BINARY
- Frame.new(type: :ping, stream_id: 0, payload: arg1)
- else
- Frame.new(type: :ping, stream_id: 0, flags: [:ack], payload: arg2)
- end
- end
-
- # Creates a DATA frame.
- # @param stream_id [Integer] The stream ID.
- # @param payload [String] Payload.
- # @param end_stream [Boolean] add END_STREAM flag
- def data(stream_id, payload = "", end_stream: false)
- payload = payload.b if payload&.encoding != Encoding::BINARY
- fval = end_stream ? 1 : 0
- Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload)
- end
-
- # Creates a HEADERS frame.
- # @param stream_id [Integer] The stream ID.
- # @param encoded [String] Headers.
- # @param end_stream [Boolean] add END_STREAM flag
- # @param end_headers [Boolean] add END_HEADERS flag
- def headers(stream_id, encoded, end_stream: false, end_headers: false)
- fval = end_stream ? 1 : 0
- fval += 4 if end_headers
- Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded)
- end
-
- # Creates a PUSH_PROMISE frame.
- # @param stream_id [Integer] The stream ID.
- # @param new_id [Integer] The stream ID to create.
- # @param encoded [String] Request headers.
- # @param end_headers [Boolean] add END_HEADERS flag
- def push_promise(stream_id, new_id, encoded, end_headers: false)
- payload = String.new.push_uint32(new_id)
- .push(encoded)
- fval = end_headers ? 4 : 0
- Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload)
- end
-
- # Creates a CONTINUATION frame.
- # @param stream_id [Integer] The stream ID.
- # @param payload [String] Payload.
- # @param end_headers [Boolean] add END_HEADERS flag
- def continuation(stream_id, payload, end_headers: false)
- Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers ? 4 : 0), payload: payload)
- end
- end
-end
diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb
deleted file mode 100644
index cbb16fa..0000000
--- a/lib/plum/frame_utils.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-# frozen-string-literal: true
-
-using Plum::BinaryString
-
-module Plum
- module FrameUtils
- # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE.
- # @param max [Integer] The maximum size of a frame payload.
- # @yield [Frame] The splitted frames.
- def split(max)
- return yield self if @length <= max
- first, *mid, last = @payload.chunk(max)
- case type
- when :data
- yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1)
- mid.each { |slice|
- yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0)
- }
- yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1)
- when :headers, :push_promise
- yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4)
- mid.each { |slice|
- yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0)
- }
- yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4)
- else
- raise NotImplementedError.new("frame split of frame with type #{type} is not supported")
- end
- end
-
- # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2).
- # @return [Hash<Symbol, Integer>] The parsed strings.
- def parse_settings
- settings = {}
- payload.each_byteslice(6) do |param|
- id = param.uint16
- name = Frame::SETTINGS_TYPE.key(id)
- # ignore unknown settings type
- settings[name] = param.uint32(2) if name
- end
- settings
- end
- end
-end
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
index 823a41a..733aeaf 100644
--- a/lib/plum/stream.rb
+++ b/lib/plum/stream.rb
@@ -6,7 +6,6 @@ module Plum
class Stream
include EventEmitter
include FlowControl
- include StreamUtils
attr_reader :id, :state, :connection
attr_reader :weight, :exclusive
@@ -92,6 +91,43 @@ module Plum
end
end
+ # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream.
+ # @param headers [Enumerable<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'.
+ # @return [Stream] The stream to send push response.
+ def promise(headers)
+ stream = @connection.reserve_stream(weight: self.weight + 1, parent: self)
+ encoded = @connection.hpack_encoder.encode(headers)
+ frame = Frame.push_promise(id, stream.id, encoded, end_headers: true)
+ send frame
+ stream
+ end
+
+ # Sends response headers. If the encoded frame is larger than MAX_FRAME_SIZE, the headers will be splitted into HEADERS frame and CONTINUATION frame(s).
+ # @param headers [Enumerable<String, String>] The response headers.
+ # @param end_stream [Boolean] Set END_STREAM flag or not.
+ def send_headers(headers, end_stream:)
+ encoded = @connection.hpack_encoder.encode(headers)
+ frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream)
+ send frame
+ @state = :half_closed_local if end_stream
+ end
+
+ # Sends DATA frame. If the data is larger than MAX_FRAME_SIZE, DATA frame will be splitted.
+ # @param data [String, IO] The data to send.
+ # @param end_stream [Boolean] Set END_STREAM flag or not.
+ def send_data(data = "", end_stream: true)
+ max = @connection.remote_settings[:max_frame_size]
+ if data.is_a?(IO)
+ until data.eof?
+ fragment = data.readpartial(max)
+ send Frame.data(id, fragment, end_stream: end_stream && data.eof?)
+ end
+ else
+ send Frame.data(id, data, end_stream: end_stream)
+ end
+ @state = :half_closed_local if end_stream
+ end
+
private
def send_immediately(frame)
@connection.send(frame)
diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb
deleted file mode 100644
index dca86e7..0000000
--- a/lib/plum/stream_utils.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-# frozen-string-literal: true
-
-using Plum::BinaryString
-
-module Plum
- module StreamUtils
- # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream.
- # @param headers [Enumerable<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'.
- # @return [Stream] The stream to send push response.
- def promise(headers)
- stream = @connection.reserve_stream(weight: self.weight + 1, parent: self)
- encoded = @connection.hpack_encoder.encode(headers)
- frame = Frame.push_promise(id, stream.id, encoded, end_headers: true)
- send frame
- stream
- end
-
- # Sends response headers. If the encoded frame is larger than MAX_FRAME_SIZE, the headers will be splitted into HEADERS frame and CONTINUATION frame(s).
- # @param headers [Enumerable<String, String>] The response headers.
- # @param end_stream [Boolean] Set END_STREAM flag or not.
- def send_headers(headers, end_stream:)
- encoded = @connection.hpack_encoder.encode(headers)
- frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream)
- send frame
- @state = :half_closed_local if end_stream
- end
-
- # Sends DATA frame. If the data is larger than MAX_FRAME_SIZE, DATA frame will be splitted.
- # @param data [String, IO] The data to send.
- # @param end_stream [Boolean] Set END_STREAM flag or not.
- def send_data(data = "", end_stream: true)
- max = @connection.remote_settings[:max_frame_size]
- if data.is_a?(IO)
- until data.eof?
- fragment = data.readpartial(max)
- send Frame.data(id, fragment, end_stream: end_stream && data.eof?)
- end
- else
- send Frame.data(id, data, end_stream: end_stream)
- end
- @state = :half_closed_local if end_stream
- end
- end
-end