From 26ec6b88a8e078499835851a0310eba486475b7b Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Sun, 8 May 2016 17:01:44 +0900 Subject: merge *_{utils,factory} This improves readability a bit. --- lib/plum.rb | 4 -- lib/plum/connection.rb | 32 ++++++++++- lib/plum/connection_utils.rb | 42 -------------- lib/plum/frame.rb | 130 ++++++++++++++++++++++++++++++++++++++++++- lib/plum/frame_factory.rb | 97 -------------------------------- lib/plum/frame_utils.rb | 44 --------------- lib/plum/stream.rb | 38 ++++++++++++- lib/plum/stream_utils.rb | 44 --------------- 8 files changed, 195 insertions(+), 236 deletions(-) delete mode 100644 lib/plum/connection_utils.rb delete mode 100644 lib/plum/frame_factory.rb delete mode 100644 lib/plum/frame_utils.rb delete mode 100644 lib/plum/stream_utils.rb diff --git a/lib/plum.rb b/lib/plum.rb index 9452d8f..3220bbc 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -12,13 +12,9 @@ require "plum/hpack/huffman" require "plum/hpack/context" require "plum/hpack/decoder" require "plum/hpack/encoder" -require "plum/frame_utils" -require "plum/frame_factory" require "plum/frame" require "plum/flow_control" -require "plum/connection_utils" require "plum/connection" -require "plum/stream_utils" require "plum/stream" require "plum/server/connection" require "plum/server/ssl_socket_connection" diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index 0ba5989..1eac16e 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -6,7 +6,6 @@ module Plum class Connection include EventEmitter include FlowControl - include ConnectionUtils CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -83,6 +82,37 @@ module Plum stream end + # Sends local settings to the peer. + # @param new_settings [Hash] + def settings(**new_settings) + send_immediately Frame.settings(**new_settings) + + old_settings = @local_settings.dup + @local_settings.merge!(new_settings) + + @hpack_decoder.limit = @local_settings[:header_table_size] + update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size]) + end + + # Sends a PING frame to the peer. + # @param data [String] Must be 8 octets. + # @raise [ArgumentError] If the data is not 8 octets. + def ping(data = "plum\x00\x00\x00\x00") + send_immediately Frame.ping(data) + end + + # Sends GOAWAY frame to the peer and closes the connection. + # @param error_type [Symbol] The error type to be contained in the GOAWAY frame. + def goaway(error_type = :no_error, message = "") + last_id = @max_stream_ids.max + send_immediately Frame.goaway(last_id, error_type, message) + end + + # Returns whether peer enables server push or not + def push_enabled? + @remote_settings[:enable_push] == 1 + end + private def consume_buffer while frame = Frame.parse!(@buffer) diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb deleted file mode 100644 index b8329b8..0000000 --- a/lib/plum/connection_utils.rb +++ /dev/null @@ -1,42 +0,0 @@ -# frozen-string-literal: true - -using Plum::BinaryString - -module Plum - module ConnectionUtils - # Sends local settings to the peer. - # @param kwargs [Hash] - def settings(**kwargs) - send_immediately Frame.settings(**kwargs) - update_local_settings(kwargs) - end - - # Sends a PING frame to the peer. - # @param data [String] Must be 8 octets. - # @raise [ArgumentError] If the data is not 8 octets. - def ping(data = "plum\x00\x00\x00\x00") - send_immediately Frame.ping(data) - end - - # Sends GOAWAY frame to the peer and closes the connection. - # @param error_type [Symbol] The error type to be contained in the GOAWAY frame. - def goaway(error_type = :no_error, message = "") - last_id = @max_stream_ids.max - send_immediately Frame.goaway(last_id, error_type, message) - end - - # Returns whether peer enables server push or not - def push_enabled? - @remote_settings[:enable_push] == 1 - end - - private - def update_local_settings(new_settings) - old_settings = @local_settings.dup - @local_settings.merge!(new_settings) - - @hpack_decoder.limit = @local_settings[:header_table_size] - update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size]) - end - end -end diff --git a/lib/plum/frame.rb b/lib/plum/frame.rb index bf61629..57b3061 100644 --- a/lib/plum/frame.rb +++ b/lib/plum/frame.rb @@ -4,9 +4,6 @@ using Plum::BinaryString module Plum class Frame - extend FrameFactory - include FrameUtils - FRAME_TYPES = { data: 0x00, headers: 0x01, @@ -153,6 +150,43 @@ module Plum "#" % [__id__, length, type, flags, stream_id, payload] end + # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE. + # @param max [Integer] The maximum size of a frame payload. + # @yield [Frame] The splitted frames. + def split(max) + return yield self if @length <= max + first, *mid, last = @payload.chunk(max) + case type + when :data + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1) + mid.each { |slice| + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1) + when :headers, :push_promise + yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4) + mid.each { |slice| + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0) + } + yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4) + else + raise NotImplementedError.new("frame split of frame with type #{type} is not supported") + end + end + + # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). + # @return [Hash] The parsed strings. + def parse_settings + settings = {} + payload.each_byteslice(6) do |param| + id = param.uint16 + name = Frame::SETTINGS_TYPE.key(id) + # ignore unknown settings type + settings[name] = param.uint32(2) if name + end + settings + end + # Parses a frame from given buffer. It changes given buffer. # @param buffer [String] The buffer stored the data received from peer. Encoding must be Encoding::BINARY. # @return [Frame, nil] The parsed frame or nil if the buffer is imcomplete. @@ -171,5 +205,95 @@ module Plum stream_id: stream_id, payload: cur.byteslice(9, length)).freeze end + + # Creates a RST_STREAM frame. + # @param stream_id [Integer] The stream ID. + # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. + def self.rst_stream(stream_id, error_type) + payload = String.new.push_uint32(HTTPError::ERROR_CODES[error_type]) + Frame.new(type: :rst_stream, stream_id: stream_id, payload: payload) + end + + # Creates a GOAWAY frame. + # @param last_id [Integer] The biggest processed stream ID. + # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. + # @param message [String] Additional debug data. + # @see RFC 7540 Section 6.8 + def self.goaway(last_id, error_type, message = "") + payload = String.new.push_uint32(last_id) + .push_uint32(HTTPError::ERROR_CODES[error_type]) + .push(message) + Frame.new(type: :goaway, stream_id: 0, payload: payload) + end + + # Creates a SETTINGS frame. + # @param ack [Symbol] Pass :ack to create an ACK frame. + # @param args [Hash] The settings values to send. + def self.settings(ack = nil, **args) + payload = String.new + args.each { |key, value| + id = Frame::SETTINGS_TYPE[key] or raise ArgumentError.new("invalid settings type") + payload.push_uint16(id) + payload.push_uint32(value) + } + Frame.new(type: :settings, stream_id: 0, flags: [ack], payload: payload) + end + + # Creates a PING frame. + # @overload ping(ack, payload) + # @param ack [Symbol] Pass :ack to create an ACK frame. + # @param payload [String] 8 bytes length data to send. + # @overload ping(payload = "plum\x00\x00\x00\x00") + # @param payload [String] 8 bytes length data to send. + def self.ping(arg1 = "plum\x00\x00\x00\x00".b, arg2 = nil) + if !arg2 + raise ArgumentError.new("data must be 8 octets") if arg1.bytesize != 8 + arg1 = arg1.b if arg1.encoding != Encoding::BINARY + Frame.new(type: :ping, stream_id: 0, payload: arg1) + else + Frame.new(type: :ping, stream_id: 0, flags: [:ack], payload: arg2) + end + end + + # Creates a DATA frame. + # @param stream_id [Integer] The stream ID. + # @param payload [String] Payload. + # @param end_stream [Boolean] add END_STREAM flag + def self.data(stream_id, payload = "", end_stream: false) + payload = payload.b if payload&.encoding != Encoding::BINARY + fval = end_stream ? 1 : 0 + Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload) + end + + # Creates a HEADERS frame. + # @param stream_id [Integer] The stream ID. + # @param encoded [String] Headers. + # @param end_stream [Boolean] add END_STREAM flag + # @param end_headers [Boolean] add END_HEADERS flag + def self.headers(stream_id, encoded, end_stream: false, end_headers: false) + fval = end_stream ? 1 : 0 + fval += 4 if end_headers + Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded) + end + + # Creates a PUSH_PROMISE frame. + # @param stream_id [Integer] The stream ID. + # @param new_id [Integer] The stream ID to create. + # @param encoded [String] Request headers. + # @param end_headers [Boolean] add END_HEADERS flag + def self.push_promise(stream_id, new_id, encoded, end_headers: false) + payload = String.new.push_uint32(new_id) + .push(encoded) + fval = end_headers ? 4 : 0 + Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload) + end + + # Creates a CONTINUATION frame. + # @param stream_id [Integer] The stream ID. + # @param payload [String] Payload. + # @param end_headers [Boolean] add END_HEADERS flag + def self.continuation(stream_id, payload, end_headers: false) + Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers ? 4 : 0), payload: payload) + end end end diff --git a/lib/plum/frame_factory.rb b/lib/plum/frame_factory.rb deleted file mode 100644 index 514d99d..0000000 --- a/lib/plum/frame_factory.rb +++ /dev/null @@ -1,97 +0,0 @@ -# frozen-string-literal: true - -using Plum::BinaryString - -module Plum - module FrameFactory - # Creates a RST_STREAM frame. - # @param stream_id [Integer] The stream ID. - # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. - def rst_stream(stream_id, error_type) - payload = String.new.push_uint32(HTTPError::ERROR_CODES[error_type]) - Frame.new(type: :rst_stream, stream_id: stream_id, payload: payload) - end - - # Creates a GOAWAY frame. - # @param last_id [Integer] The biggest processed stream ID. - # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. - # @param message [String] Additional debug data. - # @see RFC 7540 Section 6.8 - def goaway(last_id, error_type, message = "") - payload = String.new.push_uint32(last_id) - .push_uint32(HTTPError::ERROR_CODES[error_type]) - .push(message) - Frame.new(type: :goaway, stream_id: 0, payload: payload) - end - - # Creates a SETTINGS frame. - # @param ack [Symbol] Pass :ack to create an ACK frame. - # @param args [Hash] The settings values to send. - def settings(ack = nil, **args) - payload = String.new - args.each { |key, value| - id = Frame::SETTINGS_TYPE[key] or raise ArgumentError.new("invalid settings type") - payload.push_uint16(id) - payload.push_uint32(value) - } - Frame.new(type: :settings, stream_id: 0, flags: [ack], payload: payload) - end - - # Creates a PING frame. - # @overload ping(ack, payload) - # @param ack [Symbol] Pass :ack to create an ACK frame. - # @param payload [String] 8 bytes length data to send. - # @overload ping(payload = "plum\x00\x00\x00\x00") - # @param payload [String] 8 bytes length data to send. - def ping(arg1 = "plum\x00\x00\x00\x00".b, arg2 = nil) - if !arg2 - raise ArgumentError.new("data must be 8 octets") if arg1.bytesize != 8 - arg1 = arg1.b if arg1.encoding != Encoding::BINARY - Frame.new(type: :ping, stream_id: 0, payload: arg1) - else - Frame.new(type: :ping, stream_id: 0, flags: [:ack], payload: arg2) - end - end - - # Creates a DATA frame. - # @param stream_id [Integer] The stream ID. - # @param payload [String] Payload. - # @param end_stream [Boolean] add END_STREAM flag - def data(stream_id, payload = "", end_stream: false) - payload = payload.b if payload&.encoding != Encoding::BINARY - fval = end_stream ? 1 : 0 - Frame.new(type_value: 0, stream_id: stream_id, flags_value: fval, payload: payload) - end - - # Creates a HEADERS frame. - # @param stream_id [Integer] The stream ID. - # @param encoded [String] Headers. - # @param end_stream [Boolean] add END_STREAM flag - # @param end_headers [Boolean] add END_HEADERS flag - def headers(stream_id, encoded, end_stream: false, end_headers: false) - fval = end_stream ? 1 : 0 - fval += 4 if end_headers - Frame.new(type_value: 1, stream_id: stream_id, flags_value: fval, payload: encoded) - end - - # Creates a PUSH_PROMISE frame. - # @param stream_id [Integer] The stream ID. - # @param new_id [Integer] The stream ID to create. - # @param encoded [String] Request headers. - # @param end_headers [Boolean] add END_HEADERS flag - def push_promise(stream_id, new_id, encoded, end_headers: false) - payload = String.new.push_uint32(new_id) - .push(encoded) - fval = end_headers ? 4 : 0 - Frame.new(type: :push_promise, stream_id: stream_id, flags_value: fval, payload: payload) - end - - # Creates a CONTINUATION frame. - # @param stream_id [Integer] The stream ID. - # @param payload [String] Payload. - # @param end_headers [Boolean] add END_HEADERS flag - def continuation(stream_id, payload, end_headers: false) - Frame.new(type: :continuation, stream_id: stream_id, flags_value: (end_headers ? 4 : 0), payload: payload) - end - end -end diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb deleted file mode 100644 index cbb16fa..0000000 --- a/lib/plum/frame_utils.rb +++ /dev/null @@ -1,44 +0,0 @@ -# frozen-string-literal: true - -using Plum::BinaryString - -module Plum - module FrameUtils - # Splits this frame into multiple frames not to exceed MAX_FRAME_SIZE. - # @param max [Integer] The maximum size of a frame payload. - # @yield [Frame] The splitted frames. - def split(max) - return yield self if @length <= max - first, *mid, last = @payload.chunk(max) - case type - when :data - yield Frame.new(type_value: 0, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~1) - mid.each { |slice| - yield Frame.new(type_value: 0, stream_id: @stream_id, payload: slice, flags_value: 0) - } - yield Frame.new(type_value: 0, stream_id: @stream_id, payload: last, flags_value: @flags_value & 1) - when :headers, :push_promise - yield Frame.new(type_value: @type_value, stream_id: @stream_id, payload: first, flags_value: @flags_value & ~4) - mid.each { |slice| - yield Frame.new(type: :continuation, stream_id: @stream_id, payload: slice, flags_value: 0) - } - yield Frame.new(type: :continuation, stream_id: @stream_id, payload: last, flags_value: @flags_value & 4) - else - raise NotImplementedError.new("frame split of frame with type #{type} is not supported") - end - end - - # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). - # @return [Hash] The parsed strings. - def parse_settings - settings = {} - payload.each_byteslice(6) do |param| - id = param.uint16 - name = Frame::SETTINGS_TYPE.key(id) - # ignore unknown settings type - settings[name] = param.uint32(2) if name - end - settings - end - end -end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index 823a41a..733aeaf 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -6,7 +6,6 @@ module Plum class Stream include EventEmitter include FlowControl - include StreamUtils attr_reader :id, :state, :connection attr_reader :weight, :exclusive @@ -92,6 +91,43 @@ module Plum end end + # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream. + # @param headers [Enumerable] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. + # @return [Stream] The stream to send push response. + def promise(headers) + stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) + encoded = @connection.hpack_encoder.encode(headers) + frame = Frame.push_promise(id, stream.id, encoded, end_headers: true) + send frame + stream + end + + # Sends response headers. If the encoded frame is larger than MAX_FRAME_SIZE, the headers will be splitted into HEADERS frame and CONTINUATION frame(s). + # @param headers [Enumerable] The response headers. + # @param end_stream [Boolean] Set END_STREAM flag or not. + def send_headers(headers, end_stream:) + encoded = @connection.hpack_encoder.encode(headers) + frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream) + send frame + @state = :half_closed_local if end_stream + end + + # Sends DATA frame. If the data is larger than MAX_FRAME_SIZE, DATA frame will be splitted. + # @param data [String, IO] The data to send. + # @param end_stream [Boolean] Set END_STREAM flag or not. + def send_data(data = "", end_stream: true) + max = @connection.remote_settings[:max_frame_size] + if data.is_a?(IO) + until data.eof? + fragment = data.readpartial(max) + send Frame.data(id, fragment, end_stream: end_stream && data.eof?) + end + else + send Frame.data(id, data, end_stream: end_stream) + end + @state = :half_closed_local if end_stream + end + private def send_immediately(frame) @connection.send(frame) diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb deleted file mode 100644 index dca86e7..0000000 --- a/lib/plum/stream_utils.rb +++ /dev/null @@ -1,44 +0,0 @@ -# frozen-string-literal: true - -using Plum::BinaryString - -module Plum - module StreamUtils - # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream. - # @param headers [Enumerable] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. - # @return [Stream] The stream to send push response. - def promise(headers) - stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) - encoded = @connection.hpack_encoder.encode(headers) - frame = Frame.push_promise(id, stream.id, encoded, end_headers: true) - send frame - stream - end - - # Sends response headers. If the encoded frame is larger than MAX_FRAME_SIZE, the headers will be splitted into HEADERS frame and CONTINUATION frame(s). - # @param headers [Enumerable] The response headers. - # @param end_stream [Boolean] Set END_STREAM flag or not. - def send_headers(headers, end_stream:) - encoded = @connection.hpack_encoder.encode(headers) - frame = Frame.headers(id, encoded, end_headers: true, end_stream: end_stream) - send frame - @state = :half_closed_local if end_stream - end - - # Sends DATA frame. If the data is larger than MAX_FRAME_SIZE, DATA frame will be splitted. - # @param data [String, IO] The data to send. - # @param end_stream [Boolean] Set END_STREAM flag or not. - def send_data(data = "", end_stream: true) - max = @connection.remote_settings[:max_frame_size] - if data.is_a?(IO) - until data.eof? - fragment = data.readpartial(max) - send Frame.data(id, fragment, end_stream: end_stream && data.eof?) - end - else - send Frame.data(id, data, end_stream: end_stream) - end - @state = :half_closed_local if end_stream - end - end -end -- cgit v1.2.3