1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
module Collector
class CollectorProxyConnection < EM::Connection
@@_instance = nil
def self.instance
@@_instance
end
attr_reader :connected, :last_stats
def initialize(queue)
@@_instance = self
@unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
@queue = queue
@closing = false
@connected = false
@last_stats = nil
end
def post_init
send_message(event: :auth,
data: { secret_key: Settings.collector.secret_key })
end
def unbind
if @closing
log(:info, "Connection was closed.")
else
if @connected
log(:info, "Connection was closed unexpectedly.")
end
EM.add_timer(10) { try_reconnect }
end
end
def try_reconnect
reconnect(Settings.collector.proxy_host, Settings.collector.proxy_port)
post_init
end
def receive_data(data)
@unpacker.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg[:event]
log(:warn, "Unknown message: #{msg}")
next
end
parse_message(msg)
end
end
def exit
send_message(event: :exit, data: nil)
close_connection_after_writing
end
def register_account(account)
data = { id: account.id,
consumer_key: Settings.consumer.key,
consumer_secret: Settings.consumer.secret,
oauth_token: account.oauth_token,
oauth_token_secret: account.oauth_token_secret,
user_id: account.user_id }
send_message(event: :register, data: data)
end
def unregister_account(account)
data = { id: account.id,
user_id: account.user_id }
send_message(event: :unregister, data: data)
end
private
def parse_message(msg)
case msg[:event]
when "unauthorized"
log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}")
@queue.push_unauthorized(msg)
when "user"
log(:debug, "Received user: #{msg[:identifier]}")
@queue.push_user(msg)
when "tweet"
log(:debug, "Received tweet: #{msg[:identifier]}")
@queue.push_tweet(msg)
when "favorite"
log(:debug, "Receive favorite: #{msg[:identifier]}")
@queue.push_favorite(msg)
when "unfavorite"
log(:debug, "Receive unfavorite: #{msg[:identifier]}")
@queue.push_unfavorite(msg)
when "retweet"
log(:debug, "Receive retweet: #{msg[:identifier]}")
@queue.push_retweet(msg)
when "delete"
log(:debug, "Receive delete: #{msg[:identifier]}")
@queue.push_delete(msg)
when "auth"
log(:info, "Connection authenticated.")
@connected = true
register_accounts
when "heartbeat"
log(:debug, "Heartbeat: #{msg[:data][:id]}")
send_message(event: :heartbeat, data: { id: msg[:data][:id] })
@last_stats = msg[:data][:stats]
else
log(:warn, "Unknown message: #{msg.inspect}")
end
end
def register_accounts
Account.active.each do |account|
register_account(account)
end
end
def send_message(data)
send_data(data.to_msgpack)
end
def log(level, message)
Rails.logger.__send__(level, "CollectorProxyConnection") { message }
end
end
end
|