1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# -*- frozen-string-literal: true -*-
module Plum
  module Rack
    # Front end of the Plum Rack server.
    #
    # Owns the configured listeners, runs the accept loop, and hands every
    # accepted socket to a Session running on its own thread.
    class Server
      # @return [Hash] the configuration hash given to #initialize
      attr_reader :config

      # Builds the logger, the listeners and (optionally) the thread pool,
      # then drops process privileges when config[:user] is set.
      #
      # Config keys read by this class: :debug, :log, :listeners,
      # :threaded, :threadpool_size, :user, :group, :fallback_legacy_host
      # and :fallback_legacy_port.
      #
      # @param app [#call] the Rack application; wrapped in
      #   ::Rack::CommonLogger when config[:debug] is set
      # @param config [Hash] server configuration; each entry of
      #   config[:listeners] is a hash whose :listener value is the class
      #   that gets instantiated with that same hash
      def initialize(app, config)
        @config = config
        @state = :null
        @app = config[:debug] ? ::Rack::CommonLogger.new(app) : app
        @logger = Logger.new(config[:log] || $stdout).tap { |l|
          l.level = config[:debug] ? Logger::DEBUG : Logger::INFO
        }
        @listeners = config[:listeners].map { |lc| lc[:listener].new(lc) }
        @threadpool = ThreadPool.new(@config[:threadpool_size]) if @config[:threaded]

        @logger.info("Plum #{::Plum::VERSION}")
        @logger.info("Config: #{config}")

        # Listeners are created above, before privileges are given up.
        drop_privileges if @config[:user]
      end

      # Runs the accept loop until #stop is called or no listeners exist.
      #
      # Blocks the calling thread. Every readable listener is passed to
      # #new_con, which accepts the connection and serves it on a new
      # thread.
      #
      # @return [nil]
      def start
        @state = :running
        while @state == :running
          break if @listeners.empty?

          begin
            # The 2 second timeout lets the loop re-check @state regularly
            # even when no connection arrives.
            ready = IO.select(@listeners, nil, nil, 2.0)
            ready.first.each { |svr| new_con(svr) } if ready
          rescue Errno::EBADF, Errno::ENOTSOCK, IOError
            # A listener was closed (presumably by #stop); ignore it and let
            # the loop condition decide whether to continue.
          rescue StandardError => e
            log_exception(e)
          end
        end
      end

      # Asks the accept loop to finish and stops every listener.
      #
      # Connections that are already being served are left alone.
      def stop
        @state = :stop
        @listeners.each(&:stop)
        # TODO: gracefully shutdown connections
      end

      private

      # Accepts one connection from +svr+ and serves it on a new thread.
      #
      # Errors raised while accepting are handled here; errors raised while
      # serving are handled inside the thread. In both cases the socket is
      # closed.
      #
      # @param svr [Object] a listener that became readable; must respond
      #   to #accept and #plum
      def new_con(svr)
        sock = svr.accept
        Thread.new do
          begin
            begin
              # Some accepted sockets need a second #accept call
              # (presumably the TLS handshake -- confirm against the
              # listener classes); it runs here so it cannot stall the
              # accept loop.
              sock = sock.accept if sock.respond_to?(:accept)
              plum = svr.plum(sock)
              con = Session.new(app: @app,
                                plum: plum,
                                sock: sock,
                                logger: @logger,
                                config: @config,
                                remote_addr: sock.peeraddr.last,
                                threadpool: @threadpool)
              con.run
            rescue ::Plum::LegacyHTTPError => e
              @logger.info "legacy HTTP client: #{e}"
              handle_legacy(e, sock)
            end
          rescue Errno::ECONNRESET, Errno::EPROTO, Errno::EINVAL, EOFError
            # The peer went away; nothing to report.
          rescue StandardError => e
            log_exception(e)
          ensure
            sock.close if sock
          end
        end
      rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINVAL
        # The connection died while being accepted; just release the socket.
        sock.close if sock
      rescue StandardError => e
        log_exception(e)
        sock.close if sock
      end

      # Logs an exception at error level as "Class: message" followed by
      # its backtrace, one tab-indented frame per line.
      #
      # @param e [Exception] the exception to report; a nil backtrace
      #   (exception never raised) is treated as empty
      def log_exception(e)
        frames = Array(e.backtrace).map { |b| "\t#{b}" }.join("\n")
        @logger.error("#{e.class}: #{e.message}\n#{frames}")
      end

      # Relays a legacy HTTP connection to the configured fallback server.
      #
      # Does nothing unless config[:fallback_legacy_host] is set. Otherwise
      # it opens a TCP connection to the fallback host/port, replays the
      # bytes already consumed from the client (+e.buf+), and then copies
      # data in both directions. The relay loop has no exit of its own: it
      # ends when a read or write raises (IO#readpartial raises EOFError at
      # end of stream) and that exception propagates to the caller. The
      # upstream socket is always closed.
      #
      # @param e [::Plum::LegacyHTTPError] the error carrying the buffered
      #   request bytes in #buf
      # @param sock [IO] the client socket
      def handle_legacy(e, sock)
        if @config[:fallback_legacy_host]
          @logger.info "legacy HTTP: fallbacking to: #{@config[:fallback_legacy_host]}:#{@config[:fallback_legacy_port]}"
          upstream = TCPSocket.open(@config[:fallback_legacy_host], @config[:fallback_legacy_port])
          upstream.write(e.buf) if e.buf
          loop do
            readable, = IO.select([sock, upstream])
            readable.each do |s|
              chunk = s.readpartial(65536)
              # Whatever one side sent goes to the other side.
              if s == upstream
                sock.write(chunk)
              else
                upstream.write(chunk)
              end
            end
          end
        end
      ensure
        upstream.close if upstream
      end

      # Switches the process to config[:user] / config[:group] (the group
      # name defaults to the user name).
      #
      # On Errno::EPERM the failure is logged at fatal level and the
      # process exits with status 2. Lookup failures from Etc (unknown user
      # or group) are not rescued here.
      def drop_privileges
        user = @config[:user]
        group = @config[:group] || user
        @logger.info "Dropping process privilege to #{user}:#{group}"
        tuid = Etc.getpwnam(user).uid
        tgid = Etc.getgrnam(group).gid
        # Groups are changed first, while the process is still privileged
        # enough to do so; the user id is given up last.
        Process.initgroups(user, tgid)
        Process::GID.change_privilege(tgid)
        Process::UID.change_privilege(tuid)
      rescue Errno::EPERM => e
        @logger.fatal "Could not change privilege: #{e}"
        exit 2
      end
    end
  end
end
|