aboutsummaryrefslogtreecommitdiffstats
path: root/lib/plum/flow_control.rb
blob: f28fa3a79bd53ae785bc7afe2d1f242292a3d462 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# frozen-string-literal: true

using Plum::BinaryString

module Plum
  # HTTP/2 flow-control bookkeeping shared by objects that own a send and a
  # receive window. The methods branch on whether +self+ is a Stream or a
  # Connection, and expect the host object to provide:
  #
  # * +send_immediately(frame)+ - writes a frame without flow-control checks
  # * +callback(name, *args)+   - on non-Stream hosts
  # * +connection+ and +id+     - on Stream hosts
  # * +@streams+                - on Connection hosts (a Hash; its values are streams)
  module FlowControl
    # Largest legal flow-control window and WINDOW_UPDATE increment (2^31 - 1).
    # Doubles as the bit mask that strips the reserved high bit from a
    # WINDOW_UPDATE payload.
    MAX_WINDOW_SIZE = 0x7fff_ffff
    private_constant :MAX_WINDOW_SIZE

    attr_reader :send_remaining_window, :recv_remaining_window

    # Sends frame respecting inner-stream flow control.
    # DATA frames are queued and only written while the send window has room;
    # when the window is too small a +:send_deferred+ callback is fired and the
    # frame stays queued. Every other frame type is written straight away.
    # @param frame [Frame] The frame to be sent.
    def send(frame)
      return send_immediately(frame) unless frame.type == :data

      @send_buffer << frame
      if @send_remaining_window < frame.length
        notify_flow_control(:send_deferred, frame)
      else
        consume_send_buffer
      end
    end

    # Increases receiving window size. Sends WINDOW_UPDATE frame to the peer.
    # The frame is addressed to this stream's id, or to stream 0 when +self+ is
    # not a Stream.
    # @param wsi [Integer] The amount to increase receiving window size. The legal range is 1 to 2^31-1.
    def window_update(wsi)
      @recv_remaining_window += wsi
      sid = Stream === self ? id : 0
      send_immediately Frame::WindowUpdate.new(sid, wsi)
    end

    protected

    # Shifts the send window by +diff+ (may be negative), flushes whatever now
    # fits, and, on a Connection, applies the same shift to every stream.
    # @param diff [Integer] Signed change to apply to the send window.
    def update_send_initial_window_size(diff)
      @send_remaining_window += diff
      consume_send_buffer

      return unless Connection === self

      # Iterate over a snapshot (+values+) so that flushing a stream cannot
      # disturb the iteration if it ends up modifying @streams.
      @streams.values.each do |stream|
        stream.update_send_initial_window_size(diff)
      end
    end

    # Shifts the receive window by +diff+ (may be negative) and, on a
    # Connection, applies the same shift to every stream.
    # @param diff [Integer] Signed change to apply to the receive window.
    def update_recv_initial_window_size(diff)
      @recv_remaining_window += diff

      return unless Connection === self

      @streams.values.each do |stream|
        stream.update_recv_initial_window_size(diff)
      end
    end

    private

    # Sets up the send queue and both windows. Must run before any other
    # method of this module is used.
    # @param send [Integer] Initial send window size.
    # @param recv [Integer] Initial receive window size.
    def initialize_flow_control(send:, recv:)
      @send_buffer = []
      @send_remaining_window = send
      @recv_remaining_window = recv
    end

    # Charges a received DATA frame against the receive window. Frames of any
    # other type are ignored.
    # @param frame [Frame] The received frame.
    # @raise [RemoteConnectionError, RemoteStreamError] +:flow_control_error+
    #   when the peer sent more than the window allowed.
    def consume_recv_window(frame)
      return unless frame.type == :data

      @recv_remaining_window -= frame.length
      return unless @recv_remaining_window < 0

      raise flow_control_error_class.new(:flow_control_error)
    end

    # Writes queued DATA frames in FIFO order for as long as the head of the
    # queue fits into the remaining send window.
    def consume_send_buffer
      while (frame = @send_buffer.first)
        break if frame.length > @send_remaining_window

        @send_buffer.shift
        @send_remaining_window -= frame.length
        send_immediately frame
      end
    end

    # Handles a WINDOW_UPDATE frame from the peer: validates it, fires the
    # +:window_update+ callback, grows the send window and flushes the queue.
    # @param frame [Frame] The received WINDOW_UPDATE frame.
    # @raise [RemoteConnectionError] +:frame_size_error+ when the payload is
    #   not exactly 4 octets.
    # @raise [RemoteConnectionError, RemoteStreamError] +:protocol_error+ when
    #   the increment is 0; +:flow_control_error+ when the increment would push
    #   the send window past 2^31-1.
    def receive_window_update(frame)
      if frame.length != 4
        raise Plum::RemoteConnectionError.new(:frame_size_error)
      end

      # The payload is one reserved bit followed by a 31-bit increment. The
      # reserved bit must be ignored on receipt, so strip it instead of
      # letting it count as 2^31 extra window.
      wsi = frame.payload.uint32 & MAX_WINDOW_SIZE

      if wsi == 0
        raise flow_control_error_class.new(:protocol_error)
      end

      # A window must never exceed 2^31-1. Reject the update before anyone is
      # notified and before the window is touched.
      if @send_remaining_window + wsi > MAX_WINDOW_SIZE
        raise flow_control_error_class.new(:flow_control_error)
      end

      notify_flow_control(:window_update, wsi)

      @send_remaining_window += wsi
      consume_send_buffer
    end

    # Error class matching the scope of +self+: connection-level for a
    # Connection, stream-level for anything else.
    def flow_control_error_class
      Connection === self ? RemoteConnectionError : RemoteStreamError
    end

    # Fires +event+ with +self+ as the first argument. A Stream reports through
    # its connection; any other host reports through its own +callback+.
    def notify_flow_control(event, *args)
      if Stream === self
        connection.callback(event, self, *args)
      else
        callback(event, self, *args)
      end
    end
  end
end