blob: 0ab99cb55f6112495188aa76bfd053a314dea087 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# -*- coding: utf-8 -*-
require File.expand_path(File.join(File.dirname(__FILE__), 'utils'))
require 'delayer'
require 'delayer/deferred'
require 'set'
require 'thread'
require 'timeout'
# 渡されたブロックを順番に実行するクラス
class SerialThreadGroup
QueueExpire = Class.new(Timeout::Error)
# ブロックを同時に処理する個数。最大でこの数だけThreadが作られる
attr_accessor :max_threads
@@force_exit = false
def initialize
@lock = Monitor.new
@queue = Queue.new
@max_threads = 1
@thread_pool = Set.new
end
# 実行するブロックを新しく登録する
# ==== Args
# [proc] 実行するブロック
def push(proc=Proc.new)
return if @@force_exit
@lock.synchronize{
@queue.push(proc)
new_thread if 0 == @queue.num_waiting and @thread_pool.size < max_threads } end
alias new push
# 処理中なら真
def busy?
@thread_pool.any?{ |t| :run == t.status.to_sym } end
# 全てのserial threadの実行をキャンセルする。終了時の処理用
def self.force_exit!
notice "all Serial Thread Group jobs canceled."
@@force_exit = true end
private
# Threadが必要なら一つ立ち上げる。
# これ以上Threadが必要ない場合はtrueを返す。
def flush
return true if @@force_exit
@lock.synchronize{
@thread_pool.delete_if{ |t| not t.alive? }
if @thread_pool.size > max_threads
return true
elsif 0 == @queue.num_waiting and @thread_pool.size < max_threads
new_thread end }
false end
def new_thread
return if @@force_exit
@thread_pool << Thread.new{
begin
while proc = Timeout.timeout(1, QueueExpire){ @queue.pop }
break if @@force_exit
proc.call
break if flush
debugging_wait
Thread.pass end
rescue QueueExpire => e
;
rescue ThreadError => e
;
rescue Object => e
error e
abort
ensure
@lock.synchronize{
@thread_pool.delete(Thread.current) } end } end
end
# SerialThreadGroup のインスタンス。
# 同時実行数は1固定
SerialThread = SerialThreadGroup.new
|