1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
module Collector
class CollectorProxyConnection < EM::Connection
@@_instance = nil
def self.instance
@@_instance
end
attr_reader :connected, :last_stats
def initialize(queue)
@@_instance = self
@unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
@queue = queue
@closing = false
@connected = false
@last_stats = nil
end
def post_init
send_message(event: :auth,
data: { secret_key: Settings.collector.secret_key })
end
def unbind
if @closing
log(:info, "Connection was closed.")
@connected = false
else
if @connected
log(:info, "Connection was closed unexpectedly.")
@connected = false
end
EM.add_timer(10) { try_reconnect }
end
end
def try_reconnect
reconnect(Settings.collector.proxy_host, Settings.collector.proxy_port)
post_init
end
def receive_data(data)
@unpacker.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg[:event]
log(:warn, "Unknown message: #{msg}")
next
end
begin
parse_message(msg)
rescue
log(:error, "Failed to parse message: #{msg}")
end
end
rescue
log(:fatal, "Failed to parse data: #{data}")
end
def exit
send_message(event: :exit, data: nil)
close_connection_after_writing
end
def register_account(account)
data = { id: account.id,
consumer_key: Settings.consumer.key,
consumer_secret: Settings.consumer.secret,
oauth_token: account.oauth_token,
oauth_token_secret: account.oauth_token_secret,
user_id: account.user_id }
send_message(event: :register, data: data)
end
def unregister_account(account)
data = { id: account.id,
user_id: account.user_id }
send_message(event: :unregister, data: data)
end
private
def parse_message(msg)
case msg[:event]
when "unauthorized"
log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}")
@queue.push_unauthorized(msg)
when "user"
log(:debug, "Received user: #{msg[:identifier]}")
@queue.push_user(msg)
when "tweet"
log(:debug, "Received tweet: #{msg[:identifier]}")
@queue.push_tweet(msg)
when "favorite"
log(:debug, "Receive favorite: #{msg[:identifier]}")
@queue.push_favorite(msg)
when "unfavorite"
log(:debug, "Receive unfavorite: #{msg[:identifier]}")
@queue.push_unfavorite(msg)
when "retweet"
log(:debug, "Receive retweet: #{msg[:identifier]}")
@queue.push_retweet(msg)
when "delete"
log(:debug, "Receive delete: #{msg[:identifier]}")
@queue.push_delete(msg)
when "auth"
log(:info, "Connection authenticated.")
@connected = true
register_accounts
when "heartbeat"
log(:debug, "Heartbeat: #{msg[:data][:id]}")
send_message(event: :heartbeat, data: { id: msg[:data][:id] })
@last_stats = msg[:data][:stats]
else
log(:warn, "Unknown message: #{msg.inspect}")
end
end
def register_accounts
Account.active.each do |account|
register_account(account)
end
end
def send_message(data)
send_data(data.to_msgpack)
end
def log(level, message)
Rails.logger.__send__(level, "CollectorProxyConnection") { message }
end
end
end
|