From 18e6795c128788deb33c5587a50eced0f73bac42 Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Thu, 16 Jul 2015 19:57:25 +0900 Subject: make examples work --- lib/plum.rb | 5 ++ lib/plum/frame.rb | 114 ++++++++++++++++++++++++++++++++++ lib/plum/server.rb | 118 +++++++++++++++++++++++++++++++++++ lib/plum/stream.rb | 176 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 413 insertions(+) create mode 100644 lib/plum/frame.rb create mode 100644 lib/plum/server.rb create mode 100644 lib/plum/stream.rb (limited to 'lib') diff --git a/lib/plum.rb b/lib/plum.rb index a5343ef..ebe6f0d 100644 --- a/lib/plum.rb +++ b/lib/plum.rb @@ -1,3 +1,5 @@ +require "openssl" +require "socket" require "plum/version" require "plum/error" require "plum/hpack" @@ -5,3 +7,6 @@ require "plum/hpack/huffman" require "plum/hpack/context" require "plum/hpack/decoder" require "plum/hpack/encoder" +require "plum/frame" +require "plum/stream" +require "plum/server" diff --git a/lib/plum/frame.rb b/lib/plum/frame.rb new file mode 100644 index 0000000..cbddbe8 --- /dev/null +++ b/lib/plum/frame.rb @@ -0,0 +1,114 @@ +module Plum + class Frame + FRAME_TYPES = { + data: 0x00, + headers: 0x01, + priority: 0x02, + rst_stream: 0x03, + settings: 0x04, + push_promise: 0x05, + ping: 0x06, + goaway: 0x07, + window_update: 0x08, + continuation: 0x09 + } + + FRAME_FLAGS = { + data: { + end_stream: 0x01, + padded: 0x08 + }, + headers: { + end_stream: 0x01, + end_headers: 0x04, + padded: 0x08, + priority: 0x20 + }, + priority: {}, + rst_stream: {}, + settings: { + ack: 0x01 + }, + push_promise: { + end_headers: 0x04, + padded: 0x08 + }, + ping: { + ack: 0x01 + }, + goaway: {}, + window_update: {}, + continuation: { + end_headers: 0x04 + } + } + + # RFC7540: 4.1 Frame format + # +-----------------------------------------------+ + # | Length (24) | + # +---------------+---------------+---------------+ + # | Type (8) | Flags (8) | + # +-+-------------+---------------+-------------------------------+ + # |R| Stream Identifier (31) | + # +=+=============================================================+ + # | Frame Payload (0...) ... + # +---------------------------------------------------------------+ + + # [Integer] The length of payload. unsigned 24-bit integer + attr_reader :length + # [Integer] Frame type. 8-bit + attr_reader :type_value + # [Integer] Flags. 8-bit + attr_reader :flags_value + # [Integer] Stream Identifier. unsigned 31-bit integer + attr_reader :stream_id + # [String] The payload. + attr_reader :payload + + def initialize(length: nil, type: nil, type_value: nil, flags: nil, flags_value: nil, stream_id: nil, payload: nil) + @payload = payload.to_s + @length = length || @payload.bytesize + @type_value = type_value || FRAME_TYPES[type] or raise ArgumentError.new("type_value or type is necessary") + @flags_value = flags_value || (flags && flags.map {|flag| FRAME_FLAGS[type][flag] }.inject(:|)) || 0 + @stream_id = stream_id or raise ArgumentError.new("stream_id is necessary") + end + + def type + FRAME_TYPES.key(type_value) + end + + def flags + FRAME_FLAGS[type].select {|name, value| value & flags_value > 0 }.map {|name, value| name } + end + + def assemble + bytes = "" + bytes << [length].pack("N")[1, 3] # last 3*8 bits + bytes << [type_value].pack("C") + bytes << [flags_value].pack("C") + bytes << [stream_id & ~(1 << 31)].pack("N") # first bit is reserved (MUST be 0) + bytes << payload + bytes + end + + def inspect + "#" + end + + def self.parse!(buffer) + return nil if buffer.size < 9 # header: 9 bytes + bhead = buffer[0, 9] + length = ("\x00" + bhead[0, 3]).unpack("N")[0] + return nil if buffer.size < 9 + length + + payload = buffer.slice!(0...(9 + length))[9, length] + type = bhead[3, 1].unpack("C")[0] + flags = bhead[4, 1].unpack("C")[0] + r_sid = bhead[5, 4].unpack("N")[0] + r = r_sid >> 31 + stream_id = r_sid & ~(1 << 31) + + self.new(length: length, type_value: type, flags_value: flags, stream_id: stream_id, payload: payload) + end + end +end diff --git a/lib/plum/server.rb b/lib/plum/server.rb new file mode 100644 index 0000000..1551985 --- /dev/null +++ b/lib/plum/server.rb @@ -0,0 +1,118 @@ +module Plum + class Server + CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + + attr_reader :hpack_encoder, :hpack_decoder + attr_accessor :on_stream, :on_frame, :on_send_frame + + def initialize(socket, settings = nil) + @socket = socket + @settings = nil + @buffer = "" + @streams = {} + @state = :waiting_for_connetion_preface + @last_stream_id = 0 + @hpack_decoder = HPACK::Decoder.new(65536) + @hpack_encoder = HPACK::Encoder.new(65536) + end + + def send(frame) + on(:send_frame, frame) + @socket.write(frame.assemble) + end + + def start + settings_payload = @settings + settings_frame = Frame.new(type: :settings, + stream_id: 0, + payload: settings_payload) + send(settings_frame) + + until @socket.eof? + @buffer << @socket.readpartial(1024) + process + end + rescue Plum::ConnectionError => e + on(:connection_error, e) + data = [@last_stream_id & ~(1 << 31)].pack("N") + data << [e.http2_error_code].pack("N") + data << "" + error = Frame.new(type: :goaway, + stream_id: 0, + payload: data) + send(error) + end + + def on(name, *args) + cb = instance_variable_get("@on_#{name}") + cb.call(*args) if cb + end + + private + def process + if @state == :waiting_for_connetion_preface + return if @buffer.size < 24 + if @buffer.slice!(0, 24) != CLIENT_CONNECTION_PREFACE + raise Plum::ConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending. + else + @state = :waiting_for_settings + # continue + end + end + + while frame = Frame.parse!(@buffer) + on(:frame, frame) + + if @state == :waiting_for_settings && frame.type != :settings + raise Plum::ConnectionError.new(:protocol_error) + end + + if frame.stream_id == 0 + process_control_frame(frame) + else + stream = @streams[frame.stream_id] + if stream + stream.on_frame(frame) + else + new_client_stream(frame) + end + @last_stream_id = frame.stream_id + end + end + end + + def process_control_frame(frame) + case frame.type + when :settings + @state = :initialized if @state == :waiting_for_settings + process_settings(frame) + when :window_update + else + # TODO + end + end + + def process_settings(frame) + # apply settings (MUST) + settings_ack = Frame.new(type: :settings, stream_id: 0x00, flags: [:ack]) + send(settings_ack) + end + + def new_client_stream(frame) + if (frame.stream_id % 2 == 0) || + (@streams.size > 0 && @streams.keys.last >= frame.stream_id) + raise Plum::ConnectionError.new(:protocol_error) + end + + unless [:headers, :push_stream].include?(frame.type) + raise Plum::ConnectionError.new(:protocol_error) + end + + @streams.select {|id, s| s.state == :idle }.each {|id, s| s.close } + stream = Stream.new(self, frame.stream_id) + @streams[frame.stream_id] = stream + on(:stream, stream) + stream.on_frame(frame) + end + end +end diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb new file mode 100644 index 0000000..dddc0d9 --- /dev/null +++ b/lib/plum/stream.rb @@ -0,0 +1,176 @@ +module Plum + class Stream + attr_reader :id, :state, :priority + attr_accessor :on_headers, :on_data, :on_close, :on_open, :on_complete + + def initialize(con, id) + @connection = con + @id = id + @state = :idle + @continuation = false + @header_fragment = nil + end + + def on_frame(frame) + case frame.type + when :data + process_data(frame) + when :headers + process_headers(frame) + when :priority + process_priority(frame) + when :rst_stream + process_rst_stream(frame) + when :push_promise + process_push_promise(frame) + when :window_update + process_window_update(frame) + when :continuation + process_continuation(frame) + when :settings + raise Plum::ConnectionError.new(:protocol_error) # stream_id MUST be 0x00 + end + + if frame.flags.include?(:end_stream) + on(:complete) + @state = :half_closed + end + rescue Plum::StreamError => e + on(:stream_error, e) + send Frame.new(type: :rst_stream, + stream_id: id, + payload: [e.http2_error_code].pack("N")) + close + end + + def send(frame) + @connection.send(frame) + end + + def send_headers(headers, flags = []) + encoded = @connection.hpack_encoder.encode(headers) + send Frame.new(type: :headers, + flags: [:end_headers] + flags, + stream_id: id, + payload: encoded) + end + + def send_body(body, flags = []) + send Frame.new(type: :data, + flags: flags, + stream_id: id, + payload: body) + end + + def close + @state = :closed + end + + def on(name, *args) + cb = instance_variable_get("@on_#{name}") + cb.call(*args) if cb + end + + private + def process_data(frame) + if @state != :open && @state != :half_closed_local + raise Plum::StreamError.new(:stream_closed) + end + + body = extract_padded(frame) + on(:data, body) + end + + def process_headers(frame) + on(:open) + @state = :open + + payload = extract_padded(frame) + if frame.flags.include?(:priority) + process_priority_payload(payload.slice!(0, 5)) + end + + if frame.flags.include?(:end_headers) + on(:headers, @connection.hpack_decoder.decode(payload).to_h) + else + @header_fragment = payload + @continuation = true + end + end + + def process_priority(frame) + if frame.length != 5 + raise Plum::StreamError.new(:frame_size_error) + end + process_priority_payload(frame.payload) + end + + def process_priority_payload(payload) + esd = payload.slice(0, 4).unpack("N")[0] + e = esd >> 31 + dependency_id = e & ~(1 << 31) + weight = payload.slice(4, 1).unpack("C")[0] + end + + def process_rst_stream(frame) + if @state == :idle + raise Plum::ConnectionError.new(:protocol_error) + elsif frame.length != 4 + raise Plum::ConnectionError.new(:frame_size_error) + else + close + end + end + + def process_push_promise(frame) + payload = extract_padded(frame) + rpsid = payload.slice!(0, 4).unpack("N")[0] + r = rpsid >> 31 + psid = rpsid & ~(1 << 31) + # TODO + end + + def process_window_update(frame) + if frame.size != 4 + raise Plum::ConnectionError.new(:frame_size_error) + end + inc = frame.payload.unpack("N")[0] + if inc == 0 + raise Plum::StreamError.new(:protocol_error) + end + # TODO + end + + def process_continuation(frame) + # TODO + unless @continuation + raise Plum::ConnectionError.new(:protocol_error) + end + + @header_fragment << frame.payload + if frame.flags.include?(:end_headers) + if @continuation == :push_promise + @connection.push_promise + else # @continuation == :headers + headers = @connection.hpack_decoder.decode(@header_fragment) + end + @header_fragment = nil + @continuation = nil + else + # continue + end + end + + def extract_padded(frame) + if frame.flags.include?(:padded) + padding_length = frame.payload[0, 1].unpack("C")[0] + if padding_length > frame.length + raise Plum::ConnectionError.new(:protocol_error, "padding is too long") + end + frame.payload[1, frame.length - padding_length - 1] + else + frame.payload + end + end + end +end -- cgit v1.2.3