diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-05 22:52:04 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-08-05 22:52:04 +0900 |
commit | 2118f5bf3ccbacc060b9d2502934119b04e184f0 (patch) | |
tree | 22dc1e31a7c59a575f2fb15734912315d33239ae /lib/plum | |
parent | 06608a0cab5fd7f6e319b4954e2bf8d52f37ae72 (diff) | |
download | plum-2118f5bf3ccbacc060b9d2502934119b04e184f0.tar.gz |
stream: split user APIs into StreamHelper
Diffstat (limited to 'lib/plum')
-rw-r--r-- | lib/plum/stream.rb | 78 | ||||
-rw-r--r-- | lib/plum/stream_helper.rb | 82 |
2 files changed, 84 insertions, 76 deletions
diff --git a/lib/plum/stream.rb b/lib/plum/stream.rb index fe49e66..f44f8e9 100644 --- a/lib/plum/stream.rb +++ b/lib/plum/stream.rb @@ -2,6 +2,8 @@ using Plum::BinaryString module Plum class Stream + include StreamHelper + attr_reader :id, :state, :connection attr_reader :weight, :parent, :exclusive attr_accessor :recv_remaining_window, :send_remaining_window @@ -38,15 +40,6 @@ module Plum end end - # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. - # - # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. - def window_update(wsi) - @recv_remaining_window += wsi - payload = "".push_uint32(wsi & ~(1 << 31)) - send Frame.new(type: :window_update, stream_id: id, payload: payload) - end - # Processes received frames for this stream. Internal use. # @private def process_frame(frame) @@ -119,39 +112,6 @@ module Plum @connection.send(frame) end - # Responds to HTTP request. - # - # @param headers [Hash<String, String>] The response headers. - # @param body [String, IO] The response body. - def respond(headers, body = nil, end_stream: true) # TODO: priority, padding - if body - send_headers(headers, end_stream: false) - send_data(body, end_stream: end_stream) - else - send_headers(headers, end_stream: end_stream) - end - end - - # Reserves a stream to server push. Sends PUSH_STREAM and create new stream. - # - # @param headers [Hash<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. - # @return [Stream] The stream to send push response. - def promise(headers) - stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) - payload = "".force_encoding(Encoding::BINARY) - payload.push_uint32((0 << 31 | stream.id)) - payload.push(@connection.hpack_encoder.encode(headers)) - - original = Frame.new(type: :push_promise, - flags: [:end_headers], - stream_id: id, - payload: payload) - original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame| - send frame - end - stream - end - # Registers an event handler to specified event. An event can have multiple handlers. # @param name [String] The name of event. # @yield Gives event-specific parameters. @@ -177,40 +137,6 @@ module Plum end end - def send_headers(headers, end_stream:) - max = @connection.remote_settings[:max_frame_size] - encoded = @connection.hpack_encoder.encode(headers) - original_frame = Frame.new(type: :headers, - flags: [:end_headers, end_stream ? :end_stream : nil].compact, - stream_id: id, - payload: encoded) - original_frame.split_headers(max).each do |frame| - send frame - end - @state = :half_closed_local if end_stream - end - - def send_data(data, end_stream: true) - max = @connection.remote_settings[:max_frame_size] - if data.is_a?(IO) - while !data.eof? && fragment = data.readpartial(max) - send Frame.new(type: :data, - stream_id: id, - flags: (end_stream && data.eof? && [:end_stream]), - payload: fragment) - end - else - original = Frame.new(type: :data, - stream_id: id, - flags: (end_stream && [:end_stream]), - payload: data.to_s) - original.split_data(max).each do |frame| - send frame - end - end - @state = :half_closed_local if end_stream - end - def validate_received_frame(frame) if frame.length > @connection.local_settings[:max_frame_size] if [:headers, :push_promise, :continuation].include?(frame.type) diff --git a/lib/plum/stream_helper.rb b/lib/plum/stream_helper.rb new file mode 100644 index 0000000..6dd48d4 --- /dev/null +++ b/lib/plum/stream_helper.rb @@ -0,0 +1,82 @@ +using Plum::BinaryString + +module Plum + module StreamHelper + # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer. + # + # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^32-1. + def window_update(wsi) + @recv_remaining_window += wsi + payload = "".push_uint32(wsi & ~(1 << 31)) + send Frame.new(type: :window_update, stream_id: id, payload: payload) + end + + # Responds to HTTP request. + # + # @param headers [Hash<String, String>] The response headers. + # @param body [String, IO] The response body. + def respond(headers, body = nil, end_stream: true) # TODO: priority, padding + if body + send_headers(headers, end_stream: false) + send_data(body, end_stream: end_stream) + else + send_headers(headers, end_stream: end_stream) + end + end + + # Reserves a stream to server push. Sends PUSH_STREAM and create new stream. + # + # @param headers [Hash<String, String>] The *request* headers. It must contain all of them: ':authority', ':method', ':scheme' and ':path'. + # @return [Stream] The stream to send push response. + def promise(headers) + stream = @connection.reserve_stream(weight: self.weight + 1, parent: self) + payload = "".force_encoding(Encoding::BINARY) + payload.push_uint32((0 << 31 | stream.id)) + payload.push(@connection.hpack_encoder.encode(headers)) + + original = Frame.new(type: :push_promise, + flags: [:end_headers], + stream_id: id, + payload: payload) + original.split_headers(@connection.remote_settings[:max_frame_size]).each do |frame| + send frame + end + stream + end + + private + def send_headers(headers, end_stream:) + max = @connection.remote_settings[:max_frame_size] + encoded = @connection.hpack_encoder.encode(headers) + original_frame = Frame.new(type: :headers, + flags: [:end_headers, end_stream ? :end_stream : nil].compact, + stream_id: id, + payload: encoded) + original_frame.split_headers(max).each do |frame| + send frame + end + @state = :half_closed_local if end_stream + end + + def send_data(data, end_stream: true) + max = @connection.remote_settings[:max_frame_size] + if data.is_a?(IO) + while !data.eof? && fragment = data.readpartial(max) + send Frame.new(type: :data, + stream_id: id, + flags: (end_stream && data.eof? && [:end_stream]), + payload: fragment) + end + else + original = Frame.new(type: :data, + stream_id: id, + flags: (end_stream && [:end_stream]), + payload: data.to_s) + original.split_data(max).each do |frame| + send frame + end + end + @state = :half_closed_local if end_stream + end + end +end |