aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/plum.rb5
-rw-r--r--lib/plum/frame.rb114
-rw-r--r--lib/plum/server.rb118
-rw-r--r--lib/plum/stream.rb176
4 files changed, 413 insertions, 0 deletions
diff --git a/lib/plum.rb b/lib/plum.rb
index a5343ef..ebe6f0d 100644
--- a/lib/plum.rb
+++ b/lib/plum.rb
@@ -1,3 +1,5 @@
+require "openssl"
+require "socket"
require "plum/version"
require "plum/error"
require "plum/hpack"
@@ -5,3 +7,6 @@ require "plum/hpack/huffman"
require "plum/hpack/context"
require "plum/hpack/decoder"
require "plum/hpack/encoder"
+require "plum/frame"
+require "plum/stream"
+require "plum/server"
diff --git a/lib/plum/frame.rb b/lib/plum/frame.rb
new file mode 100644
index 0000000..cbddbe8
--- /dev/null
+++ b/lib/plum/frame.rb
@@ -0,0 +1,114 @@
+module Plum
+ class Frame
+ FRAME_TYPES = {
+ data: 0x00,
+ headers: 0x01,
+ priority: 0x02,
+ rst_stream: 0x03,
+ settings: 0x04,
+ push_promise: 0x05,
+ ping: 0x06,
+ goaway: 0x07,
+ window_update: 0x08,
+ continuation: 0x09
+ }
+
+ FRAME_FLAGS = {
+ data: {
+ end_stream: 0x01,
+ padded: 0x08
+ },
+ headers: {
+ end_stream: 0x01,
+ end_headers: 0x04,
+ padded: 0x08,
+ priority: 0x20
+ },
+ priority: {},
+ rst_stream: {},
+ settings: {
+ ack: 0x01
+ },
+ push_promise: {
+ end_headers: 0x04,
+ padded: 0x08
+ },
+ ping: {
+ ack: 0x01
+ },
+ goaway: {},
+ window_update: {},
+ continuation: {
+ end_headers: 0x04
+ }
+ }
+
+ # RFC7540: 4.1 Frame format
+ # +-----------------------------------------------+
+ # | Length (24) |
+ # +---------------+---------------+---------------+
+ # | Type (8) | Flags (8) |
+ # +-+-------------+---------------+-------------------------------+
+ # |R| Stream Identifier (31) |
+ # +=+=============================================================+
+ # | Frame Payload (0...) ...
+ # +---------------------------------------------------------------+
+
+ # [Integer] The length of payload. unsigned 24-bit integer
+ attr_reader :length
+ # [Integer] Frame type. 8-bit
+ attr_reader :type_value
+ # [Integer] Flags. 8-bit
+ attr_reader :flags_value
+ # [Integer] Stream Identifier. unsigned 31-bit integer
+ attr_reader :stream_id
+ # [String] The payload.
+ attr_reader :payload
+
+ def initialize(length: nil, type: nil, type_value: nil, flags: nil, flags_value: nil, stream_id: nil, payload: nil)
+ @payload = payload.to_s
+ @length = length || @payload.bytesize
+ @type_value = type_value || FRAME_TYPES[type] or raise ArgumentError.new("type_value or type is necessary")
+ @flags_value = flags_value || (flags && flags.map {|flag| FRAME_FLAGS[type][flag] }.inject(:|)) || 0
+ @stream_id = stream_id or raise ArgumentError.new("stream_id is necessary")
+ end
+
+ def type
+ FRAME_TYPES.key(type_value)
+ end
+
+ def flags
+ FRAME_FLAGS[type].select {|name, value| value & flags_value > 0 }.map {|name, value| name }
+ end
+
+ def assemble
+ bytes = ""
+ bytes << [length].pack("N")[1, 3] # last 3*8 bits
+ bytes << [type_value].pack("C")
+ bytes << [flags_value].pack("C")
+ bytes << [stream_id & ~(1 << 31)].pack("N") # first bit is reserved (MUST be 0)
+ bytes << payload
+ bytes
+ end
+
+ def inspect
+ "#<Plum::Frame:0x#{__id__.to_s(16)} length=#{length.inspect}, type=#{type.inspect}, flags=#{flags.inspect}, stream_id=0x#{stream_id.to_s(16)}, payload=#{payload.inspect}>"
+ end
+
+ def self.parse!(buffer)
+ return nil if buffer.size < 9 # header: 9 bytes
+ bhead = buffer[0, 9]
+ length = ("\x00" + bhead[0, 3]).unpack("N")[0]
+ return nil if buffer.size < 9 + length
+
+ payload = buffer.slice!(0...(9 + length))[9, length]
+ type = bhead[3, 1].unpack("C")[0]
+ flags = bhead[4, 1].unpack("C")[0]
+ r_sid = bhead[5, 4].unpack("N")[0]
+ r = r_sid >> 31
+ stream_id = r_sid & ~(1 << 31)
+
+ self.new(length: length, type_value: type, flags_value: flags, stream_id: stream_id, payload: payload)
+ end
+ end
+end
diff --git a/lib/plum/server.rb b/lib/plum/server.rb
new file mode 100644
index 0000000..1551985
--- /dev/null
+++ b/lib/plum/server.rb
@@ -0,0 +1,118 @@
+module Plum
+ class Server
+ CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+
+ attr_reader :hpack_encoder, :hpack_decoder
+ attr_accessor :on_stream, :on_frame, :on_send_frame
+
+ def initialize(socket, settings = nil)
+ @socket = socket
+ @settings = nil
+ @buffer = ""
+ @streams = {}
+ @state = :waiting_for_connetion_preface
+ @last_stream_id = 0
+ @hpack_decoder = HPACK::Decoder.new(65536)
+ @hpack_encoder = HPACK::Encoder.new(65536)
+ end
+
+ def send(frame)
+ on(:send_frame, frame)
+ @socket.write(frame.assemble)
+ end
+
+ def start
+ settings_payload = @settings
+ settings_frame = Frame.new(type: :settings,
+ stream_id: 0,
+ payload: settings_payload)
+ send(settings_frame)
+
+ until @socket.eof?
+ @buffer << @socket.readpartial(1024)
+ process
+ end
+ rescue Plum::ConnectionError => e
+ on(:connection_error, e)
+ data = [@last_stream_id & ~(1 << 31)].pack("N")
+ data << [e.http2_error_code].pack("N")
+ data << ""
+ error = Frame.new(type: :goaway,
+ stream_id: 0,
+ payload: data)
+ send(error)
+ end
+
+ def on(name, *args)
+ cb = instance_variable_get("@on_#{name}")
+ cb.call(*args) if cb
+ end
+
+ private
+ def process
+ if @state == :waiting_for_connetion_preface
+ return if @buffer.size < 24
+ if @buffer.slice!(0, 24) != CLIENT_CONNECTION_PREFACE
+ raise Plum::ConnectionError.new(:protocol_error) # (MAY) send GOAWAY. sending.
+ else
+ @state = :waiting_for_settings
+ # continue
+ end
+ end
+
+ while frame = Frame.parse!(@buffer)
+ on(:frame, frame)
+
+ if @state == :waiting_for_settings && frame.type != :settings
+ raise Plum::ConnectionError.new(:protocol_error)
+ end
+
+ if frame.stream_id == 0
+ process_control_frame(frame)
+ else
+ stream = @streams[frame.stream_id]
+ if stream
+ stream.on_frame(frame)
+ else
+ new_client_stream(frame)
+ end
+ @last_stream_id = frame.stream_id
+ end
+ end
+ end
+
+ def process_control_frame(frame)
+ case frame.type
+ when :settings
+ @state = :initialized if @state == :waiting_for_settings
+ process_settings(frame)
+ when :window_update
+ else
+ # TODO
+ end
+ end
+
+ def process_settings(frame)
+ # apply settings (MUST)
+ settings_ack = Frame.new(type: :settings, stream_id: 0x00, flags: [:ack])
+ send(settings_ack)
+ end
+
+ def new_client_stream(frame)
+ if (frame.stream_id % 2 == 0) ||
+ (@streams.size > 0 && @streams.keys.last >= frame.stream_id)
+ raise Plum::ConnectionError.new(:protocol_error)
+ end
+
+ unless [:headers, :push_stream].include?(frame.type)
+ raise Plum::ConnectionError.new(:protocol_error)
+ end
+
+ @streams.select {|id, s| s.state == :idle }.each {|id, s| s.close }
+ stream = Stream.new(self, frame.stream_id)
+ @streams[frame.stream_id] = stream
+ on(:stream, stream)
+ stream.on_frame(frame)
+ end
+ end
+end
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb
new file mode 100644
index 0000000..dddc0d9
--- /dev/null
+++ b/lib/plum/stream.rb
@@ -0,0 +1,176 @@
+module Plum
+ class Stream
+ attr_reader :id, :state, :priority
+ attr_accessor :on_headers, :on_data, :on_close, :on_open, :on_complete
+
+ def initialize(con, id)
+ @connection = con
+ @id = id
+ @state = :idle
+ @continuation = false
+ @header_fragment = nil
+ end
+
+ def on_frame(frame)
+ case frame.type
+ when :data
+ process_data(frame)
+ when :headers
+ process_headers(frame)
+ when :priority
+ process_priority(frame)
+ when :rst_stream
+ process_rst_stream(frame)
+ when :push_promise
+ process_push_promise(frame)
+ when :window_update
+ process_window_update(frame)
+ when :continuation
+ process_continuation(frame)
+ when :settings
+ raise Plum::ConnectionError.new(:protocol_error) # stream_id MUST be 0x00
+ end
+
+ if frame.flags.include?(:end_stream)
+ on(:complete)
+ @state = :half_closed
+ end
+ rescue Plum::StreamError => e
+ on(:stream_error, e)
+ send Frame.new(type: :rst_stream,
+ stream_id: id,
+ payload: [e.http2_error_code].pack("N"))
+ close
+ end
+
+ def send(frame)
+ @connection.send(frame)
+ end
+
+ def send_headers(headers, flags = [])
+ encoded = @connection.hpack_encoder.encode(headers)
+ send Frame.new(type: :headers,
+ flags: [:end_headers] + flags,
+ stream_id: id,
+ payload: encoded)
+ end
+
+ def send_body(body, flags = [])
+ send Frame.new(type: :data,
+ flags: flags,
+ stream_id: id,
+ payload: body)
+ end
+
+ def close
+ @state = :closed
+ end
+
+ def on(name, *args)
+ cb = instance_variable_get("@on_#{name}")
+ cb.call(*args) if cb
+ end
+
+ private
+ def process_data(frame)
+ if @state != :open && @state != :half_closed_local
+ raise Plum::StreamError.new(:stream_closed)
+ end
+
+ body = extract_padded(frame)
+ on(:data, body)
+ end
+
+ def process_headers(frame)
+ on(:open)
+ @state = :open
+
+ payload = extract_padded(frame)
+ if frame.flags.include?(:priority)
+ process_priority_payload(payload.slice!(0, 5))
+ end
+
+ if frame.flags.include?(:end_headers)
+ on(:headers, @connection.hpack_decoder.decode(payload).to_h)
+ else
+ @header_fragment = payload
+ @continuation = true
+ end
+ end
+
+ def process_priority(frame)
+ if frame.length != 5
+ raise Plum::StreamError.new(:frame_size_error)
+ end
+ process_priority_payload(frame.payload)
+ end
+
+ def process_priority_payload(payload)
+ esd = payload.slice(0, 4).unpack("N")[0]
+ e = esd >> 31
+ dependency_id = e & ~(1 << 31)
+ weight = payload.slice(4, 1).unpack("C")[0]
+ end
+
+ def process_rst_stream(frame)
+ if @state == :idle
+ raise Plum::ConnectionError.new(:protocol_error)
+ elsif frame.length != 4
+ raise Plum::ConnectionError.new(:frame_size_error)
+ else
+ close
+ end
+ end
+
+ def process_push_promise(frame)
+ payload = extract_padded(frame)
+ rpsid = payload.slice!(0, 4).unpack("N")[0]
+ r = rpsid >> 31
+ psid = rpsid & ~(1 << 31)
+ # TODO
+ end
+
+ def process_window_update(frame)
+ if frame.size != 4
+ raise Plum::ConnectionError.new(:frame_size_error)
+ end
+ inc = frame.payload.unpack("N")[0]
+ if inc == 0
+ raise Plum::StreamError.new(:protocol_error)
+ end
+ # TODO
+ end
+
+ def process_continuation(frame)
+ # TODO
+ unless @continuation
+ raise Plum::ConnectionError.new(:protocol_error)
+ end
+
+ @header_fragment << frame.payload
+ if frame.flags.include?(:end_headers)
+ if @continuation == :push_promise
+ @connection.push_promise
+ else # @continuation == :headers
+ headers = @connection.hpack_decoder.decode(@header_fragment)
+ end
+ @header_fragment = nil
+ @continuation = nil
+ else
+ # continue
+ end
+ end
+
+ def extract_padded(frame)
+ if frame.flags.include?(:padded)
+ padding_length = frame.payload[0, 1].unpack("C")[0]
+ if padding_length > frame.length
+ raise Plum::ConnectionError.new(:protocol_error, "padding is too long")
+ end
+ frame.payload[1, frame.length - padding_length - 1]
+ else
+ frame.payload
+ end
+ end
+ end
+end