aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-04-30 01:06:58 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-04-30 01:10:49 +0900
commit2e1eea7875b192ba329c2c24852ea11069fe7f94 (patch)
tree54d0ddfef071c59908fbb619fb54f6f53cdfe99e
parent87fbdae4649f639a194b816453fafe57a4533d10 (diff)
downloadaclog-2e1eea7875b192ba329c2c24852ea11069fe7f94.tar.gz
add CollectorProxy
-rw-r--r--.gitignore1
-rw-r--r--app/models/account.rb7
-rw-r--r--app/views/about/status.html.haml5
-rw-r--r--collector_proxy/Gemfile6
-rw-r--r--collector_proxy/Gemfile.lock16
-rw-r--r--collector_proxy/Rakefile12
-rw-r--r--collector_proxy/lib/collector_connection.rb116
-rw-r--r--collector_proxy/lib/collector_proxy.rb38
-rw-r--r--collector_proxy/lib/event_channel.rb42
-rw-r--r--collector_proxy/lib/node_manager.rb75
-rw-r--r--collector_proxy/lib/worker_node_connection.rb117
-rw-r--r--collector_proxy/settings.yml.example6
-rw-r--r--config/settings.yml.example4
-rw-r--r--lib/collector/collector_proxy_connection.rb125
-rw-r--r--lib/collector/control_server.rb21
-rw-r--r--lib/collector/daemon.rb7
-rw-r--r--lib/collector/node_connection.rb148
-rw-r--r--lib/collector/node_manager.rb55
18 files changed, 567 insertions, 234 deletions
diff --git a/.gitignore b/.gitignore
index 269731e..a13cc54 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@
/config/database.yml
/worker_node/settings.yml
+/collector_proxy/settings.yml
.*
*~
diff --git a/app/models/account.rb b/app/models/account.rb
index d4486b9..eba1b00 100644
--- a/app/models/account.rb
+++ b/app/models/account.rb
@@ -2,7 +2,6 @@ class Account < ActiveRecord::Base
enum status: { active: 0, inactive: 1, revoked: 2 }
belongs_to :user
- scope :for_node, ->(block_number) { active.where("id % ? = ?", Settings.collector.nodes_count, block_number) }
# Returns whether tweet notification is enabled for this user.
def notification_enabled?; notification_enabled end
@@ -63,10 +62,4 @@ class Account < ActiveRecord::Base
Set.new client.friend_ids
end
end
-
- # Returns the worker id collecting tweets for this account.
- # @return [Integer]
- def worker_number
- id % Settings.collector.nodes_count
- end
end
diff --git a/app/views/about/status.html.haml b/app/views/about/status.html.haml
index c8b2149..5bcb2ae 100644
--- a/app/views/about/status.html.haml
+++ b/app/views/about/status.html.haml
@@ -24,11 +24,6 @@
%td -
%td Down
%td -
- - if logged_in?
- - if your_node = @worker_status["active_node_statuses"][current_user.account.worker_number]
- %p Worker ##{your_node["connection_id"]} is assigned to your account.
- - else
- %p Worker for group #{current_user.account.worker_number} is currently down.
- else
.alert.alert-danger
%strong Couldn't communicate with the collector service.
diff --git a/collector_proxy/Gemfile b/collector_proxy/Gemfile
new file mode 100644
index 0000000..907b7b1
--- /dev/null
+++ b/collector_proxy/Gemfile
@@ -0,0 +1,6 @@
+source "https://rubygems.org"
+
+gem "rake"
+gem "msgpack"
+gem "eventmachine"
+gem "dalli"
diff --git a/collector_proxy/Gemfile.lock b/collector_proxy/Gemfile.lock
new file mode 100644
index 0000000..c0f7be5
--- /dev/null
+++ b/collector_proxy/Gemfile.lock
@@ -0,0 +1,16 @@
+GEM
+ remote: https://rubygems.org/
+ specs:
+ dalli (2.7.4)
+ eventmachine (1.0.7)
+ msgpack (0.5.11)
+ rake (10.4.2)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ dalli
+ eventmachine
+ msgpack
+ rake
diff --git a/collector_proxy/Rakefile b/collector_proxy/Rakefile
new file mode 100644
index 0000000..16df55b
--- /dev/null
+++ b/collector_proxy/Rakefile
@@ -0,0 +1,12 @@
+$:.unshift File.expand_path("../lib/", __FILE__)
+require "collector_proxy"
+
+$stdout.sync = true
+$stderr.sync = true
+
+namespace :collector_proxy do
+ desc "Run collector proxy."
+ task :run do
+ CollectorProxy.run
+ end
+end
diff --git a/collector_proxy/lib/collector_connection.rb b/collector_proxy/lib/collector_connection.rb
new file mode 100644
index 0000000..375e97e
--- /dev/null
+++ b/collector_proxy/lib/collector_connection.rb
@@ -0,0 +1,116 @@
+class CollectorConnection < EM::Connection
+ @@_id = 0
+
+ def initialize
+ @@_id += 1
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @authenticated = false
+ @closing = false
+ @heartbeats = Set.new
+ @subscribe_id = nil
+ end
+
+ def post_init
+ # do nothing
+ end
+
+ def unbind
+ @heartbeat_timer.cancel if @heartbeat_timer
+ if @closing
+ log(:info, "Connection was closed.")
+ else
+ if @authenticated
+ log(:info, "Connection was closed unexpectedly.")
+ EventChannel.unsubscribe(@subscribe_id)
+ end
+ end
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:event]
+ log(:warn, "Unknown message: #{msg}")
+ send_message(event: :error, data: "Unknown message.")
+ close_connection_after_writing
+ return
+ end
+
+ parse_message(msg)
+ end
+ end
+
+ private
+ def parse_message(msg)
+ unless @authenticated
+ if msg[:event] == "auth"
+ authenticate_node(msg[:data])
+ else
+ log(:error, "Unauthenticated client: #{msg}")
+ send_message(event: :error, data: "You aren't authenticated.")
+ close_connection_after_writing
+ end
+ return
+ end
+
+ case msg[:event]
+ when "exit"
+ log(:info, "Closing this connection...")
+ @closing = true
+ EventChannel.unsubscribe(@subscribe_id)
+ when "heartbeat"
+ log(:debug, "Heartbeat reply: #{msg[:data][:id]}")
+ @heartbeats.delete(msg[:data][:id])
+ when "register"
+ log(:debug, "Registered account ##{msg[:data][:id]}")
+ NodeManager.register_account(msg[:data])
+ when "unregister"
+ log(:debug, "Unregistered account ##{msg[:data][:id]}")
+ NodeManager.unregister_account(msg[:data])
+ when "stats"
+ log(:debug, "Stats request")
+ # TODO
+ end
+ end
+
+ def authenticate_node(data)
+ if data.key?(:secret_key) && Settings.secret_key == data[:secret_key]
+ log(:info, "Connection authenticated.")
+ send_message(event: :auth, data: nil)
+ @authenticated = true
+ @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat))
+ @subscribe_id = EventChannel.subscribe {|message| send_message(message) }
+ else
+ log(:warn, "Invalid secret_key: #{data[:secret_key].inspect}")
+ send_message(event: :error, data: "Invalid secret_key.")
+ @closing = true
+ close_connection_after_writing
+ end
+ end
+
+ def heartbeat
+ if @heartbeats.size > 2 # 30 sec
+ log(:warn, "Node is dead.")
+ @heartbeat_timer.cancel
+ @heartbeat_timer = nil
+ @closing = true
+ close_connection_after_writing
+ return
+ end
+
+ id = Time.now.to_i
+ @heartbeats << id
+ send_message(event: :heartbeat, data: { id: id, stats: stats })
+ end
+
+ def stats
+ NodeManager.stats
+ end
+
+ def send_message(data)
+ send_data(data.to_msgpack)
+ end
+
+ def log(level, message)
+ CollectorProxy.logger.__send__(level, "CollectorConnection(##{@@_id})") { message }
+ end
+end
diff --git a/collector_proxy/lib/collector_proxy.rb b/collector_proxy/lib/collector_proxy.rb
new file mode 100644
index 0000000..07804e9
--- /dev/null
+++ b/collector_proxy/lib/collector_proxy.rb
@@ -0,0 +1,38 @@
+Bundler.require
+require "yaml"
+require "logger"
+require "event_channel"
+require "node_manager"
+require "collector_connection"
+require "worker_node_connection"
+
+Settings = OpenStruct.new(YAML.load_file(File.expand_path("../../settings.yml", __FILE__)))
+
+class CollectorProxy
+ class << self
+ def run
+ EventChannel.setup
+ NodeManager.setup
+
+ EM.run do
+ collector_connection = EM.start_server("0.0.0.0", Settings.collector_port, CollectorConnection)
+ worker_node_connections = EM.start_server("0.0.0.0", Settings.worker_node_port, WorkerNodeConnection)
+
+ stop = proc do
+ EM.stop_server(worker_node_connections)
+ sleep 1
+ EM.stop_server(collector_connection)
+ EM.stop
+ end
+
+ Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
+ end
+ end
+
+ def logger
+ @logger ||= Logger.new(STDOUT).tap {|l|
+ l.level = Logger.const_get(Settings.log_level.upcase) }
+ end
+ end
+end
diff --git a/collector_proxy/lib/event_channel.rb b/collector_proxy/lib/event_channel.rb
new file mode 100644
index 0000000..5c394c2
--- /dev/null
+++ b/collector_proxy/lib/event_channel.rb
@@ -0,0 +1,42 @@
+class EventChannel
+ class << self
+ def setup
+ return if @dalli
+ @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-collector-proxy:")
+ @queue = []
+ @subscribers = {}
+ end
+
+ def push(data)
+ raise ScriptError, "Call EventChannel.setup first" unless @dalli
+ if id = data[:identifier]
+ if @dalli.get(id)
+ CollectorProxy.logger.debug("UniqueChannel") { "Duplicate event: #{id}" }
+ return
+ else
+ @dalli.set(id, true)
+ end
+ end
+ if @subscribers.size > 0
+ @subscribers.values.each do |blk|
+ blk.call(data)
+ end
+ else
+ @queue << data
+ end
+ end
+ alias << push
+
+ def subscribe(&blk)
+ @subscribers[blk.__id__] = blk
+ while @queue.size > 0
+ blk.call(@queue.shift)
+ end
+ blk.__id__
+ end
+
+ def unsubscribe(id)
+ @subscribers.delete(id)
+ end
+ end
+end
diff --git a/collector_proxy/lib/node_manager.rb b/collector_proxy/lib/node_manager.rb
new file mode 100644
index 0000000..3f14a87
--- /dev/null
+++ b/collector_proxy/lib/node_manager.rb
@@ -0,0 +1,75 @@
+module NodeManager
+ class << self
+ attr_reader :node_connections, :active_connections, :inactive_connections
+
+ def setup
+ @node_connections = []
+ @active_connections = Array.new(Settings.nodes_count)
+ @inactive_connections = []
+ @accounts = Array.new(Settings.nodes_count) { {} }
+ end
+
+ def register(node_connection)
+ self.node_connections << node_connection
+ self.inactive_connections << node_connection
+ bind
+ end
+
+ def unregister(node_connection)
+ self.node_connections.delete(node_connection)
+ if i = self.active_connections.find_index(node_connection)
+ self.active_connections[i] = nil
+ else
+ self.inactive_connections.delete(node_connection)
+ end
+ bind
+ end
+
+ def register_account(account)
+ id = account[:id]
+ @accounts[id % Settings.nodes_count][id] = account
+
+ if con = self.active_connections[id % Settings.nodes_count]
+ con.register_account(account)
+ end
+ end
+
+ def unregister_account(account)
+ id = account[:id]
+ @accounts[id % Settings.nodes_count].delete(id)
+
+ if con = self.active_connections[id % Settings.nodes_count]
+ con.unregister_account(account)
+ end
+ end
+
+ def stats
+ actives = @active_connections.map {|con|
+ if con
+ { activated_time: con.activated_time.to_i,
+ connection_id: con.connection_id }
+ else
+ nil
+ end
+ }
+
+ { active_node_statuses: actives }
+ end
+
+ private
+ def bind
+ if first_inactive_id = self.active_connections.find_index(nil)
+ if con = self.inactive_connections.shift
+ self.active_connections[first_inactive_id] = con
+ con.activated_time = Time.now
+ CollectorProxy.logger.info("NodeManager") { "Registered node ##{con.connection_id} as group ##{first_inactive_id}" }
+ @accounts[first_inactive_id].values.each do |account|
+ con.register_account(account)
+ end
+ else
+ CollectorProxy.logger.warn("NodeManager") { "Not enough nodes: (#{self.active_connections.compact.size}/#{Settings.nodes_count})" }
+ end
+ end
+ end
+ end
+end
diff --git a/collector_proxy/lib/worker_node_connection.rb b/collector_proxy/lib/worker_node_connection.rb
new file mode 100644
index 0000000..204b364
--- /dev/null
+++ b/collector_proxy/lib/worker_node_connection.rb
@@ -0,0 +1,117 @@
+class WorkerNodeConnection < EM::Connection
+ attr_reader :connection_id
+ attr_accessor :activated_time
+
+ @@_id = 0
+
+ def initialize
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @connection_id = (@@_id += 1)
+ @authenticated = false
+ @closing = false
+ @heartbeats = Set.new
+ @activated_time = nil
+ end
+
+ def post_init
+ # do nothing
+ end
+
+ def unbind
+ @heartbeat_timer.cancel if @heartbeat_timer
+ if @closing
+ log(:info, "Connection was closed.")
+ else
+ if @authenticated
+ log(:warn, "Connection was closed unexpectedly.")
+ NodeManager.unregister(self)
+ end
+ end
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:event]
+ log(:warn, "Unknown message: #{msg}")
+ send_message(event: :error, data: "Unknown message.")
+ close_connection_after_writing
+ return
+ end
+
+ parse_message(msg)
+ end
+ end
+
+ def register_account(account)
+ send_message(event: "register", data: account)
+ end
+
+ def unregister_account(account)
+ send_message(event: "unregister", data: account)
+ end
+
+ private
+ def parse_message(msg)
+ unless @authenticated
+ if msg[:event] == "auth"
+ authenticate_node(msg[:data])
+ else
+ log(:error, "Unauthenticated client: #{msg}")
+ send_message(event: :error, data: "You aren't authenticated.")
+ close_connection_after_writing
+ end
+ return
+ end
+
+ case msg[:event]
+ when "exit"
+ log(:info, "Closing this connection...")
+ @closing = true
+ NodeManager.unregister(self)
+ when "heartbeat"
+ log(:debug, "Heartbeat reply: #{msg[:data]}")
+ @heartbeats.delete(msg[:data])
+ else
+ EventChannel << msg
+ end
+ end
+
+ def authenticate_node(data)
+ if data.key?(:secret_key) && Settings.secret_key == data[:secret_key]
+ log(:info, "Connection authenticated.")
+ send_message(event: :auth, data: nil)
+ @authenticated = true
+ @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat))
+ NodeManager.register(self)
+ else
+ log(:warn, "Invalid secret_key: #{data[:secret_key].inspect}")
+ send_message(event: :error, data: "Invalid secret_key.")
+ @closing = true
+ close_connection_after_writing
+ end
+ end
+
+ def heartbeat
+ if @heartbeats.size > 2 # 30 sec
+ log(:warn, "Node is dead.")
+ NodeManager.unregister(self)
+ @heartbeat_timer.cancel
+ @heartbeat_timer = nil
+ @closing = true
+ close_connection_after_writing
+ return
+ end
+
+ id = Time.now.to_i
+ @heartbeats << id
+ send_message(event: :heartbeat, data: id)
+ end
+
+ def send_message(data)
+ send_data(data.to_msgpack)
+ end
+
+ def log(level, message)
+ CollectorProxy.logger.__send__(level, "WorkerNodeConnection(##{@connection_id})") { message }
+ end
+end
diff --git a/collector_proxy/settings.yml.example b/collector_proxy/settings.yml.example
new file mode 100644
index 0000000..78cce4d
--- /dev/null
+++ b/collector_proxy/settings.yml.example
@@ -0,0 +1,6 @@
+secret_key: secret_key
+collector_port: 42107
+worker_node_port: 42106
+nodes_count: 1
+log_level: debug
+memcached: "127.0.0.1:11211"
diff --git a/config/settings.yml.example b/config/settings.yml.example
index b1eeba2..dc849d9 100644
--- a/config/settings.yml.example
+++ b/config/settings.yml.example
@@ -5,9 +5,9 @@ default: &default
secret: "consumer secret"
collector:
- server_port: 42106
+ proxy_host: localhost
+ proxy_port: 42107
secret_key: "secret key to authorize workers"
- nodes_count: 1 # workers count
flush_interval: 3
notification:
diff --git a/lib/collector/collector_proxy_connection.rb b/lib/collector/collector_proxy_connection.rb
new file mode 100644
index 0000000..7f0f558
--- /dev/null
+++ b/lib/collector/collector_proxy_connection.rb
@@ -0,0 +1,125 @@
+module Collector
+ class CollectorProxyConnection < EM::Connection
+ @@_instance = nil
+
+ def self.instance
+ @@_instance
+ end
+
+ attr_reader :connected, :last_stats
+
+ def initialize(queue)
+ @@_instance = self
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @queue = queue
+ @closing = false
+ @connected = false
+ @last_stats = nil
+ end
+
+ def post_init
+ send_message(event: :auth,
+ data: { secret_key: Settings.collector.secret_key })
+ end
+
+ def unbind
+ if @closing
+ log(:info, "Connection was closed.")
+ else
+ if @connected
+ log(:info, "Connection was closed unexpectedly.")
+ end
+
+ EM.add_timer(10) { try_reconnect }
+ end
+ end
+
+ def try_reconnect
+ reconnect(Settings.collector.proxy_host, Settings.collector.proxy_port)
+ post_init
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:event]
+ log(:warn, "Unknown message: #{msg}")
+ next
+ end
+
+ parse_message(msg)
+ end
+ end
+
+ def exit
+ send_message(event: :exit, data: nil)
+ close_connection_after_writing
+ end
+
+ def register_account(account)
+ data = { id: account.id,
+ consumer_key: Settings.consumer.key,
+ consumer_secret: Settings.consumer.secret,
+ oauth_token: account.oauth_token,
+ oauth_token_secret: account.oauth_token_secret,
+ user_id: account.user_id }
+ send_message(event: :register, data: data)
+ end
+
+ def unregister_account(account)
+ data = { id: account.id,
+ user_id: account.user_id }
+ send_message(event: :unregister, data: data)
+ end
+
+ private
+ def parse_message(msg)
+ case msg[:event]
+ when "unauthorized"
+ log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}")
+ @queue.push_unauthorized(msg)
+ when "user"
+ log(:debug, "Received user: #{msg[:identifier]}")
+ @queue.push_user(msg)
+ when "tweet"
+ log(:debug, "Received tweet: #{msg[:identifier]}")
+ @queue.push_tweet(msg)
+ when "favorite"
+ log(:debug, "Receive favorite: #{msg[:identifier]}")
+ @queue.push_favorite(msg)
+ when "unfavorite"
+ log(:debug, "Receive unfavorite: #{msg[:identifier]}")
+ @queue.push_unfavorite(msg)
+ when "retweet"
+ log(:debug, "Receive retweet: #{msg[:identifier]}")
+ @queue.push_retweet(msg)
+ when "delete"
+ log(:debug, "Receive delete: #{msg[:identifier]}")
+ @queue.push_delete(msg)
+ when "auth"
+ log(:info, "Connection authenticated.")
+ @connected = true
+ register_accounts
+ when "heartbeat"
+ log(:debug, "Heartbeat: #{msg[:data][:id]}")
+ send_message(event: :heartbeat, data: { id: msg[:data][:id] })
+ @last_stats = msg[:data][:stats]
+ else
+ log(:warn, "Unknown message: #{msg.inspect}")
+ end
+ end
+
+ def register_accounts
+ Account.active.each do |account|
+ register_account(account)
+ end
+ end
+
+ def send_message(data)
+ send_data(data.to_msgpack)
+ end
+
+ def log(level, message)
+ Rails.logger.__send__(level, "CollectorProxyConnection") { message }
+ end
+ end
+end
diff --git a/lib/collector/control_server.rb b/lib/collector/control_server.rb
index 904cd97..b643570 100644
--- a/lib/collector/control_server.rb
+++ b/lib/collector/control_server.rb
@@ -1,27 +1,20 @@
module Collector
class ControlServer
def register_account(account_id)
- NodeManager.register_account(Account.find(account_id))
+ CollectorProxyConnection.instance.register_account(Account.find(account_id))
end
def deactivate_account(account_id)
- NodeManager.unregister_account(Account.find(account_id))
+ CollectorProxyConnection.instance.unregister_account(Account.find(account_id))
end
def status
- active_node_statuses = Settings.collector.nodes_count.times.map do |number|
- node = NodeManager.active_connections[number]
- if node
- { activated_time: node.activated_time.to_i,
- connection_id: node.connection_id }
- else
- nil
- end
+ con = CollectorProxyConnection.instance
+ if con.connected
+ con.last_stats
+ else
+ nil
end
-
- { start_time: Daemon.start_time.to_i,
- active_node_statuses: active_node_statuses,
- inactive_nodes_count: NodeManager.inactive_connections.size }
end
end
end
diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb
index 2363874..e9146c8 100644
--- a/lib/collector/daemon.rb
+++ b/lib/collector/daemon.rb
@@ -21,11 +21,12 @@ module Collector
event_queue.flush
end
- nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue)
+ proxy_connection = EM.connect(Settings.collector.proxy_host, Settings.collector.proxy_port, Collector::CollectorProxyConnection, event_queue)
- stop = -> _ do
+ stop = proc do
control.stop
- EM.stop_server(nodes)
+ proxy_connection.exit
+
EM.stop
end
diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb
deleted file mode 100644
index 1504b1c..0000000
--- a/lib/collector/node_connection.rb
+++ /dev/null
@@ -1,148 +0,0 @@
-require "set"
-
-module Collector
- class NodeConnection < EM::Connection
- attr_reader :connection_id
- attr_accessor :activated_time
-
- @@_id = 0
-
- def initialize(queue)
- @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
- @connection_id = (@@_id += 1)
- @authenticated = false
- @closing = false
- @activated_time = nil
- @queue = queue
- @heartbeats = Set.new
- end
-
- def unbind
- @heartbeat_timer.cancel if @heartbeat_timer
- if @closing
- log(:info, "Connection was closed.")
- else
- log(:warn, "Connection was closed unexpectedly.")
- NodeManager.unregister(self)
- end
- end
-
- def receive_data(data)
- @unpacker.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg[:event]
- log(:warn, "Unknown message: #{msg}")
- send_message(:error, text: "Unknown message.")
- close_connection_after_writing
- next
- end
-
- parse_message(msg)
- end
- end
-
- def register_account(account)
- send_message(event: :register,
- data: { id: account.id,
- consumer_key: Settings.consumer.key,
- consumer_secret: Settings.consumer.secret,
- oauth_token: account.oauth_token,
- oauth_token_secret: account.oauth_token_secret,
- user_id: account.user_id })
- log(:info, "Registered account ##{account.id}/#{account.user_id}")
- end
-
- def unregister_account(account)
- send_message(event: :unregister,
- data: { id: account.id,
- user_id: account.user_id })
- log(:info, "Unregistered account ##{account.id}/#{account.user_id}")
- end
-
- private
- def parse_message(msg)
- unless @authenticated
- if msg[:event] == "auth"
- authenticate_node(msg[:data])
- else
- log(:error, "Unauthenticated client: #{msg}")
- send_message(event: :error, data: "You aren't authenticated.")
- close_connection_after_writing
- end
- return
- end
-
- case msg[:event]
- when "unauthorized"
- log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}")
- @queue.push_unauthorized(msg)
- when "user"
- log(:debug, "Received user: #{msg[:identifier]}")
- @queue.push_user(msg)
- when "tweet"
- log(:debug, "Received tweet: #{msg[:identifier]}")
- @queue.push_tweet(msg)
- when "favorite"
- log(:debug, "Receive favorite: #{msg[:identifier]}")
- @queue.push_favorite(msg)
- when "unfavorite"
- log(:debug, "Receive unfavorite: #{msg[:identifier]}")
- @queue.push_unfavorite(msg)
- when "retweet"
- log(:debug, "Receive retweet: #{msg[:identifier]}")
- @queue.push_retweet(msg)
- when "delete"
- log(:debug, "Receive delete: #{msg[:identifier]}")
- @queue.push_delete(msg)
- when "exit"
- log(:info, "Closing this connection...")
- @closing = true
- NodeManager.unregister(self)
- when "heartbeat"
- log(:debug, "Heartbeat reply: #{msg[:data]}")
- @heartbeats.delete(msg[:data])
- else
- log(:warn, "Unknown message: #{msg.inspect}")
- send_message(event: :error, data: "Unknown message.")
- end
- end
-
- def authenticate_node(data)
- if data.key?(:secret_key) && Settings.collector.secret_key == data[:secret_key]
- @authenticated = true
- log(:info, "Connection authenticated.")
- send_message(event: :auth, data: nil)
- NodeManager.register(self)
- @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat))
- else
- log(:warn, "Invalid secret_key: #{secret_key.inspect}")
- send_message(event: :error, data: "Invalid secret_key.")
- close_connection_after_writing
- return
- end
- end
-
- def send_message(data)
- send_data(data.to_msgpack)
- end
-
- def heartbeat
- if @heartbeats.size > 2 # 30 sec
- log(:warn, "Node is dead.")
- NodeManager.unregister(self)
- @heartbeat_timer.cancel
- @heartbeat_timer = nil
- @closing = true
- close_connection_after_writing
- return
- end
-
- id = Time.now.to_i
- @heartbeats << id
- send_message(event: :heartbeat, data: id)
- end
-
- def log(level, message)
- Rails.logger.__send__(level, "Node(#{@connection_id})") { message }
- end
- end
-end
diff --git a/lib/collector/node_manager.rb b/lib/collector/node_manager.rb
deleted file mode 100644
index e58433e..0000000
--- a/lib/collector/node_manager.rb
+++ /dev/null
@@ -1,55 +0,0 @@
-module Collector
- module NodeManager
- @node_connections = []
- @active_connections = Array.new(Settings.collector.nodes_count)
- @inactive_connections = []
-
- class << self
- attr_reader :node_connections, :active_connections, :inactive_connections
-
- def register(node_connection)
- self.node_connections << node_connection
- self.inactive_connections << node_connection
- bind
- end
-
- def unregister(node_connection)
- self.node_connections.delete(node_connection)
- if i = self.active_connections.find_index(node_connection)
- self.active_connections[i] = nil
- else
- self.inactive_connections.delete(node_connection)
- end
- bind
- end
-
- def register_account(account)
- if con = self.active_connections[account.worker_number]
- con.register_account(account)
- end
- end
-
- def unregister_account(account)
- if con = self.active_connections[account.worker_number]
- con.unregister_account(account)
- end
- end
-
- private
- def bind
- if first_inactive_id = self.active_connections.find_index(nil)
- if con = self.inactive_connections.shift
- self.active_connections[first_inactive_id] = con
- con.activated_time = Time.now
- Rails.logger.warn("NodeManager") { "Registered node ##{con.connection_id} as group ##{first_inactive_id}" }
- Account.for_node(first_inactive_id).each do |a|
- con.register_account(a)
- end
- else
- Rails.logger.warn("NodeManager") { "Not enough nodes: (#{self.active_connections.count {|c| c }}/#{Settings.collector.nodes_count})" }
- end
- end
- end
- end
- end
-end