aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/stream.rb
blob: 7987532d3574c820cba7f0413252d26ce4159ea2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
using Plum::BinaryString

module Plum
  class Stream
    include FlowControl
    include StreamUtils

    attr_reader :id, :state, :connection
    attr_reader :weight, :exclusive
    attr_accessor :parent

    def initialize(con, id, state: :idle, weight: 16, parent: nil, exclusive: false)
      @connection = con
      @id = id
      @state = state
      @continuation = []

      initialize_flow_control(send: @connection.remote_settings[:initial_window_size],
                              recv: @connection.local_settings[:initial_window_size])
      update_dependency(weight: weight, parent: parent, exclusive: exclusive)
    end

    # Returns the child (depending on this stream) streams.
    #
    # @return [Array<Stream>] The child streams.
    def children
      @connection.streams.values.select {|c| c.parent == self }.freeze
    end

    # Processes received frames for this stream. Internal use.
    # @private
    def receive_frame(frame)
      validate_received_frame(frame)
      consume_recv_window(frame)

      case frame.type
      when :data
        receive_data(frame)
      when :headers
        receive_headers(frame)
      when :priority
        receive_priority(frame)
      when :rst_stream
        receive_rst_stream(frame)
      when :window_update
        receive_window_update(frame)
      when :continuation
        receive_continuation(frame)
      when :ping, :goaway, :settings, :push_promise
        raise ConnectionError.new(:protocol_error) # stream_id MUST be 0x00
      else
        # MUST ignore unknown frame
      end
    rescue StreamError => e
      connection.callback(:stream_error, self, e)
      close(e.http2_error_type)
    end

    # Closes this stream. Sends RST_STREAM frame to the peer.
    #
    # @param error_type [Symbol] The error type to be contained in the RST_STREAM frame.
    def close(error_type = :no_error)
      @state = :closed
      send_immediately Frame.rst_stream(id, error_type)
    end

    private
    def send_immediately(frame)
      callback(:send_frame, frame)
      @connection.send(frame)
    end

    def update_dependency(weight: nil, parent: nil, exclusive: nil)
      raise StreamError.new(:protocol_error, "A stream cannot depend on itself.") if parent == self
      @weight = weight unless weight.nil?
      @parent = parent unless parent.nil?
      @exclusive = exclusive unless exclusive.nil?

      if exclusive == true
        parent.children.each do |child|
          next if child == self
          child.parent = self
        end
      end
    end

    def validate_received_frame(frame)
      if frame.length > @connection.local_settings[:max_frame_size]
        if [:headers, :push_promise, :continuation].include?(frame.type)
          raise ConnectionError.new(:frame_size_error)
        else
          raise StreamError.new(:frame_size_error)
        end
      end
    end

    def receive_end_stream
      connection.callback(:end_stream, self)
      @state = :half_closed_remote
    end

    def receive_data(frame)
      if @state != :open && @state != :half_closed_local
        raise StreamError.new(:stream_closed)
      end

      if frame.padded?
        padding_length = frame.payload.uint8(0)
        if padding_length >= frame.length
          raise ConnectionError.new(:protocol_error, "padding is too long")
        end
        body = frame.payload.byteslice(1, frame.length - padding_length - 1)
      else
        body = frame.payload
      end
      connection.callback(:data, self, body)

      receive_end_stream if frame.end_stream?
    end

    def receive_complete_headers(frames)
      first = frames.shift

      payload = first.payload
      first_length = first.length
      padding_length = 0

      if first.padded?
        padding_length = payload.uint8
        first_length -= 1 + padding_length
        payload = payload.byteslice(1, first_length)
      else
        payload = payload.dup
      end

      if first.priority?
        receive_priority_payload(payload.byteshift(5))
        first_length -= 5
      end

      if padding_length > first_length
        raise ConnectionError.new(:protocol_error, "padding is too long")
      end

      frames.each do |frame|
        payload << frame.payload
      end

      begin
        decoded_headers = @connection.hpack_decoder.decode(payload)
      rescue => e
        raise ConnectionError.new(:compression_error, e)
      end

      connection.callback(:headers, self, decoded_headers)

      receive_end_stream if first.end_stream?
    end

    def receive_headers(frame)
      if @state == :reserved_local
        raise ConnectionError.new(:protocol_error)
      elsif @state == :half_closed_remote
        raise StreamError.new(:stream_closed)
      elsif @state == :closed
        raise ConnectionError.new(:stream_closed)
      end

      @state = :open
      connection.callback(:open, self)

      if frame.end_headers?
        receive_complete_headers([frame])
      else
        @continuation << frame
      end
    end

    def receive_continuation(frame)
      # state error mustn't happen: server_connection validates
      @continuation << frame

      if frame.end_headers?
        receive_complete_headers(@continuation)
        @continuation.clear
      end
    end

    def receive_priority(frame)
      if frame.length != 5
        raise StreamError.new(:frame_size_error)
      end
      receive_priority_payload(frame.payload)
    end

    def receive_priority_payload(payload)
      esd = payload.uint32
      e = esd >> 31
      dependency_id = e & ~(1 << 31)
      weight = payload.uint8(4)

      update_dependency(weight: weight, parent: @connection.streams[dependency_id], exclusive: e == 1)
    end

    def receive_rst_stream(frame)
      if frame.length != 4
        raise ConnectionError.new(:frame_size_error)
      elsif @state == :idle
        raise ConnectionError.new(:protocol_error)
      end

      connection.callback(:rst_stream, self, frame)
      @state = :closed # MUST NOT send RST_STREAM
    end
  end
end