diff options
40 files changed, 1112 insertions, 318 deletions
diff --git a/.travis.yml b/.travis.yml index 3ea8ace..18ee383 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,8 +7,8 @@ install: - echo openssl_version=1.0.2d >> $rvm_path/user/db - rvm pkg install openssl - $rvm_path/usr/bin/openssl version - - rvm install ruby-head --with-openssl-dir=$rvm_path/usr - - rvm use ruby-head + - rvm install 2.2.3-alpn --patch https://gist.githubusercontent.com/rhenium/b1711edcc903e8887a51/raw/2309e469f5a3ba15917d804ac61b19e62b3d8faf/ruby-openssl-alpn-no-tests-and-docs.patch --with-openssl-dir=$rvm_path/usr + - rvm use 2.2.3-alpn - bundle install script: - bundle exec rake test @@ -1,5 +1,4 @@ -# -*- frozen-string-literal: true -*- -source 'https://rubygems.org' +source "https://rubygems.org" # Specify your gem's dependencies in plum.gemspec gemspec @@ -1,16 +1,45 @@ # Plum [![Build Status](https://travis-ci.org/rhenium/plum.png?branch=master)](https://travis-ci.org/rhenium/plum) [![Code Climate](https://codeclimate.com/github/rhenium/plum/badges/gpa.svg)](https://codeclimate.com/github/rhenium/plum) [![Test Coverage](https://codeclimate.com/github/rhenium/plum/badges/coverage.svg)](https://codeclimate.com/github/rhenium/plum/coverage) -A minimal implementation of HTTP/2 server. - -## Examples -* examples/ - Minimal usage. -* [rhenium/plum-server](https://github.com/rhenium/plum-server) - A example server for https://rhe.jp and http://rhe.jp. +A minimal pure Ruby implementation of HTTP/2 library / server. ## Requirements * Ruby * Ruby 2.2 with [ALPN support patch](https://gist.github.com/rhenium/b1711edcc903e8887a51) and [ECDH support patch (r51348)](https://bugs.ruby-lang.org/projects/ruby-trunk/repository/revisions/51348/diff?format=diff) * or latest Ruby 2.3.0-dev * OpenSSL 1.0.2 or newer (HTTP/2 requires ALPN) -* [http-parser.rb gem](https://rubygems.org/gems/http_parser.rb) (HTTP/1.1 parser; if you use "http" URI scheme) +* Optional: + * [http-parser.rb gem](https://rubygems.org/gems/http_parser.rb) (HTTP/1.1 parser; if you use "http" URI scheme) + * [rack gem](https://rubygems.org/gems/rack) if you use Plum as Rack server. + +## Usage +### As a library +* See documentation: http://www.rubydoc.info/gems/plum +* See examples in `examples/` +* [rhenium/plum-server](https://github.com/rhenium/plum-server) - A static-file server for https://rhe.jp and http://rhe.jp. + +### As a Rack-compatible server + +Most existing Rack-based applications (plum doesn't support Rack hijack API) should work without modification. + +```ruby +# config.ru +App = -> env { + [ + 200, + { "Content-Type" => "text/plain" }, + [" request: #{env["REQUEST_METHOD"]} #{env["PATH_INFO"]}"] + ] +} + +run App +``` + +You can run it: + +```sh +% plum -e production -p 8080 --https config.ru +``` + +By default, Plum generates a dummy server certificate if `--cert` and `--key` options are not specified. ## TODO * **Better API** diff --git a/bin/plum b/bin/plum new file mode 100755 index 0000000..6773828 --- /dev/null +++ b/bin/plum @@ -0,0 +1,7 @@ +#!/usr/bin/env ruby +$LOAD_PATH << File.expand_path("../../lib", __FILE__) +require "plum/rack" +require "plum/rack/cli" + +cli = Plum::Rack::CLI.new(ARGV) +cli.run diff --git a/examples/rack.ru b/examples/rack.ru new file mode 100644 index 0000000..d4541e0 --- /dev/null +++ b/examples/rack.ru @@ -0,0 +1,22 @@ +$LOAD_PATH << File.expand_path("../../lib", __FILE__) +require "plum/rack" + +class App2 + def call(env) + if env["REQUEST_METHOD"] == "GET" && env["PATH_INFO"] == "/" + [ + 200, + { "Content-Type" => "text/html" }, + ["8 bytes-" * 512] + ] + else + [ + 404, + { "Content-Type" => "text/html" }, + ["#{env["REQUEST_METHOD"]} #{env["PATH_INFO"]}"] + ] + end + end +end + +run App2.new diff --git a/lib/plum.rb b/lib/plum.rb index f9020a4..6dab221 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -1,7 +1,7 @@ -# -*- frozen-string-literal: true -*- require "openssl" require "socket" require "base64" +require "set" require "plum/version" require "plum/errors" require "plum/binary_string" diff --git a/lib/plum/binary_string.rb b/lib/plum/binary_string.rb index fcf7e77..400c57b 100644 --- a/lib/plum/binary_string.rb +++ b/lib/plum/binary_string.rb @@ -5,7 +5,7 @@ module Plum # Reads a 8-bit unsigned integer. # @param pos [Integer] The start position to read. def uint8(pos = 0) - byteslice(pos, 1).unpack("C")[0] + getbyte(pos) end # Reads a 16-bit unsigned integer. @@ -17,7 +17,8 @@ module Plum # Reads a 24-bit unsigned integer. # @param pos [Integer] The start position to read. def uint24(pos = 0) - (uint16(pos) << 8) | uint8(pos + 2) + a, b = byteslice(pos, 3).unpack("nC") + (a * 0x100) + b end # Reads a 32-bit unsigned integer. @@ -28,7 +29,7 @@ module Plum # Appends a 8-bit unsigned integer to this string. def push_uint8(val) - self << [val].pack("C") + self << val.chr end # Appends a 16-bit unsigned integer to this string. @@ -38,8 +39,7 @@ module Plum # Appends a 24-bit unsigned integer to this string. def push_uint24(val) - push_uint16(val >> 8) - push_uint8(val & ((1 << 8) - 1)) + self << [val / 0x100, val % 0x100].pack("nC") end # Appends a 32-bit unsigned integer to this string. diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb index b82d5d9..a8b2916 100644 --- a/lib/plum/connection.rb +++ b/lib/plum/connection.rb @@ -16,14 +16,14 @@ module Plum initial_window_size: 65535, # octets; <= 2 ** 31 - 1 max_frame_size: 16384, # octets; <= 2 ** 24 - 1 max_header_list_size: (1 << 32) - 1 # Fixnum - } + }.freeze attr_reader :hpack_encoder, :hpack_decoder attr_reader :local_settings, :remote_settings - attr_reader :state, :streams, :io + attr_reader :state, :streams - def initialize(io, local_settings = {}) - @io = io + def initialize(writer, local_settings = {}) + @writer = writer @local_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] }.merge!(local_settings) @remote_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] } @buffer = String.new @@ -33,24 +33,18 @@ module Plum @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size]) initialize_flow_control(send: @remote_settings[:initial_window_size], recv: @local_settings[:initial_window_size]) + @max_odd_stream_id = 0 + @max_even_stream_id = 0 end private :initialize - # Starts communication with the peer. It blocks until the io is closed, or reaches EOF. - def run - while !@io.closed? && !@io.eof? - receive @io.readpartial(1024) - end - end - - # Closes the io. + # Emits :close event. Doesn't actually close socket. def close # TODO: server MAY wait streams - @io.close + callback(:close) end # Receives the specified data and process. - # # @param new_data [String] The data received from the peer. def receive(new_data) return if new_data.empty? @@ -72,10 +66,9 @@ module Plum alias << receive # Reserves a new stream to server push. - # # @param args [Hash] The argument to pass to Stram.new. def reserve_stream(**args) - next_id = (@streams.keys.select(&:even?).max || 0) + 2 + next_id = @max_even_stream_id + 2 stream = new_stream(next_id, state: :reserved_local, **args) stream end @@ -83,7 +76,7 @@ module Plum private def send_immediately(frame) callback(:send_frame, frame) - @io.write(frame.assemble) + @writer.call(frame.assemble) end def negotiate! @@ -99,9 +92,15 @@ module Plum end def new_stream(stream_id, **args) + if stream_id.even? + @max_even_stream_id = stream_id + else + @max_odd_stream_id = stream_id + end + stream = Stream.new(self, stream_id, **args) - callback(:stream, stream) @streams[stream_id] = stream + callback(:stream, stream) stream end @@ -115,14 +114,14 @@ module Plum raise ConnectionError.new(:protocol_error) end - if frame.flags.include?(:end_headers) + if frame.end_headers? @state = :open @continuation_id = nil end end - if [:headers].include?(frame.type) - if !frame.flags.include?(:end_headers) + if frame.type == :headers + if !frame.end_headers? @state = :waiting_continuation @continuation_id = frame.stream_id end @@ -136,10 +135,8 @@ module Plum if frame.stream_id == 0 receive_control_frame(frame) else - if @streams.key?(frame.stream_id) - stream = @streams[frame.stream_id] - else - if frame.stream_id.even? || (@streams.size > 0 && @streams.keys.select(&:odd?).max >= frame.stream_id) + unless stream = @streams[frame.stream_id] + if frame.stream_id.even? || @max_odd_stream_id >= frame.stream_id raise Plum::ConnectionError.new(:protocol_error) end @@ -173,7 +170,7 @@ module Plum end def receive_settings(frame, send_ack: true) - if frame.flags.include?(:ack) + if frame.ack? raise ConnectionError.new(:frame_size_error) if frame.length != 0 callback(:settings_ack) return @@ -203,7 +200,7 @@ module Plum def receive_ping(frame) raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 8 - if frame.flags.include?(:ack) + if frame.ack? callback(:ping_ack) else opaque_data = frame.payload diff --git a/lib/plum/connection_utils.rb b/lib/plum/connection_utils.rb index 96128cf..8c96267 100644 --- a/lib/plum/connection_utils.rb +++ b/lib/plum/connection_utils.rb @@ -4,7 +4,6 @@ using Plum::BinaryString module Plum module ConnectionUtils # Sends local settings to the peer. - # # @param kwargs [Hash<Symbol, Integer>] def settings(**kwargs) send_immediately Frame.settings(**kwargs) @@ -12,7 +11,6 @@ module Plum end # Sends a PING frame to the peer. - # # @param data [String] Must be 8 octets. # @raise [ArgumentError] If the data is not 8 octets. def ping(data = "plum\x00\x00\x00\x00") @@ -20,10 +18,9 @@ module Plum end # Sends GOAWAY frame to the peer and closes the connection. - # # @param error_type [Symbol] The error type to be contained in the GOAWAY frame. def goaway(error_type = :no_error) - last_id = @streams.keys.max || 0 + last_id = @max_odd_stream_id > @max_even_stream_id ? @max_odd_stream_id : @max_even_stream_id send_immediately Frame.goaway(last_id, error_type) end diff --git a/lib/plum/errors.rb b/lib/plum/errors.rb index d669ef0..9df4668 100644 --- a/lib/plum/errors.rb +++ b/lib/plum/errors.rb @@ -18,7 +18,7 @@ module Plum enhance_your_calm: 0x0b, inadequate_security: 0x0c, http_1_1_required: 0x0d - } + }.freeze attr_reader :http2_error_type diff --git a/lib/plum/event_emitter.rb b/lib/plum/event_emitter.rb index 39e97c9..7bc9695 100644 --- a/lib/plum/event_emitter.rb +++ b/lib/plum/event_emitter.rb @@ -2,19 +2,21 @@ module Plum module EventEmitter # Registers an event handler to specified event. An event can have multiple handlers. - # @param name [String] The name of event. + # @param name [Symbol] The name of event. # @yield Gives event-specific parameters. def on(name, &blk) - callbacks[name] << blk + (callbacks[name] ||= []) << blk end - private + # Invokes an event and call handlers with args. + # @param name [Symbol] The identifier of event. def callback(name, *args) - callbacks[name].each {|cb| cb.call(*args) } + (cbs = callbacks[name]) && cbs.each {|cb| cb.call(*args) } end + private def callbacks - @callbacks ||= Hash.new {|hash, key| hash[key] = [] } + @callbacks ||= {} end end end diff --git a/lib/plum/flow_control.rb b/lib/plum/flow_control.rb index 8f5f48d..cfb181d 100644 --- a/lib/plum/flow_control.rb +++ b/lib/plum/flow_control.rb @@ -6,25 +6,29 @@ module Plum attr_reader :send_remaining_window, :recv_remaining_window # Sends frame respecting inner-stream flow control. - # # @param frame [Frame] The frame to be sent. def send(frame) - case frame.type - when :data + if frame.type == :data @send_buffer << frame - callback(:send_deferred, frame) if @send_remaining_window < frame.length - consume_send_buffer + if @send_remaining_window < frame.length + if Stream === self + connection.callback(:send_deferred, self, frame) + else + callback(:send_deferred, self, frame) + end + else + consume_send_buffer + end else send_immediately frame end end # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. - # # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. def window_update(wsi) @recv_remaining_window += wsi - payload = String.new.push_uint32(wsi & ~(1 << 31)) + payload = String.new.push_uint32(wsi) sid = (Stream === self) ? self.id : 0 send_immediately Frame.new(type: :window_update, stream_id: sid, payload: payload) end @@ -58,8 +62,7 @@ module Plum end def consume_recv_window(frame) - case frame.type - when :data + if frame.type == :data @recv_remaining_window -= frame.length if @recv_remaining_window < 0 local_error = (Connection === self) ? ConnectionError : StreamError @@ -83,15 +86,19 @@ module Plum end r_wsi = frame.payload.uint32 - r = r_wsi >> 31 - wsi = r_wsi & ~(1 << 31) + # r = r_wsi >> 31 # currently not used + wsi = r_wsi # & ~(1 << 31) if wsi == 0 local_error = (Connection === self) ? ConnectionError : StreamError raise local_error.new(:protocol_error) end - callback(:window_update, wsi) + if Stream === self + connection.callback(:window_update, self, wsi) + else + callback(:window_update, self, wsi) + end @send_remaining_window += wsi consume_send_buffer diff --git a/lib/plum/frame.rb b/lib/plum/frame.rb index 3c069fe..fdd16ef 100644 --- a/lib/plum/frame.rb +++ b/lib/plum/frame.rb @@ -17,37 +17,43 @@ module Plum goaway: 0x07, window_update: 0x08, continuation: 0x09 - } + }.freeze + + # @!visibility private + FRAME_TYPES_INVERSE = FRAME_TYPES.invert.freeze FRAME_FLAGS = { data: { end_stream: 0x01, padded: 0x08 - }, + }.freeze, headers: { end_stream: 0x01, end_headers: 0x04, padded: 0x08, priority: 0x20 - }, - priority: {}, - rst_stream: {}, + }.freeze, + priority: {}.freeze, + rst_stream: {}.freeze, settings: { ack: 0x01 - }, + }.freeze, push_promise: { end_headers: 0x04, padded: 0x08 - }, + }.freeze, ping: { ack: 0x01 - }, - goaway: {}, - window_update: {}, + }.freeze, + goaway: {}.freeze, + window_update: {}.freeze, continuation: { end_headers: 0x04 - } - } + }.freeze + }.freeze + + # @!visibility private + FRAME_FLAGS_MAP = FRAME_FLAGS.values.inject(:merge).freeze SETTINGS_TYPE = { header_table_size: 0x01, @@ -56,7 +62,7 @@ module Plum initial_window_size: 0x04, max_frame_size: 0x05, max_header_list_size: 0x06 - } + }.freeze # RFC7540: 4.1 Frame format # +-----------------------------------------------+ @@ -73,34 +79,35 @@ module Plum attr_accessor :type_value # [Integer] Flags. 8-bit attr_accessor :flags_value - # [Integer] Stream Identifier. unsigned 31-bit integer - attr_accessor :stream_id - # [String] The payload. - attr_accessor :payload + # [Integer] Stream Identifier. Unsigned 31-bit integer + attr_reader :stream_id + # [String] The payload. Value is frozen. + attr_reader :payload def initialize(type: nil, type_value: nil, flags: nil, flags_value: nil, stream_id: nil, payload: nil) - self.payload = (payload || "") - self.type_value = type_value or self.type = type - self.flags_value = flags_value or self.flags = flags - self.stream_id = stream_id or raise ArgumentError.new("stream_id is necessary") + @payload = payload || "" + @length = @payload.bytesize + @type_value = type_value or self.type = type + @flags_value = flags_value or self.flags = flags + @stream_id = stream_id or raise ArgumentError.new("stream_id is necessary") end # Returns the length of payload. # @return [Integer] The length. def length - @payload.bytesize + @length end # Returns the type of the frame in Symbol. # @return [Symbol] The type. def type - FRAME_TYPES.key(type_value) || ("unknown_%02x" % type_value).to_sym + FRAME_TYPES_INVERSE[@type_value] || ("unknown_%02x" % @type_value).to_sym end # Sets the frame type. # @param value [Symbol] The type. def type=(value) - self.type_value = FRAME_TYPES[value] or raise ArgumentError.new("unknown frame type: #{value}") + @type_value = FRAME_TYPES[value] or raise ArgumentError.new("unknown frame type: #{value}") end # Returns the set flags on the frame. @@ -108,26 +115,36 @@ module Plum def flags fs = FRAME_FLAGS[type] [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80] - .select {|v| flags_value & v > 0 } + .select {|v| @flags_value & v > 0 } .map {|val| fs && fs.key(val) || ("unknown_%02x" % val).to_sym } end # Sets the frame flags. - # @param value [Array<Symbol>] The flags. - def flags=(value) - self.flags_value = (value && value.map {|flag| FRAME_FLAGS[self.type][flag] }.inject(:|) || 0) + # @param values [Array<Symbol>] The flags. + def flags=(values) + val = 0 + FRAME_FLAGS_MAP.values_at(*values).each { |c| + val |= c if c + } + @flags_value = val end + # Frame#flag_name?() == Frame#flags().include?(:flag_name) + FRAME_FLAGS_MAP.each { |name, value| + class_eval <<-EOS, __FILE__, __LINE__ + 1 + def #{name}? + @flags_value & #{value} > 0 + end + EOS + } + # Assembles the frame into binary representation. # @return [String] Binary representation of this frame. def assemble - bytes = String.new - bytes.push_uint24(length) - bytes.push_uint8(type_value) - bytes.push_uint8(flags_value) - bytes.push_uint32(stream_id & ~(1 << 31)) # first bit is reserved (MUST be 0) - bytes.push(payload) - bytes + [length / 0x100, length % 0x100, + @type_value, + @flags_value, + @stream_id].pack("nCCCN") << @payload end # @private @@ -136,27 +153,22 @@ module Plum end # Parses a frame from given buffer. It changes given buffer. - # - # @param buffer [String] The buffer stored the data received from peer. + # @param buffer [String] The buffer stored the data received from peer. Encoding must be Encoding::BINARY. # @return [Frame, nil] The parsed frame or nil if the buffer is imcomplete. def self.parse!(buffer) return nil if buffer.bytesize < 9 # header: 9 bytes length = buffer.uint24 return nil if buffer.bytesize < 9 + length - bhead = buffer.byteshift(9) - payload = buffer.byteshift(length) - - type_value = bhead.uint8(3) - flags_value = bhead.uint8(4) - r_sid = bhead.uint32(5) - r = r_sid >> 31 - stream_id = r_sid & ~(1 << 31) + cur = buffer.byteshift(9 + length) + type_value, flags_value, r_sid = cur.byteslice(3, 6).unpack("CCN") + # r = r_sid >> 31 # currently not used + stream_id = r_sid # & ~(1 << 31) self.new(type_value: type_value, flags_value: flags_value, stream_id: stream_id, - payload: payload).freeze + payload: cur.byteslice(9, length)).freeze end end end diff --git a/lib/plum/frame_factory.rb b/lib/plum/frame_factory.rb index eff0718..51908ec 100644 --- a/lib/plum/frame_factory.rb +++ b/lib/plum/frame_factory.rb @@ -3,11 +3,19 @@ using Plum::BinaryString module Plum module FrameFactory + # Creates a RST_STREAM frame. + # @param stream_id [Integer] The stream ID. + # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. def rst_stream(stream_id, error_type) payload = String.new.push_uint32(HTTPError::ERROR_CODES[error_type]) Frame.new(type: :rst_stream, stream_id: stream_id, payload: payload) end + # Creates a GOAWAY frame. + # @param last_id [Integer] The biggest processed stream ID. + # @param error_type [Symbol] The error type defined in RFC 7540 Section 7. + # @param message [String] Additional debug data. + # @see RFC 7540 Section 6.8 def goaway(last_id, error_type, message = "") payload = String.new.push_uint32((last_id || 0) | (0 << 31)) .push_uint32(HTTPError::ERROR_CODES[error_type]) @@ -15,15 +23,24 @@ module Plum Frame.new(type: :goaway, stream_id: 0, payload: payload) end + # Creates a SETTINGS frame. + # @param ack [Symbol] Pass :ack to create an ACK frame. + # @param args [Hash<Symbol, Integer>] The settings values to send. def settings(ack = nil, **args) payload = args.inject(String.new) {|payload, (key, value)| id = Frame::SETTINGS_TYPE[key] or raise ArgumentError.new("invalid settings type") payload.push_uint16(id) payload.push_uint32(value) } - Frame.new(type: :settings, stream_id: 0, flags: [ack].compact, payload: payload) + Frame.new(type: :settings, stream_id: 0, flags: [ack], payload: payload) end + # Creates a PING frame. + # @overload ping(ack, payload) + # @param ack [Symbol] Pass :ack to create an ACK frame. + # @param payload [String] 8 bytes length data to send. + # @overload ping(payload = "plum\x00\x00\x00\x00") + # @param payload [String] 8 bytes length data to send. def ping(arg1 = "plum\x00\x00\x00\x00", arg2 = nil) if !arg2 raise ArgumentError.new("data must be 8 octets") if arg1.bytesize != 8 @@ -33,22 +50,39 @@ module Plum end end + # Creates a DATA frame. + # @param stream_id [Integer] The stream ID. + # @param payload [String] Payload. + # @param flags [Array<Symbol>] Flags. def data(stream_id, payload, *flags) - Frame.new(type: :data, stream_id: stream_id, flags: flags.compact, payload: payload.to_s) + Frame.new(type: :data, stream_id: stream_id, flags: flags, payload: payload) end + # Creates a DATA frame. + # @param stream_id [Integer] The stream ID. + # @param encoded [String] Headers. + # @param flags [Array<Symbol>] Flags. def headers(stream_id, encoded, *flags) - Frame.new(type: :headers, stream_id: stream_id, flags: flags.compact, payload: encoded) + Frame.new(type: :headers, stream_id: stream_id, flags: flags, payload: encoded) end + # Creates a PUSH_PROMISE frame. + # @param stream_id [Integer] The stream ID. + # @param new_id [Integer] The stream ID to create. + # @param encoded [String] Request headers. + # @param flags [Array<Symbol>] Flags. def push_promise(stream_id, new_id, encoded, *flags) - payload = String.new.push_uint32(0 << 31 | new_id) + payload = String.new.push_uint32(new_id) .push(encoded) - Frame.new(type: :push_promise, stream_id: stream_id, flags: flags.compact, payload: payload) + Frame.new(type: :push_promise, stream_id: stream_id, flags: flags, payload: payload) end + # Creates a CONTINUATION frame. + # @param stream_id [Integer] The stream ID. + # @param payload [String] Payload. + # @param flags [Array<Symbol>] Flags. def continuation(stream_id, payload, *flags) - Frame.new(type: :continuation, stream_id: stream_id, flags: flags.compact, payload: payload) + Frame.new(type: :continuation, stream_id: stream_id, flags: flags, payload: payload) end end end diff --git a/lib/plum/frame_utils.rb b/lib/plum/frame_utils.rb index 994f7a0..20c79de 100644 --- a/lib/plum/frame_utils.rb +++ b/lib/plum/frame_utils.rb @@ -4,7 +4,6 @@ using Plum::BinaryString module Plum module FrameUtils # Splits the DATA frame into multiple frames if the payload size exceeds max size. - # # @param max [Integer] The maximum size of a frame payload. # @return [Array<Frame>] The splitted frames. def split_data(max) @@ -19,7 +18,6 @@ module Plum end # Splits the HEADERS or PUSH_PROMISE frame into multiple frames if the payload size exceeds max size. - # # @param max [Integer] The maximum size of a frame payload. # @return [Array<Frame>] The splitted frames. def split_headers(max) @@ -35,7 +33,6 @@ module Plum end # Parses SETTINGS frame payload. Ignores unknown settings type (see RFC7540 6.5.2). - # # @return [Hash<Symbol, Integer>] The parsed strings. def parse_settings settings = {} diff --git a/lib/plum/hpack/constants.rb b/lib/plum/hpack/constants.rb index 26247fe..4eac05c 100644 --- a/lib/plum/hpack/constants.rb +++ b/lib/plum/hpack/constants.rb @@ -4,7 +4,7 @@ module Plum # RFC7541 Appendix A # index is starting from 0 STATIC_TABLE = [ - [":authority"], + [":authority", ""], [":method", "GET"], [":method", "POST"], [":path", "/"], @@ -20,52 +20,52 @@ module Plum [":status", "500"], ["accept-charset"], ["accept-encoding", "gzip, deflate"], - ["accept-language"], - ["accept-ranges"], - ["accept"], - ["access-control-allow-origin"], - ["age"], - ["allow"], - ["authorization"], - ["cache-control"], - ["content-disposition"], - ["content-encoding"], - ["content-language"], - ["content-length"], - ["content-location"], - ["content-range"], - ["content-type"], - ["cookie"], - ["date"], - ["etag"], - ["expect"], - ["expires"], - ["from"], - ["host"], - ["if-match"], - ["if-modified-since"], - ["if-none-match"], - ["if-range"], - ["if-unmodified-since"], - ["last-modified"], - ["link"], - ["location"], - ["max-forwards"], - ["proxy-authenticate"], - ["proxy-authorization"], - ["range"], - ["referer"], - ["refresh"], - ["retry-after"], - ["server"], - ["set-cookie"], - ["strict-transport-security"], - ["transfer-encoding"], - ["user-agent"], - ["vary"], - ["via"], - ["www-authenticate"], - ] + ["accept-language", ""], + ["accept-ranges", ""], + ["accept", ""], + ["access-control-allow-origin", ""], + ["age", ""], + ["allow", ""], + ["authorization", ""], + ["cache-control", ""], + ["content-disposition", ""], + ["content-encoding", ""], + ["content-language", ""], + ["content-length", ""], + ["content-location", ""], + ["content-range", ""], + ["content-type", ""], + ["cookie", ""], + ["date", ""], + ["etag", ""], + ["expect", ""], + ["expires", ""], + ["from", ""], + ["host", ""], + ["if-match", ""], + ["if-modified-since", ""], + ["if-none-match", ""], + ["if-range", ""], + ["if-unmodified-since", ""], + ["last-modified", ""], + ["link", ""], + ["location", ""], + ["max-forwards", ""], + ["proxy-authenticate", ""], + ["proxy-authorization", ""], + ["range", ""], + ["referer", ""], + ["refresh", ""], + ["retry-after", ""], + ["server", ""], + ["set-cookie", ""], + ["strict-transport-security", ""], + ["transfer-encoding", ""], + ["user-agent", ""], + ["vary", ""], + ["via", ""], + ["www-authenticate", ""], + ].freeze HUFFMAN_TABLE = [ "1111111111000", @@ -325,8 +325,8 @@ module Plum "111111111111111111111110000", "11111111111111111111101110", "111111111111111111111111111111" - ] + ].freeze - HUFFMAN_TABLE_INVERSED = HUFFMAN_TABLE.each_with_index.to_h + HUFFMAN_TABLE_INVERSED = HUFFMAN_TABLE.each_with_index.to_h.freeze end end diff --git a/lib/plum/hpack/context.rb b/lib/plum/hpack/context.rb index fc67f3e..622fe36 100644 --- a/lib/plum/hpack/context.rb +++ b/lib/plum/hpack/context.rb @@ -17,8 +17,9 @@ module Plum end def store(name, value) - @dynamic_table.unshift([name, value]) - @size += name.bytesize + value.to_s.bytesize + 32 + value = value.to_s + @dynamic_table.unshift([name.freeze, value.freeze]) + @size += name.bytesize + value.bytesize + 32 evict end @@ -35,7 +36,7 @@ module Plum end def search(name, value) - pr = proc {|n, v| + pr = proc { |n, v| n == name && (!value || v == value) } @@ -48,7 +49,7 @@ module Plum def evict while @limit && @size > @limit name, value = @dynamic_table.pop - @size -= name.bytesize + value.to_s.bytesize + 32 + @size -= name.bytesize + value.bytesize + 32 end end end diff --git a/lib/plum/hpack/decoder.rb b/lib/plum/hpack/decoder.rb index c1ae2bb..48546f8 100644 --- a/lib/plum/hpack/decoder.rb +++ b/lib/plum/hpack/decoder.rb @@ -11,42 +11,47 @@ module Plum end def decode(str) - str = str.dup headers = [] - headers << parse!(str) while str.size > 0 - headers.compact + pos = 0 + lpos = str.bytesize + while pos < lpos + l, succ = parse(str, pos) + pos += succ + headers << l if l + end + headers end private - def parse!(str) - first_byte = str.uint8 - if first_byte >= 128 # 0b1XXXXXXX - parse_indexed!(str) - elsif first_byte >= 64 # 0b01XXXXXX - parse_indexing!(str) - elsif first_byte >= 32 # 0b001XXXXX - self.limit = read_integer!(str, 5) - nil + def parse(str, pos) + first_byte = str.getbyte(pos) + if first_byte >= 0x80 # 0b1XXXXXXX + parse_indexed(str, pos) + elsif first_byte >= 0x40 # 0b01XXXXXX + parse_indexing(str, pos) + elsif first_byte >= 0x20 # 0b001XXXXX + self.limit, succ = read_integer(str, pos, 5) + [nil, succ] else # 0b0000XXXX (without indexing) or 0b0001XXXX (never indexing) - parse_no_indexing!(str) + parse_no_indexing(str, pos) end end - def read_integer!(str, prefix_length) - first_byte = str.byteshift(1).uint8 - raise HPACKError.new("integer: end of buffer") unless first_byte + def read_integer(str, pos, prefix_length) + raise HPACKError.new("integer: end of buffer") if str.empty? + first_byte = str.getbyte(pos) - mask = (1 << prefix_length) - 1 - ret = first_byte & mask - return ret if ret < mask + mask = 1 << prefix_length + ret = first_byte % mask + return [ret, 1] if ret != mask - 1 octets = 0 - while next_value = str.byteshift(1).uint8 - ret += (next_value & 0b01111111) << (7 * octets) + while next_value = str.uint8(pos + octets + 1) + ret += (next_value % 0x80) << (7 * octets) octets += 1 - if next_value < 128 - return ret + if next_value < 0x80 + return [ret, 1 + octets] elsif octets == 4 # RFC 7541 5.1 tells us that we MUST have limitation. at least > 2 ** 28 raise HPACKError.new("integer: too large integer") end @@ -55,29 +60,32 @@ module Plum raise HPACKError.new("integer: end of buffer") end - def read_string!(str) - first_byte = str.uint8 - raise HPACKError.new("string: end of buffer") unless first_byte + def read_string(str, pos) + raise HPACKError.new("string: end of buffer") if str.empty? + first_byte = str.uint8(pos) - huffman = (first_byte >> 7) == 1 - length = read_integer!(str, 7) - bin = str.byteshift(length) + huffman = first_byte > 0x80 + length, ilen = read_integer(str, pos, 7) + raise HTTPError.new("string: end of buffer") if str.bytesize < length - raise HTTPError.new("string: end of buffer") if bin.bytesize < length - bin = Huffman.decode(bin) if huffman - bin + bin = str.byteslice(pos + ilen, length) + if huffman + [Huffman.decode(bin), ilen + length] + else + [bin, ilen + length] + end end - def parse_indexed!(str) + def parse_indexed(str, pos) # indexed # +---+---+---+---+---+---+---+---+ # | 1 | Index (7+) | # +---+---------------------------+ - index = read_integer!(str, 7) - fetch(index) + index, succ = read_integer(str, pos, 7) + [fetch(index), succ] end - def parse_indexing!(str) + def parse_indexing(str, pos) # +---+---+---+---+---+---+---+---+ # | 0 | 1 | Index (6+) | # +---+---+-----------------------+ @@ -97,20 +105,21 @@ module Plum # +---+---------------------------+ # | Value String (Length octets) | # +-------------------------------+ - index = read_integer!(str, 6) + index, ilen = read_integer(str, pos, 6) if index == 0 - name = read_string!(str) + name, nlen = read_string(str, pos + ilen) else name, = fetch(index) + nlen = 0 end - val = read_string!(str) + val, vlen = read_string(str, pos + ilen + nlen) store(name, val) - [name, val] + [[name, val], ilen + nlen + vlen] end - def parse_no_indexing!(str) + def parse_no_indexing(str, pos) # +---+---+---+---+---+---+---+---+ # | 0 | 0 | 0 |0,1| Index (4+) | # +---+---+-----------------------+ @@ -130,16 +139,17 @@ module Plum # +---+---------------------------+ # | Value String (Length octets) | # +-------------------------------+ - index = read_integer!(str, 4) + index, ilen = read_integer(str, pos, 4) if index == 0 - name = read_string!(str) + name, nlen = read_string(str, pos + ilen) else name, = fetch(index) + nlen = 0 end - val = read_string!(str) + val, vlen = read_string(str, pos + ilen + nlen) - [name, val] + [[name, val], ilen + nlen + vlen] end end end diff --git a/lib/plum/hpack/encoder.rb b/lib/plum/hpack/encoder.rb index 48ee979..d817589 100644 --- a/lib/plum/hpack/encoder.rb +++ b/lib/plum/hpack/encoder.rb @@ -12,10 +12,10 @@ module Plum @huffman = huffman end def encode(headers) - out = String.new + out = String.new.force_encoding(Encoding::BINARY) headers.each do |name, value| - name = name.to_s.b - value = value.to_s.b + name = name.to_s + value = value.to_s if index = search(name, value) out << encode_indexed(index) elsif index = search(name, nil) @@ -24,7 +24,7 @@ module Plum out << encode_literal(name, value) end end - out.force_encoding(Encoding::BINARY) + out end private @@ -46,7 +46,7 @@ module Plum else fb = "\x00" end - fb.b << encode_string(name) << encode_string(value) + (fb + encode_string(name)) << encode_string(value) end # +---+---+---+---+---+---+---+---+ @@ -59,10 +59,9 @@ module Plum def encode_half_indexed(index, value) if @indexing store(fetch(index)[0], value) - fb = encode_integer(index, 6) - fb.setbyte(0, fb.uint8 | 0b01000000) + fb = encode_integer(index, 6, 0b01000000) else - fb = encode_integer(index, 4) + fb = encode_integer(index, 4, 0b00000000) end fb << encode_string(value) end @@ -71,27 +70,24 @@ module Plum # | 1 | Index (7+) | # +---+---------------------------+ def encode_indexed(index) - s = encode_integer(index, 7) - s.setbyte(0, s.uint8 | 0b10000000) - s + encode_integer(index, 7, 0b10000000) end - def encode_integer(value, prefix_length) + def encode_integer(value, prefix_length, hmask) mask = (1 << prefix_length) - 1 - out = String.new if value < mask - out.push_uint8(value) + (value + hmask).chr.force_encoding(Encoding::BINARY) else + vals = [mask + hmask] value -= mask - out.push_uint8(mask) while value >= mask - out.push_uint8((value % 0b10000000) + 0b10000000) - value >>= 7 + vals << (value % 0x80) + 0x80 + value /= 0x80 end - out.push_uint8(value) + vals << value + vals.pack("C*") end - out.force_encoding(Encoding::BINARY) end def encode_string(str) @@ -105,14 +101,12 @@ module Plum end def encode_string_plain(str) - encode_integer(str.bytesize, 7) << str.force_encoding(Encoding::BINARY) + encode_integer(str.bytesize, 7, 0b00000000) << str end def encode_string_huffman(str) huffman_str = Huffman.encode(str) - lenstr = encode_integer(huffman_str.bytesize, 7) - lenstr.setbyte(0, lenstr.uint8(0) | 0b10000000) - lenstr.force_encoding(Encoding::BINARY) << huffman_str + encode_integer(huffman_str.bytesize, 7, 0b10000000) << huffman_str end end end diff --git a/lib/plum/http_connection.rb b/lib/plum/http_connection.rb index 45c6486..1c30e6e 100644 --- a/lib/plum/http_connection.rb +++ b/lib/plum/http_connection.rb @@ -3,12 +3,28 @@ using Plum::BinaryString module Plum class HTTPConnection < Connection - def initialize(io, local_settings = {}) + attr_reader :sock + + def initialize(sock, local_settings = {}) require "http/parser" - super @_headers = nil @_body = String.new @_http_parser = setup_parser + @sock = sock + super(@sock.method(:write), local_settings) + end + + # Starts communication with the peer. It blocks until the io is closed, or reaches EOF. + def run + while !@sock.closed? && !@sock.eof? + self << @sock.readpartial(1024) + end + end + + # Closes the socket. + def close + super + @sock.close end private @@ -56,7 +72,7 @@ module Plum "Server: plum/#{Plum::VERSION}\r\n" "\r\n" - io.write(resp) + @sock.write(resp) end def process_first_request diff --git a/lib/plum/https_connection.rb b/lib/plum/https_connection.rb index 6ddf189..c719c2e 100644 --- a/lib/plum/https_connection.rb +++ b/lib/plum/https_connection.rb @@ -1,16 +1,32 @@ # -*- frozen-string-literal: true -*- module Plum class HTTPSConnection < Connection - def initialize(io, local_settings = {}) - if io.respond_to?(:cipher) # OpenSSL::SSL::SSLSocket-like - if CIPHER_BLACKLIST.include?(io.cipher.first) # [cipher-suite, ssl-version, keylen, alglen] - self.on(:negotiated) { + attr_reader :sock + + def initialize(sock, local_settings = {}) + @sock = sock + super(@sock.method(:write), local_settings) + end + + # Starts communication with the peer. It blocks until the io is closed, or reaches EOF. + def run + if @sock.respond_to?(:cipher) # OpenSSL::SSL::SSLSocket-like + if CIPHER_BLACKLIST.include?(@sock.cipher.first) # [cipher-suite, ssl-version, keylen, alglen] + on(:negotiated) { raise ConnectionError.new(:inadequate_security) } end end + while !@sock.closed? && !@sock.eof? + self << @sock.readpartial(1024) + end + end + + # Closes the socket. + def close super + @sock.close end CIPHER_BLACKLIST = %w( @@ -27,6 +43,6 @@ module Plum AECDH-NULL-SHA AECDH-RC4-SHA AECDH-DES-CBC3-SHA AECDH-AES128-SHA AECDH-AES256-SHA SRP-3DES-EDE-CBC-SHA SRP-RSA-3DES-EDE-CBC-SHA SRP-DSS-3DES-EDE-CBC-SHA SRP-AES-128-CBC-SHA SRP-RSA-AES-128-CBC-SHA SRP-DSS-AES-128-CBC-SHA SRP-AES-256-CBC-SHA SRP-RSA-AES-256-CBC-SHA SRP-DSS-AES-256-CBC-SHA ECDHE-ECDSA-AES128-SHA256 ECDHE-ECDSA-AES256-SHA384 ECDH-ECDSA-AES128-SHA256 ECDH-ECDSA-AES256-SHA384 ECDHE-RSA-AES128-SHA256 ECDHE-RSA-AES256-SHA384 ECDH-RSA-AES128-SHA256 ECDH-RSA-AES256-SHA384 ECDH-ECDSA-AES128-GCM-SHA256 ECDH-ECDSA-AES256-GCM-SHA384 ECDH-RSA-AES128-GCM-SHA256 ECDH-RSA-AES256-GCM-SHA384 - ) + ).freeze end end diff --git a/lib/plum/rack.rb b/lib/plum/rack.rb new file mode 100644 index 0000000..da0ae8a --- /dev/null +++ b/lib/plum/rack.rb @@ -0,0 +1,10 @@ +require "logger" +require "stringio" +require "plum" +require "rack" +require "rack/handler/plum" +require "plum/rack/config" +require "plum/rack/dsl" +require "plum/rack/listener" +require "plum/rack/server" +require "plum/rack/connection" diff --git a/lib/plum/rack/cli.rb b/lib/plum/rack/cli.rb new file mode 100644 index 0000000..18fe90f --- /dev/null +++ b/lib/plum/rack/cli.rb @@ -0,0 +1,131 @@ +# -*- frozen-string-literal: true -*- +require "optparse" +require "rack/builder" + +module Plum + module Rack + # CLI runner. Parses command line options and start ::Plum::Rack::Server. + class CLI + # Creates new CLI runner and parses command line. + # + # @param argv [Array<String>] ARGV + def initialize(argv) + @argv = argv + @options = {} + + parse! + end + + # Starts ::Plum::Rack::Server + def run + @server.start + end + + private + def parse! + @parser = setup_parser + @parser.parse!(@argv) + + config = transform_options + # TODO: parse rack_opts? + rack_app, rack_opts = ::Rack::Builder.parse_file(@argv.shift || "config.ru") + + @server = Plum::Rack::Server.new(rack_app, config) + end + + def transform_options + if @options[:config] + dsl = DSL::Config.new.instance_eval(File.read(@options[:config])) + config = dsl.config + else + config = Config.new + end + + ENV["RACK_ENV"] = @options[:env] if @options[:env] + config[:debug] = @options[:debug] unless @options[:debug].nil? + config[:server_push] = @options[:server_push] unless @options[:server_push].nil? + + if @options[:socket] + config[:listeners] << { listener: UNIXListener, + path: @options[:socket] } + end + + if !@options[:socket] || @options[:host] || @options[:port] + if @options[:tls] == false + config[:listeners] << { listener: TCPListener, + hostname: @options[:host] || "0.0.0.0", + port: @options[:port] || 8080 } + else + config[:listeners] << { listener: TLSListener, + hostname: @options[:host] || "0.0.0.0", + port: @options[:port] || 8080, + certificate: @options[:cert] && File.read(@options[:cert]), + certificate_key: @options[:cert] && File.read(@options[:key]) } + end + end + + config + end + + def setup_parser + parser = OptionParser.new do |o| + o.on "-C", "--config PATH", "Load PATH as a config" do |arg| + @options[:config] = arg + end + + o.on "-D", "--debug", "Run puma in debug mode" do + @options[:debug] = true + end + + o.on "-e", "--environment ENV", "Rack environment (default: development)" do |arg| + @options[:env] = arg + end + + o.on "-a", "--address HOST", "Bind to host HOST (default: 0.0.0.0)" do |arg| + @options[:host] = arg + end + + o.on "-p", "--port PORT", "Bind to port PORT (default: 8080)" do |arg| + @options[:port] = arg.to_i + end + + o.on "-S", "--socket PATH", "Bind to UNIX domain socket" do |arg| + @options[:socket] = arg + end + + o.on "--http", "Use http URI scheme (use raw TCP)" do |arg| + @options[:tls] = false + end + + o.on "--https", "Use https URI scheme (use TLS; default)" do |arg| + @options[:tls] = true + end + + o.on "--server-push BOOL", "Enable HTTP/2 server push" do |arg| + @options[:server_push] = arg != "false" + end + + o.on "--cert PATH", "Use PATH as server certificate" do |arg| + @options[:cert] = arg + end + + o.on "--key PATH", "Use PATH as server certificate's private key" do |arg| + @options[:key] = arg + end + + o.on "-v", "--version", "Show version" do + puts "plum version #{::Plum::VERSION}" + exit(0) + end + + o.on "-h", "--help", "Show this message" do + puts o + exit(0) + end + + o.banner = "plum [options] [rackup config file]" + end + end + end + end +end diff --git a/lib/plum/rack/config.rb b/lib/plum/rack/config.rb new file mode 100644 index 0000000..b75fd08 --- /dev/null +++ b/lib/plum/rack/config.rb @@ -0,0 +1,29 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + class Config + DEFAULT_CONFIG = { + listeners: [], + debug: false, + log: nil, # $stdout + server_push: true + }.freeze + + def initialize(config = {}) + @config = DEFAULT_CONFIG.merge(config) + end + + def [](key) + @config[key] + end + + def []=(key, value) + @config[key] = value + end + + def to_s + @config.to_s + end + end + end +end diff --git a/lib/plum/rack/connection.rb b/lib/plum/rack/connection.rb new file mode 100644 index 0000000..0c53e3d --- /dev/null +++ b/lib/plum/rack/connection.rb @@ -0,0 +1,184 @@ +# -*- frozen-string-literal: true -*- +using Plum::BinaryString + +module Plum + module Rack + class Connection + attr_reader :app, :plum + + def initialize(app, plum, logger) + @app = app + @plum = plum + @logger = logger + + setup_plum + end + + def stop + @plum.close + end + + def run + begin + @plum.run + rescue Errno::EPIPE, Errno::ECONNRESET => e + rescue StandardError => e + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + + private + def setup_plum + @plum.on(:connection_error) { |ex| @logger.error(ex) } + + # @plum.on(:stream) { |stream| @logger.debug("new stream: #{stream}") } + @plum.on(:stream_error) { |stream, ex| @logger.error(ex) } + + reqs = {} + @plum.on(:headers) { |stream, h| + reqs[stream] = { headers: h, data: String.new.force_encoding(Encoding::BINARY) } + } + + @plum.on(:data) { |stream, d| + reqs[stream][:data] << d # TODO: store to file? + } + + @plum.on(:end_stream) { |stream| + handle_request(stream, reqs[stream][:headers], reqs[stream][:data]) + } + end + + def send_body(stream, body) + begin + if body.is_a?(IO) + stream.send_data(body, end_stream: true) + elsif body.respond_to?(:size) + last = body.size - 1 + i = 0 + body.each { |part| + stream.send_data(part, end_stream: last == i) + i += 1 + } + else + body.each { |part| stream.send_data(part, end_stream: false) } + stream.send_data(nil, end_stream: true) + end + ensure + body.close if body.respond_to?(:close) + end + end + + def extract_push(reqheaders, extheaders) + if pushs = extheaders["plum.serverpush"] + authority = reqheaders.find { |k, v| k == ":authority" }[1] + scheme = reqheaders.find { |k, v| k == ":scheme" }[1] + + pushs.split(";").map { |push| + method, path = push.split(" ", 2) + { + ":authority" => authority, + ":method" => method.to_s.upcase, + ":scheme" => scheme, + ":path" => path + } + } + else + [] + end + end + + def handle_request(stream, headers, data) + env = new_env(headers, data) + r_status, r_rawheaders, r_body = @app.call(env) + r_headers, r_extheaders = extract_headers(r_status, r_rawheaders) + + stream.send_headers(r_headers, end_stream: false) + + push_sts = extract_push(headers, r_extheaders).map { |preq| + [stream.promise(preq), preq] + } + + send_body(stream, r_body) + + push_sts.each { |st, preq| + penv = new_env(preq, "") + p_status, p_h, p_body = @app.call(penv) + p_headers = extract_headers(p_status, p_h) + st.send_headers(p_headers, end_stream: false) + send_body(st, p_body) + } + end + + def new_env(h, data) + ebase = { + "SCRIPT_NAME" => "", + "rack.version" => ::Rack::VERSION, + "rack.input" => StringIO.new(data), + "rack.errors" => $stderr, + "rack.multithread" => true, + "rack.multiprocess" => false, + "rack.run_once" => false, + "rack.hijack?" => false, + } + + h.each { |k, v| + case k + when ":method" + ebase["REQUEST_METHOD"] = v + when ":path" + cpath_name, cpath_query = v.split("?", 2) + ebase["PATH_INFO"] = cpath_name + ebase["QUERY_STRING"] = cpath_query || "" + when ":authority" + chost, cport = v.split(":", 2) + ebase["SERVER_NAME"] = chost + ebase["SERVER_PORT"] = (cport || 443).to_i + when ":scheme" + ebase["rack.url_scheme"] = v + else + if k.start_with?(":") + # unknown HTTP/2 pseudo-headers + else + if "cookie" == k && ebase["HTTP_COOKIE"] + if ebase["HTTP_COOKIE"].frozen? + (ebase["HTTP_COOKIE"] += "; ") << v + else + ebase["HTTP_COOKIE"] << "; " << v + end + else + ebase["HTTP_" + k.tr("-", "_").upcase!] = v + end + end + end + } + + ebase + end + + def extract_headers(r_status, r_h) + rbase = { + ":status" => r_status, + "server" => "plum/#{::Plum::VERSION}", + } + rext = {} + + r_h.each do |key, v_| + if key.include?(".") + rext[key] = v_ + else + key = key.downcase + + if "set-cookie" == key + rbase[key] = v_.gsub("\n", "; ") # RFC 7540 8.1.2.5 + else + key.byteshift(2) if key.start_with?("x-") + rbase[key] = v_.tr("\n", ",") # RFC 7230 7 + end + end + end + + [rbase, rext] + end + end + end +end diff --git a/lib/plum/rack/dsl.rb b/lib/plum/rack/dsl.rb new file mode 100644 index 0000000..eb2ba17 --- /dev/null +++ b/lib/plum/rack/dsl.rb @@ -0,0 +1,45 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + module DSL + class Config + attr_reader :config + + def initialize + @config = ::Plum::Rack::Config::DEFAULT_CONFIG.dup + end + + def log(out) + if out.is_a?(String) + @config[:log] = File.open(out, "a") + else + @config[:log] = out + end + end + + def debug(bool) + @config[:debug] = !!bool + end + + def listener(type, conf) + case type + when :unix + lc = conf.merge(listener: UNIXListener) + when :tcp + lc = conf.merge(listener: TCPListener) + when :tls + lc = conf.merge(listener: TLSListener) + else + raise "Unknown listener type: #{type} (known type: :unix, :http, :https)" + end + + @config[:listeners] << lc + end + + def server_push(bool) + @config[:server_push] = !!bool + end + end + end + end +end diff --git a/lib/plum/rack/listener.rb b/lib/plum/rack/listener.rb new file mode 100644 index 0000000..31bbc8c --- /dev/null +++ b/lib/plum/rack/listener.rb @@ -0,0 +1,123 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + class BaseListener + def stop + @server.close + end + + def to_io + raise "not implemented" + end + + def method_missing(name, *args) + @server.__send__(name, *args) + end + end + + class TCPListener < BaseListener + def initialize(lc) + @server = ::TCPServer.new(lc[:hostname], lc[:port]) + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPConnection.new(sock) + end + end + + class TLSListener < BaseListener + def initialize(lc) + cert, key = lc[:certificate], lc[:certificate_key] + unless cert && key + puts "WARNING: using dummy certificate" + cert, key = dummy_key + end + + ctx = OpenSSL::SSL::SSLContext.new + ctx.ssl_version = :TLSv1_2 + ctx.alpn_select_cb = -> protocols { + raise "Client does not support HTTP/2: #{protocols}" unless protocols.include?("h2") + "h2" + } + ctx.tmp_ecdh_callback = -> (sock, ise, keyl) { OpenSSL::PKey::EC.new("prime256v1") } + ctx.cert = OpenSSL::X509::Certificate.new(cert) + ctx.key = OpenSSL::PKey::RSA.new(key) + tcp_server = ::TCPServer.new(lc[:hostname], lc[:port]) + @server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx) + @server.start_immediately = false + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPSConnection.new(sock) + end + + private + # returns: [cert, key] + def dummy_key + puts "WARNING: Generating new dummy certificate..." + + key = OpenSSL::PKey::RSA.new(2048) + cert = OpenSSL::X509::Certificate.new + cert.subject = cert.issuer = OpenSSL::X509::Name.parse("/C=JP/O=Test/OU=Test/CN=example.com") + cert.not_before = Time.now + cert.not_after = Time.now + 363 * 24 * 60 * 60 + cert.public_key = key.public_key + cert.serial = rand((1 << 20) - 1) + cert.version = 2 + + ef = OpenSSL::X509::ExtensionFactory.new + ef.subject_certificate = cert + ef.issuer_certificate = cert + cert.extensions = [ + ef.create_extension("basicConstraints", "CA:TRUE", true), + ef.create_extension("subjectKeyIdentifier", "hash"), + ] + cert.add_extension ef.create_extension("authorityKeyIdentifier", "keyid:always,issuer:always") + + cert.sign key, OpenSSL::Digest::SHA1.new + + [cert, key] + end + end + + class UNIXListener < BaseListener + def initialize(lc) + if File.exist?(lc[:path]) + begin + old = UNIXSocket.new(lc[:path]) + rescue SystemCallError, IOError + File.unlink(lc[:path]) + else + old.close + raise "Already a server bound to: #{lc[:path]}" + end + end + + @server = ::UNIXServer.new(lc[:path]) + + File.chmod(lc[:mode], lc[:path]) if lc[:mode] + end + + def stop + super + File.unlink(lc[:path]) + end + + def to_io + @server.to_io + end + + def plum(sock) + ::Plum::HTTPSConnection.new(sock) + end + end + end +end diff --git a/lib/plum/rack/server.rb b/lib/plum/rack/server.rb new file mode 100644 index 0000000..0f8338c --- /dev/null +++ b/lib/plum/rack/server.rb @@ -0,0 +1,69 @@ +# -*- frozen-string-literal: true -*- +module Plum + module Rack + class Server + def initialize(app, config) + @state = :null + @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app + @logger = Logger.new(config[:log] || $stdout).tap { |l| + l.level = config[:debug] ? Logger::DEBUG : Logger::INFO + } + @listeners = config[:listeners].map { |lc| + lc[:listener].new(lc) + } + + @logger.info("Plum #{::Plum::VERSION}") + @logger.info("Config: #{config}") + end + + def start + @state = :running + while @state == :running + break if @listeners.empty? + begin + if ss = IO.select(@listeners, nil, nil, 2.0) + ss[0].each { |svr| + new_con(svr) + } + end + rescue Errno::EBADF, Errno::ENOTSOCK, IOError => e # closed + rescue StandardError => e + log_exception(e) + end + end + end + + def stop + @state = :stop + @listeners.map(&:stop) + # TODO: gracefully shutdown connections + end + + private + def new_con(svr) + sock = svr.accept + Thread.new { + begin + sock = sock.accept if sock.respond_to?(:accept) + plum = svr.plum(sock) + + con = Connection.new(@app, plum, @logger) + con.run + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + sock.close if sock + rescue StandardError => e + log_exception(e) + sock.close if sock + end + } + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL => e # closed + rescue StandardError => e + log_exception(e) + end + + def log_exception(e) + @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}") + end + end + end +end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index e566d05..e67d80e 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -10,25 +10,21 @@ module Plum attr_reader :id, :state, :connection attr_reader :weight, :exclusive attr_accessor :parent + # The child (depending on this stream) streams. + attr_reader :children def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false) @connection = con @id = id @state = state @continuation = [] + @children = Set.new initialize_flow_control(send: @connection.remote_settings[:initial_window_size], recv: @connection.local_settings[:initial_window_size]) update_dependency(weight: weight, parent: parent, exclusive: exclusive) end - # Returns the child (depending on this stream) streams. - # - # @return [Array<Stream>] The child streams. - def children - @connection.streams.values.select {|c| c.parent == self }.freeze - end - # Processes received frames for this stream. Internal use. # @private def receive_frame(frame) @@ -59,7 +55,6 @@ module Plum end # Closes this stream. Sends RST_STREAM frame to the peer. - # # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame. def close(error_type = :no_error) @state = :closed @@ -68,20 +63,30 @@ module Plum private def send_immediately(frame) - callback(:send_frame, frame) @connection.send(frame) end def update_dependency(weight: nil, parent: nil, exclusive: nil) raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self - @weight = weight unless weight.nil? - @parent = parent unless parent.nil? - @exclusive = exclusive unless exclusive.nil? - - if exclusive == true - parent.children.each do |child| - next if child == self - child.parent = self + + if weight + @weight = weight + end + + if parent + @parent = parent + @parent.children << self + end + + if exclusive != nil + @exclusive = exclusive + if @parent && exclusive + @parent.children.to_a.each do |child| + next if child == self + @parent.children.delete(child) + child.parent = self + @children << child + end end end end @@ -106,7 +111,7 @@ module Plum raise StreamError.new(:stream_closed) end - if frame.flags.include?(:padded) + if frame.padded? padding_length = frame.payload.uint8(0) if padding_length >= frame.length raise ConnectionError.new(:protocol_error, "padding is too long") @@ -117,17 +122,17 @@ module Plum end callback(:data, body) - receive_end_stream if frame.flags.include?(:end_stream) + receive_end_stream if frame.end_stream? end def receive_complete_headers(frames) - first, *rest = frames + first = frames.shift payload = first.payload first_length = first.length padding_length = 0 - if first.flags.include?(:padded) + if first.padded? padding_length = payload.uint8 first_length -= 1 + padding_length payload = payload.byteslice(1, first_length) @@ -135,7 +140,7 @@ module Plum payload = payload.dup end - if first.flags.include?(:priority) + if first.priority? receive_priority_payload(payload.byteshift(5)) first_length -= 5 end @@ -144,7 +149,7 @@ module Plum raise ConnectionError.new(:protocol_error, "padding is too long") end - rest.each do |frame| + frames.each do |frame| payload << frame.payload end @@ -156,7 +161,7 @@ module Plum callback(:headers, decoded_headers) - receive_end_stream if first.flags.include?(:end_stream) + receive_end_stream if first.end_stream? end def receive_headers(frame) @@ -171,7 +176,7 @@ module Plum @state = :open callback(:open) - if frame.flags.include?(:end_headers) + if frame.end_headers? receive_complete_headers([frame]) else @continuation << frame @@ -182,7 +187,7 @@ module Plum # state error mustn't happen: server_connection validates @continuation << frame - if frame.flags.include?(:end_headers) + if frame.end_headers? receive_complete_headers(@continuation) @continuation.clear end @@ -214,5 +219,11 @@ module Plum callback(:rst_stream, frame) @state = :closed # MUST NOT send RST_STREAM end + + # override EventEmitter + def callback(name, *args) + super(name, *args) + @connection.callback(name, self, *args) + end end end diff --git a/lib/plum/stream_utils.rb b/lib/plum/stream_utils.rb index 999cd87..a8d959f 100644 --- a/lib/plum/stream_utils.rb +++ b/lib/plum/stream_utils.rb @@ -3,9 +3,8 @@ using Plum::BinaryString module Plum module StreamUtils - # Responds to HTTP request. - # - # @param headers [Hash<String, String>] The response headers. + # Responds to a HTTP request. + # @param headers [Enumerable<String, String>] The response headers. # @param body [String, IO] The response body. def respond(headers, body = nil, end_stream: true) # TODO: priority, padding if body @@ -17,8 +16,7 @@ module Plum end # Reserves a stream to server push. Sends PUSH_PROMISE and create new stream. - # - # @param headers [Hash<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. + # @param headers [Enumerable<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. # @return [Stream] The stream to send push response. def promise(headers) stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) @@ -30,7 +28,9 @@ module Plum stream end - private + # Sends response headers. If the encoded frame is larger than MAX_FRAME_SIZE, the headers will be splitted into HEADERS frame and CONTINUATION frame(s). + # @param headers [Enumerable<String, String>] The response headers. + # @param end_stream [Boolean] Set END_STREAM flag or not. def send_headers(headers, end_stream:) max = @connection.remote_settings[:max_frame_size] encoded = @connection.hpack_encoder.encode(headers) @@ -41,6 +41,9 @@ module Plum @state = :half_closed_local if end_stream end + # Sends DATA frame. If the data is larger than MAX_FRAME_SIZE, DATA frame will be splitted. + # @param data [String, IO] The data to send. + # @param end_stream [Boolean] Set END_STREAM flag or not. def send_data(data, end_stream: true) max = @connection.remote_settings[:max_frame_size] if data.is_a?(IO) diff --git a/lib/plum/version.rb b/lib/plum/version.rb index 44af165..2b37004 100644 --- a/lib/plum/version.rb +++ b/lib/plum/version.rb @@ -1,4 +1,4 @@ # -*- frozen-string-literal: true -*- module Plum - VERSION = "0.0.2" + VERSION = "0.1.2" end diff --git a/lib/rack/handler/plum.rb b/lib/rack/handler/plum.rb new file mode 100644 index 0000000..cf34ee4 --- /dev/null +++ b/lib/rack/handler/plum.rb @@ -0,0 +1,50 @@ +# -*- frozen-string-literal: true -*- +module Rack + module Handler + class Plum + def self.run(app, options = {}) + opts = default_options.merge(options) + + config = ::Plum::Rack::Config.new( + listeners: [ + { + listener: ::Plum::Rack::TLSListener, + hostname: opts[:Host], + port: opts[:Port].to_i + } + ], + debug: !!opts[:Debug] + ) + + @server = ::Plum::Rack::Server.new(app, config) + yield @server if block_given? # TODO + @server.start + end + + def self.shutdown + @server.stop if @server + end + + def self.valid_options + { + "Host=HOST" => "Hostname to listen on (default: #{default_options[:Host]})", + "Port=PORT" => "Port to listen on (default: #{default_options[:Port]})", + "Debug" => "Turn on debug mode (default: #{default_options[:Debug]})", + } + end + + private + def self.default_options + rack_env = ENV["RACK_ENV"] || "development" + dev = rack_env == "development" + default_options = { + Host: dev ? "localhost" : "0.0.0.0", + Port: 8080, + Debug: dev, + } + end + end + + register(:plum, ::Rack::Handler::Plum) + end +end diff --git a/plum.gemspec b/plum.gemspec index 4fd1289..cb1a3b6 100644 --- a/plum.gemspec +++ b/plum.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", "~> 1.10" spec.add_development_dependency "http_parser.rb" + spec.add_development_dependency "rack" spec.add_development_dependency "rake" spec.add_development_dependency "yard" spec.add_development_dependency "minitest", "~> 5.7.0" diff --git a/test/plum/hpack/test_decoder.rb b/test/plum/hpack/test_decoder.rb index 9acb821..9a500f7 100644 --- a/test/plum/hpack/test_decoder.rb +++ b/test/plum/hpack/test_decoder.rb @@ -4,38 +4,38 @@ class HPACKDecoderTest < Minitest::Test # C.1.1 def test_hpack_read_integer_small buf = [0b11001010, 0b00001111].pack("C*") - result = new_decoder.__send__(:read_integer!, buf, 5) + result, succ = new_decoder.__send__(:read_integer, buf, 0, 5) assert_equal(10, result) - assert_equal([0b00001111].pack("C*"), buf) + assert_equal(1, succ) end # C.1.2 def test_hpack_read_integer_big buf = [0b11011111, 0b10011010, 0b00001010, 0b00001111].pack("C*") - result = new_decoder.__send__(:read_integer!, buf, 5) + result, succ = new_decoder.__send__(:read_integer, buf, 0, 5) assert_equal(1337, result) - assert_equal([0b00001111].pack("C*"), buf) + assert_equal(3, succ) end # C.1.3 def test_hpack_read_integer_8prefix buf = [0b00101010, 0b00001111].pack("C*") - result = new_decoder.__send__(:read_integer!, buf, 8) + result, succ = new_decoder.__send__(:read_integer, buf, 0, 8) assert_equal(42, result) - assert_equal([0b00001111].pack("C*"), buf) + assert_equal(1, succ) end def test_hpack_read_integer_too_big buf = [0b11011111, 0b10011010, 0b10001010, 0b10001111, 0b11111111, 0b00000011].pack("C*") assert_raises(HPACKError) { - new_decoder.__send__(:read_integer!, buf, 5) + new_decoder.__send__(:read_integer, buf, 0, 5) } end def test_hpack_read_integer_incomplete buf = [0b11011111, 0b10011010].pack("C*") assert_raises(HPACKError) { - new_decoder.__send__(:read_integer!, buf, 5) + new_decoder.__send__(:read_integer, buf, 0, 5) } end diff --git a/test/plum/hpack/test_encoder.rb b/test/plum/hpack/test_encoder.rb index 3c56dfc..a0096ed 100644 --- a/test/plum/hpack/test_encoder.rb +++ b/test/plum/hpack/test_encoder.rb @@ -3,19 +3,19 @@ require "test_helper" class HPACKEncoderTest < Minitest::Test # C.1.1 def test_hpack_encode_integer_small - result = new_encoder(1 << 31).__send__(:encode_integer, 10, 5) + result = new_encoder(1 << 31).__send__(:encode_integer, 10, 5, 0b00000000) assert_equal([0b00001010].pack("C*"), result) end # C.1.2 def test_hpack_encode_integer_big - result = new_encoder(1 << 31).__send__(:encode_integer, 1337, 5) + result = new_encoder(1 << 31).__send__(:encode_integer, 1337, 5, 0b000000) assert_equal([0b00011111, 0b10011010, 0b00001010].pack("C*"), result) end # C.1.3 def test_hpack_encode_integer_8prefix - result = new_encoder(1 << 31).__send__(:encode_integer, 42, 8) + result = new_encoder(1 << 31).__send__(:encode_integer, 42, 8, 0b000000) assert_equal([0b00101010].pack("C*"), result) end diff --git a/test/plum/stream/test_handle_frame.rb b/test/plum/stream/test_handle_frame.rb index ee9a394..6a062cf 100644 --- a/test/plum/stream/test_handle_frame.rb +++ b/test/plum/stream/test_handle_frame.rb @@ -8,7 +8,7 @@ class StreamHandleFrameTest < Minitest::Test payload = "ABC" * 5 open_new_stream(state: :open) {|stream| data = nil - stream.on(:data) {|_data| data = _data } + stream.connection.on(:data) {|_, _data| data = _data } stream.receive_frame(Frame.new(type: :data, stream_id: stream.id, flags: [], payload: payload)) assert_equal(payload, data) @@ -19,7 +19,7 @@ class StreamHandleFrameTest < Minitest::Test payload = "ABC" * 5 open_new_stream(state: :open) {|stream| data = nil - stream.on(:data) {|_data| data = _data } + stream.connection.on(:data) {|_, _data| data = _data } stream.receive_frame(Frame.new(type: :data, stream_id: stream.id, flags: [:padded], payload: "".push_uint8(6).push(payload).push("\x00"*6))) assert_equal(payload, data) @@ -59,7 +59,7 @@ class StreamHandleFrameTest < Minitest::Test def test_stream_handle_headers_single open_new_stream {|stream| headers = nil - stream.on(:headers) {|_headers| + stream.connection.on(:headers) {|_, _headers| headers = _headers } stream.receive_frame(Frame.new(type: :headers, @@ -75,7 +75,7 @@ class StreamHandleFrameTest < Minitest::Test open_new_stream {|stream| payload = HPACK::Encoder.new(0).encode([[":path", "/"]]) headers = nil - stream.on(:headers) {|_headers| + stream.connection.on(:headers) {|_, _headers| headers = _headers } stream.receive_frame(Frame.new(type: :headers, @@ -96,7 +96,7 @@ class StreamHandleFrameTest < Minitest::Test open_new_stream {|stream| payload = HPACK::Encoder.new(0).encode([[":path", "/"]]) headers = nil - stream.on(:headers) {|_headers| + stream.connection.on(:headers) {|_, _headers| headers = _headers } stream.receive_frame(Frame.new(type: :headers, @@ -156,7 +156,7 @@ class StreamHandleFrameTest < Minitest::Test stream = open_new_stream(con) headers = nil - stream.on(:headers) {|_headers| headers = _headers } + stream.connection.on(:headers) {|_, _headers| headers = _headers } header_block = HPACK::Encoder.new(0).encode([[":path", "/"]]) payload = "".push_uint32((1 << 31) | parent.id) .push_uint8(50) diff --git a/test/plum/test_frame.rb b/test/plum/test_frame.rb index 0cd9e7e..4795627 100644 --- a/test/plum/test_frame.rb +++ b/test/plum/test_frame.rb @@ -4,6 +4,7 @@ class FrameTest < Minitest::Test # Frame.parse! def test_parse_header_uncomplete buffer = "\x00\x00\x00" << "\x00" << "\x00" + buffer.force_encoding(Encoding::BINARY) buffer_orig = buffer.dup assert_nil(Plum::Frame.parse!(buffer)) assert_equal(buffer_orig, buffer) @@ -11,6 +12,7 @@ class FrameTest < Minitest::Test def test_parse_body_uncomplete buffer = "\x00\x00\x03" << "\x00" << "\x00" << "\x00\x00\x00\x00" << "ab" + buffer.force_encoding(Encoding::BINARY) buffer_orig = buffer.dup assert_nil(Plum::Frame.parse!(buffer)) assert_equal(buffer_orig, buffer) @@ -18,7 +20,8 @@ class FrameTest < Minitest::Test def test_parse # R 0x1, stream_id 0x4, body "abc" - buffer = "\x00\x00\x03" << "\x00" << "\x09" << "\x80\x00\x00\x04" << "abc" << "next_frame_data" + buffer = "\x00\x00\x03" << "\x00" << "\x09" << "\x00\x00\x00\x04" << "abc" << "next_frame_data" + buffer.force_encoding(Encoding::BINARY) frame = Plum::Frame.parse!(buffer) assert_equal(3, frame.length) assert_equal(:data, frame.type) diff --git a/test/plum/test_http_connection.rb b/test/plum/test_http_connection.rb index fc451d0..ea72a55 100644 --- a/test/plum/test_http_connection.rb +++ b/test/plum/test_http_connection.rb @@ -27,9 +27,7 @@ class HTTPConnectionNegotiationTest < Minitest::Test io = StringIO.new con = HTTPConnection.new(io) heads = nil - con.on(:stream) {|stream| - stream.on(:headers) {|_h| heads = _h.to_h } - } + con.on(:headers) {|_, _h| heads = _h.to_h } req = "GET / HTTP/1.1\r\n" << "Host: rhe.jp\r\n" << "User-Agent: nya\r\n" << diff --git a/test/plum/test_https_connection.rb b/test/plum/test_https_connection.rb index 34679bc..b08f7be 100644 --- a/test/plum/test_https_connection.rb +++ b/test/plum/test_https_connection.rb @@ -64,24 +64,21 @@ class HTTPSConnectionNegotiationTest < Minitest::Test client_thread = Thread.new { sock = TCPSocket.new("127.0.0.1", LISTEN_PORT) begin - Timeout.timeout(3) { - ctx = OpenSSL::SSL::SSLContext.new.tap {|ctx| - ctx.alpn_protocols = ["h2"] - ctx.ciphers = "AES256-GCM-SHA384" - } - ssl = OpenSSL::SSL::SSLSocket.new(sock, ctx) - ssl.connect - ssl.write Connection::CLIENT_CONNECTION_PREFACE - ssl.write Frame.settings.assemble + ctx = OpenSSL::SSL::SSLContext.new.tap {|ctx| + ctx.alpn_protocols = ["h2"] + ctx.ciphers = "AES256-GCM-SHA384" } - rescue Timeout::Error - flunk "client timeout" + ssl = OpenSSL::SSL::SSLSocket.new(sock, ctx) + ssl.connect + ssl.write Connection::CLIENT_CONNECTION_PREFACE + ssl.write Frame.settings.assemble + sleep ensure sock.close end } - client_thread.join server_thread.join + client_thread.kill flunk "test not run" unless run end diff --git a/test/utils/server.rb b/test/utils/server.rb index e3b8386..8d1c81d 100644 --- a/test/utils/server.rb +++ b/test/utils/server.rb @@ -31,7 +31,7 @@ module ServerUtils end def sent_frames(con = nil) - resp = (con || @_con).io.string.dup + resp = (con || @_con).sock.string.dup.force_encoding(Encoding::BINARY) frames = [] while f = Frame.parse!(resp) frames << f @@ -40,10 +40,10 @@ module ServerUtils end def capture_frames(con = nil, &blk) - io = (con || @_con).io + io = (con || @_con).sock pos = io.string.bytesize blk.call - resp = io.string.byteslice(pos, io.string.bytesize - pos) + resp = io.string.byteslice(pos, io.string.bytesize - pos).force_encoding(Encoding::BINARY) frames = [] while f = Frame.parse!(resp) frames << f |