1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
module Plum
module Rack
class Connection
attr_reader :app, :sock, :plum
def initialize(app, sock, logger)
@app = app
@sock = sock
@logger = logger
end
def stop
@sock.close # TODO: gracefully shutdown
end
def start
Thread.new {
begin
@sock = @sock.accept if @sock.respond_to?(:accept) # SSLSocket
@plum = setup_plum
@plum.run
rescue Errno::EPIPE, Errno::ECONNRESET => e
@logger.debug("connection closed: #{e}")
rescue StandardError => e
@logger.error("#{e.class}: #{e.message}\n#{e.backtrace.map { |b| "\t#{b}" }.join("\n")}")
end
}
end
private
def setup_plum
plum = ::Plum::HTTPConnection.new(@sock)
plum.on(:connection_error) { |ex| @logger.error(ex) }
plum.on(:stream) do |stream|
stream.on(:stream_error) { |ex| @logger.error(ex) }
headers = data = nil
stream.on(:open) {
headers = nil
data = "".b
}
stream.on(:headers) { |h|
@logger.debug("headers: " + h.map {|name, value| "#{name}: #{value}" }.join(" // "))
headers = h
}
stream.on(:data) { |d|
@logger.debug("data: #{d.bytesize}")
data << d # TODO: store to file?
}
stream.on(:end_stream) {
env = new_env(headers, data)
r_headers, r_body = new_resp(@app.call(env))
if r_body.is_a?(::Rack::BodyProxy)
stream.respond(r_headers, end_stream: false)
r_body.each { |part|
stream.send_data(part, end_stream: false)
}
stream.send_data(nil)
else
stream.respond(r_headers, r_body)
end
}
end
plum
end
def new_env(h, data)
headers = h.group_by { |k, v| k }.map { |k, kvs|
if k == "cookie"
[k, kvs.map(&:last).join("; ")]
else
[k, kvs.first.last]
end
}.to_h
cmethod = headers.delete(":method")
cpath = headers.delete(":path")
cpath_name, cpath_query = cpath.split("?", 2).map(&:to_s)
cauthority = headers.delete(":authority")
cscheme = headers.delete(":scheme")
ebase = {
"REQUEST_METHOD" => cmethod,
"SCRIPT_NAME" => "",
"PATH_INFO" => cpath_name,
"QUERY_STRING" => cpath_query.to_s,
"SERVER_NAME" => cauthority.split(":").first,
"SERVER_PORT" => (cauthority.split(":").last || 443), # TODO: forwarded header (RFC 7239)
}
headers.each {|key, value|
ebase["HTTP_" + key.gsub("-", "_").upcase] = value
}
ebase.merge!({
"rack.version" => ::Rack::VERSION,
"rack.url_scheme" => cscheme,
"rack.input" => StringIO.new(data),
"rack.errors" => $stderr,
"rack.multithread" => true,
"rack.multiprocess" => false,
"rack.run_once" => false,
"rack.hijack?" => false,
})
ebase
end
def new_resp(app_call)
r_status, r_h, r_body = app_call
rbase = {
":status" => r_status,
"server" => "plum/#{::Plum::VERSION}",
}
r_h.each do |key, v_|
if key.start_with?("rack.")
next
end
key = key.downcase.gsub(/^x-/, "")
vs = v_.split("\n")
if key == "set-cookie"
rbase[key] = vs.join("; ") # RFC 7540 8.1.2.5
else
rbase[key] = vs.join(",") # RFC 7230 7
end
end
[rbase, r_body]
end
end
end
end
|