diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-09 00:25:50 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-09 00:25:50 +0900 |
commit | 3969a4bebf7737aec0484cbf4ac4d2ca3511b6b7 (patch) | |
tree | a2e99488844ccf108563a6e282284bc6ddbbb619 /lib/plum/connection.rb | |
parent | e2d18a67f6d9f8c78afe1eba769b9d474011bf6c (diff) | |
download | plum-3969a4bebf7737aec0484cbf4ac4d2ca3511b6b7.tar.gz |
implement "http" URIs support (currently only 'with prior knowledge')
Diffstat (limited to 'lib/plum/connection.rb')
-rw-r--r-- | lib/plum/connection.rb | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/lib/plum/connection.rb b/lib/plum/connection.rb new file mode 100644 index 0000000..6dac8c3 --- /dev/null +++ b/lib/plum/connection.rb @@ -0,0 +1,223 @@ +using Plum::BinaryString + +module Plum + class Connection + include EventEmitter + include FlowControl + include ConnectionHelper + + CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + + DEFAULT_SETTINGS = { + header_table_size: 4096, # octets + enable_push: 1, # 1: enabled, 0: disabled + max_concurrent_streams: 1 << 30, # (1 << 31) / 2 + initial_window_size: 65535, # octets; <= 2 ** 31 - 1 + max_frame_size: 16384, # octets; <= 2 ** 24 - 1 + max_header_list_size: (1 << 32) - 1 # Fixnum + } + + attr_reader :hpack_encoder, :hpack_decoder + attr_reader :local_settings, :remote_settings + attr_reader :state, :streams, :io + + def initialize(io, local_settings = {}) + @io = io + @local_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] }.merge!(local_settings) + @remote_settings = Hash.new {|hash, key| DEFAULT_SETTINGS[key] } + @buffer = "".force_encoding(Encoding::BINARY) + @streams = {} + @state = :negotiation + @hpack_decoder = HPACK::Decoder.new(@local_settings[:header_table_size]) + @hpack_encoder = HPACK::Encoder.new(@remote_settings[:header_table_size]) + initialize_flow_control(send: @remote_settings[:initial_window_size], + recv: @local_settings[:initial_window_size]) + end + private :initialize + + # Starts communication with the peer. It blocks until the io is closed, or reaches EOF. + def run + while !@io.closed? && !@io.eof? + receive @io.readpartial(1024) + end + end + + # Closes the connection and closes the io. Sends GOAWAY frame to the peer. + # + # @param error_code [Integer] The error code to be contained in the GOAWAY frame. + def close(error_code = 0) + last_id = @streams.keys.reverse_each.find {|id| id.odd? } + data = "" + data.push_uint32((last_id || 0) & ~(1 << 31)) + data.push_uint32(error_code) + data.push("") # debug message + send_immediately Frame.new(type: :goaway, + stream_id: 0, + payload: data) + # TODO: server MAY wait streams + @io.close + end + + # Receives the specified data and process. + # + # @param new_data [String] The data received from the peer. + def receive(new_data) + return if new_data.empty? + @buffer << new_data + + if @state == :negotiation + negotiate! + end + + if @state != :negotiation + while frame = Frame.parse!(@buffer) + callback(:frame, frame) + receive_frame(frame) + end + end + rescue ConnectionError => e + callback(:connection_error, e) + close(e.http2_error_code) + end + alias << receive + + # Reserves a new stream to server push. + # + # @param args [Hash] The argument to pass to Stram.new. + def reserve_stream(**args) + next_id = ((@streams.keys.last / 2).to_i + 1) * 2 + stream = new_stream(next_id, state: :reserved_local, **args) + stream + end + + private + def send_immediately(frame) + callback(:send_frame, frame) + @io.write(frame.assemble) + end + + def validate_received_frame(frame) + case @state + when :waiting_settings + raise ConnectionError.new(:protocol_error) if frame.type != :settings + @state = :negotiated + callback(:negotiated) + when :waiting_continuation + if frame.type != :continuation || frame.stream_id != @continuation_id + raise Plum::ConnectionError.new(:protocol_error) + end + + if frame.flags.include?(:end_headers) + @state = :open + @continuation_id = nil + end + else + if [:headers].include?(frame.type) + if !frame.flags.include?(:end_headers) + @state = :waiting_continuation + @continuation_id = frame.stream_id + end + end + end + end + + def receive_frame(frame) + validate_received_frame(frame) + + if frame.stream_id == 0 + receive_control_frame(frame) + else + if @streams.key?(frame.stream_id) + stream = @streams[frame.stream_id] + else + if frame.stream_id.even? # stream started by client must have odd ID + raise Plum::ConnectionError.new(:protocol_error) + end + stream = new_stream(frame.stream_id) + end + stream.receive_frame(frame) + end + end + + def receive_control_frame(frame) + if frame.length > @local_settings[:max_frame_size] + raise ConnectionError.new(:frame_size_error) + end + + case frame.type + when :settings + receive_settings(frame) + when :window_update + receive_window_update(frame) + when :ping + receive_ping(frame) + when :goaway + close + when :data, :headers, :priority, :rst_stream, :push_promise, :continuation + raise Plum::ConnectionError.new(:protocol_error) + else + # MUST ignore unknown frame type. + end + end + + def receive_settings(frame) + if frame.flags.include?(:ack) + raise ConnectionError.new(:frame_size_error) if frame.length != 0 + return + end + + raise ConnectionError.new(:frame_size_error) if frame.length % 6 != 0 + + old_remote_settings = @remote_settings.dup + @remote_settings.merge!(frame.parse_settings) + apply_remote_settings(old_remote_settings) + + callback(:remote_settings, @remote_settings, old_remote_settings) + + send_immediately Frame.new(type: :settings, stream_id: 0x00, flags: [:ack]) + end + + def update_local_settings(new_settings) + old_settings = @local_settings.dup + @local_settings.merge!(new_settings) + + @hpack_decoder.limit = @local_settings[:header_table_size] + update_recv_initial_window_size(@local_settings[:initial_window_size] - old_settings[:initial_window_size]) + end + + def apply_remote_settings(old_remote_settings) + @hpack_encoder.limit = @remote_settings[:header_table_size] + update_send_initial_window_size(@remote_settings[:initial_window_size] - old_remote_settings[:initial_window_size]) + end + + def receive_ping(frame) + raise Plum::ConnectionError.new(:frame_size_error) if frame.length != 8 + + if frame.flags.include?(:ack) + on(:ping_ack) + else + on(:ping) + opaque_data = frame.payload + send_immediately Frame.new(type: :ping, + stream_id: 0, + flags: [:ack], + payload: opaque_data) + end + end + + def new_stream(stream_id, **args) + if @streams.size > 0 && @streams.keys.last >= stream_id + raise Plum::ConnectionError.new(:protocol_error) + end + + stream = Stream.new(self, stream_id, **args) + callback(:stream, stream) + @streams[stream_id] = stream + stream + end + + def local_error + ConnectionError + end + end +end |