blob: 7c3fc481ec7f966955a1724fb21c411354836363 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# -*- frozen-string-literal: true -*-
require "set" # Set is only autoloaded on Ruby >= 3.2

module Plum
  module Rack
    # A fixed-size pool of worker threads that run blocks submitted through
    # {#acquire}. A job may be submitted with a tag; that tag can later be
    # handed to {#cancel} to skip the job (if it has not started yet) or to
    # kill the worker that is currently running it.
    class ThreadPool
      # @param size [Integer] number of worker threads to start
      def initialize(size = 20)
        @workers = Set.new  # live worker threads
        @jobs = Queue.new   # pending [block, error sink, tag] triples
        @tags = {}          # tag => worker thread currently running that job
        @cancels = Set.new  # tags cancelled before their job was started
        @mutex = Mutex.new  # guards @tags, @cancels and (in #cancel) @workers
        size.times { spawn_worker }
      end

      # Enqueues a job to be run by one of the workers.
      #
      # @param tag [Object, nil] cancel token; pass it to {#cancel} later
      # @param err [#<<, nil] sink that receives any StandardError raised by
      #   the job; when nil such errors are silently dropped
      # @yield the job body
      # @return [Object, nil] the cancel token (+tag+)
      def acquire(tag = nil, err = nil, &blk)
        @jobs << [blk, err, tag]
        tag
      end

      # Cancels the job submitted with +tag+.
      #
      # If the job is running, its worker thread is killed and replaced with
      # a fresh one. Otherwise the tag is remembered so the job is skipped
      # when a worker eventually dequeues it.
      #
      # NOTE(review): a tag cancelled after its job already finished stays in
      # the cancel set until another job with the same tag is dequeued (which
      # is then skipped).
      #
      # @param tag [Object] the token that was passed to {#acquire}
      # @return [nil]
      def cancel(tag)
        # Everything happens under the lock: a worker needs the same lock to
        # unregister its tag, so while we hold it the worker found in @tags
        # cannot have moved on to a different job.
        @mutex.synchronize do
          worker = @tags.delete(tag)
          if worker
            @workers.delete(worker)
            worker.kill
            spawn_worker
          else
            @cancels << tag
          end
        end
        nil
      end

      private

      # Starts one worker thread that processes jobs forever and records it
      # in @workers.
      def spawn_worker
        t = Thread.new do
          loop do
            job, err, tag = @jobs.pop
            if tag
              skip = @mutex.synchronize do
                cancelled = @cancels.delete?(tag)
                # Register the running thread (not the pool) so that #cancel
                # knows which thread to kill.
                @tags[tag] = Thread.current unless cancelled
                cancelled
              end
              next if skip
            end
            begin
              job.call
            rescue => e
              err << e if err
            end
            @mutex.synchronize { @tags.delete(tag) } if tag
          end
        end
        @workers << t
      end
    end
  end
end
|