summaryrefslogtreecommitdiffstats
path: root/lib/plum/rack/connection.rb
blob: 1100ae87b4a4e4733aec901ccef83b05e7585068 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
module Plum
  module Rack
    # Serves one client socket: wraps it in a ::Plum::HTTPConnection and, for
    # every stream that reaches :end_stream, builds a Rack env from the
    # collected headers/body, calls the Rack app and writes the response back
    # on that stream.
    class Connection
      # Request headers that Rack/CGI expose WITHOUT the "HTTP_" prefix.
      CGI_UNPREFIXED_HEADERS = %w[CONTENT_TYPE CONTENT_LENGTH].freeze

      attr_reader :app, :sock, :plum

      # app    - the Rack application (called as app.call(env)).
      # sock   - the client socket; if it responds to #accept, #start calls
      #          that first and uses the result as the socket.
      # logger - receives #debug and #error calls.
      def initialize(app, sock, logger)
        @app = app
        @sock = sock
        @logger = logger
      end

      # Closes the underlying socket.
      def stop
        @sock.close # TODO: gracefully shutdown
      end

      # Runs the connection in a new thread and returns that Thread.
      #
      # Errno::EPIPE / Errno::ECONNRESET are logged at debug level; any other
      # StandardError is logged at error level together with its backtrace.
      def start
        Thread.new {
          begin
            @sock = @sock.accept if @sock.respond_to?(:accept) # SSLSocket
            @plum = setup_plum
            @plum.run
          rescue Errno::EPIPE, Errno::ECONNRESET => e
            @logger.debug("connection closed: #{e}")
          rescue StandardError => e
            @logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
          end
        }
      end

      private

      # Creates the ::Plum::HTTPConnection for @sock and registers the event
      # handlers that collect each request and hand it to the Rack app.
      # Returns the connection object.
      def setup_plum
        plum = ::Plum::HTTPConnection.new(@sock)
        plum.on(:connection_error) { |ex| @logger.error(ex) }

        plum.on(:stream) do |stream|
          stream.on(:stream_error) { |ex| @logger.error(ex) }

          # Per-stream request state, reset when the stream opens.
          headers = data = nil
          stream.on(:open) {
            headers = nil
            data = "".b
          }

          stream.on(:headers) { |h|
            @logger.debug("headers: " + h.map { |name, value| "#{name}: #{value}" }.join(" // "))
            headers = h
          }

          stream.on(:data) { |d|
            @logger.debug("data: #{d.bytesize}")
            data << d # TODO: store to file?
          }

          stream.on(:end_stream) {
            env = new_env(headers, data)
            r_headers, r_body = new_resp(@app.call(env))
            send_response(stream, r_headers, r_body)
          }
        end

        plum
      end

      # Writes the response to +stream+.
      #
      # A ::Rack::BodyProxy body is streamed part by part and is always closed
      # afterwards (even if sending raises) so its close callbacks run.
      # Any other body is passed to stream.respond unchanged.
      def send_response(stream, r_headers, r_body)
        if r_body.is_a?(::Rack::BodyProxy)
          begin
            stream.respond(r_headers, end_stream: false)
            r_body.each { |part|
              stream.send_data(part, end_stream: false)
            }
            stream.send_data(nil) # no payload; ends the stream
          ensure
            r_body.close
          end
        else
          # NOTE(review): other bodies are neither iterated nor closed here;
          # confirm stream.respond accepts every body type the app may return
          # (e.g. Array) and whether it consumes the body synchronously.
          stream.respond(r_headers, r_body)
        end
      end

      # Builds the Rack environment Hash for one request.
      #
      # h    - enumerable of [name, value] header pairs, including the
      #        ":method", ":path", ":authority" and ":scheme" pseudo-headers.
      # data - request body as a String (wrapped in a StringIO for rack.input).
      def new_env(h, data)
        headers = h.group_by(&:first).each_with_object({}) { |(name, pairs), acc|
          values = pairs.map(&:last)
          acc[name] =
            if name == "cookie"
              values.join("; ") # RFC 7540 8.1.2.5
            elsif name.start_with?(":")
              values.first # pseudo-headers are single-valued
            else
              values.join(", ") # RFC 7230 3.2.2: repeated fields combine
            end
        }

        cmethod = headers.delete(":method")
        cpath = headers.delete(":path")
        cpath_name, cpath_query = cpath.to_s.split("?", 2)
        # ":authority" may be absent when the client sent a "host" header.
        cauthority = headers.delete(":authority") || headers["host"]
        cscheme = headers.delete(":scheme")
        chost, cport = split_authority(cauthority)

        ebase = {
          "REQUEST_METHOD"    => cmethod,
          "SCRIPT_NAME"       => "",
          "PATH_INFO"         => cpath_name.to_s,
          "QUERY_STRING"      => cpath_query.to_s,
          "SERVER_NAME"       => chost,
          # CGI values are Strings; fall back to the scheme's default port.
          "SERVER_PORT"       => cport || (cscheme == "http" ? "80" : "443"), # TODO: forwarded header (RFC 7239)
        }

        headers.each { |key, value|
          next if key.start_with?(":") # never expose leftover pseudo-headers

          cgi_name = key.tr("-", "_").upcase
          cgi_name = "HTTP_" + cgi_name unless CGI_UNPREFIXED_HEADERS.include?(cgi_name)
          ebase[cgi_name] = value
        }

        ebase.merge!({
          "rack.version"      => ::Rack::VERSION,
          "rack.url_scheme"   => cscheme,
          "rack.input"        => StringIO.new(data),
          "rack.errors"       => $stderr,
          "rack.multithread"  => true,
          "rack.multiprocess" => false,
          "rack.run_once"     => false,
          "rack.hijack?"      => false,
        })

        ebase
      end

      # Splits an authority ("host", "host:port" or "[ipv6]:port") into
      # [host, port]. port is a String, or nil when absent or empty.
      # Input that does not match that shape is returned whole as the host.
      def split_authority(authority)
        text = authority.to_s
        m = /\A(?<host>\[[^\]]*\]|[^:]*)(?::(?<port>\d*))?\z/.match(text)
        return [text, nil] unless m

        port = m[:port]
        [m[:host], (port.nil? || port.empty?) ? nil : port]
      end

      # Converts a Rack response triple into [headers_hash, body].
      #
      # app_call - [status, headers, body] as returned by the Rack app.
      #
      # The returned Hash always has ":status" and "server"; app headers are
      # lower-cased, "rack."-prefixed keys are dropped, and newline-separated
      # (or Array) values are merged into a single value.
      def new_resp(app_call)
        r_status, r_h, r_body = app_call

        rbase = {
          ":status" => r_status,
          "server" => "plum/#{::Plum::VERSION}",
        }

        r_h.each do |key, v_|
          next if key.start_with?("rack.")

          # NOTE(review): a leading "x-" is stripped from header names (kept
          # from the original); this renames e.g. X-Frame-Options. Confirm
          # this is intended.
          name = key.downcase.sub(/\Ax-/, "")
          values = Array(v_).flat_map { |v| v.to_s.split("\n") }
          if name == "set-cookie"
            # NOTE(review): RFC 7540 8.1.2.5 describes the request Cookie
            # header; joining several Set-Cookie values with "; " may merge
            # distinct cookies. Behavior kept from the original.
            rbase[name] = values.join("; ") # RFC 7540 8.1.2.5
          else
            rbase[name] = values.join(",") # RFC 7230 7
          end
        end

        [rbase, r_body]
      end
    end
  end
end