diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-04-30 01:06:58 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-04-30 01:10:49 +0900 |
commit | 2e1eea7875b192ba329c2c24852ea11069fe7f94 (patch) | |
tree | 54d0ddfef071c59908fbb619fb54f6f53cdfe99e | |
parent | 87fbdae4649f639a194b816453fafe57a4533d10 (diff) | |
download | aclog-2e1eea7875b192ba329c2c24852ea11069fe7f94.tar.gz |
add CollectorProxy
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | app/models/account.rb | 7 | ||||
-rw-r--r-- | app/views/about/status.html.haml | 5 | ||||
-rw-r--r-- | collector_proxy/Gemfile | 6 | ||||
-rw-r--r-- | collector_proxy/Gemfile.lock | 16 | ||||
-rw-r--r-- | collector_proxy/Rakefile | 12 | ||||
-rw-r--r-- | collector_proxy/lib/collector_connection.rb | 116 | ||||
-rw-r--r-- | collector_proxy/lib/collector_proxy.rb | 38 | ||||
-rw-r--r-- | collector_proxy/lib/event_channel.rb | 42 | ||||
-rw-r--r-- | collector_proxy/lib/node_manager.rb | 75 | ||||
-rw-r--r-- | collector_proxy/lib/worker_node_connection.rb | 117 | ||||
-rw-r--r-- | collector_proxy/settings.yml.example | 6 | ||||
-rw-r--r-- | config/settings.yml.example | 4 | ||||
-rw-r--r-- | lib/collector/collector_proxy_connection.rb | 125 | ||||
-rw-r--r-- | lib/collector/control_server.rb | 21 | ||||
-rw-r--r-- | lib/collector/daemon.rb | 7 | ||||
-rw-r--r-- | lib/collector/node_connection.rb | 148 | ||||
-rw-r--r-- | lib/collector/node_manager.rb | 55 |
18 files changed, 567 insertions, 234 deletions
@@ -12,6 +12,7 @@ /config/database.yml /worker_node/settings.yml +/collector_proxy/settings.yml .* *~ diff --git a/app/models/account.rb b/app/models/account.rb index d4486b9..eba1b00 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -2,7 +2,6 @@ class Account < ActiveRecord::Base enum status: { active: 0, inactive: 1, revoked: 2 } belongs_to :user - scope :for_node, ->(block_number) { active.where("id % ? = ?", Settings.collector.nodes_count, block_number) } # Returns whether tweet notification is enabled for this user. def notification_enabled?; notification_enabled end @@ -63,10 +62,4 @@ class Account < ActiveRecord::Base Set.new client.friend_ids end end - - # Returns the worker id collecting tweets for this account. - # @return [Integer] - def worker_number - id % Settings.collector.nodes_count - end end diff --git a/app/views/about/status.html.haml b/app/views/about/status.html.haml index c8b2149..5bcb2ae 100644 --- a/app/views/about/status.html.haml +++ b/app/views/about/status.html.haml @@ -24,11 +24,6 @@ %td - %td Down %td - - - if logged_in? - - if your_node = @worker_status["active_node_statuses"][current_user.account.worker_number] - %p Worker ##{your_node["connection_id"]} is assigned to your account. - - else - %p Worker for group #{current_user.account.worker_number} is currently down. - else .alert.alert-danger %strong Couldn't communicate with the collector service. diff --git a/collector_proxy/Gemfile b/collector_proxy/Gemfile new file mode 100644 index 0000000..907b7b1 --- /dev/null +++ b/collector_proxy/Gemfile @@ -0,0 +1,6 @@ +source "https://rubygems.org" + +gem "rake" +gem "msgpack" +gem "eventmachine" +gem "dalli" diff --git a/collector_proxy/Gemfile.lock b/collector_proxy/Gemfile.lock new file mode 100644 index 0000000..c0f7be5 --- /dev/null +++ b/collector_proxy/Gemfile.lock @@ -0,0 +1,16 @@ +GEM + remote: https://rubygems.org/ + specs: + dalli (2.7.4) + eventmachine (1.0.7) + msgpack (0.5.11) + rake (10.4.2) + +PLATFORMS + ruby + +DEPENDENCIES + dalli + eventmachine + msgpack + rake diff --git a/collector_proxy/Rakefile b/collector_proxy/Rakefile new file mode 100644 index 0000000..16df55b --- /dev/null +++ b/collector_proxy/Rakefile @@ -0,0 +1,12 @@ +$:.unshift File.expand_path("../lib/", __FILE__) +require "collector_proxy" + +$stdout.sync = true +$stderr.sync = true + +namespace :collector_proxy do + desc "Run collector proxy." + task :run do + CollectorProxy.run + end +end diff --git a/collector_proxy/lib/collector_connection.rb b/collector_proxy/lib/collector_connection.rb new file mode 100644 index 0000000..375e97e --- /dev/null +++ b/collector_proxy/lib/collector_connection.rb @@ -0,0 +1,116 @@ +class CollectorConnection < EM::Connection + @@_id = 0 + + def initialize + @@_id += 1 + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @authenticated = false + @closing = false + @heartbeats = Set.new + @subscribe_id = nil + end + + def post_init + # do nothing + end + + def unbind + @heartbeat_timer.cancel if @heartbeat_timer + if @closing + log(:info, "Connection was closed.") + else + if @authenticated + log(:info, "Connection was closed unexpectedly.") + EventChannel.unsubscribe(@subscribe_id) + end + end + end + + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg[:event] + log(:warn, "Unknown message: #{msg}") + send_message(event: :error, data: "Unknown message.") + close_connection_after_writing + return + end + + parse_message(msg) + end + end + + private + def parse_message(msg) + unless @authenticated + if msg[:event] == "auth" + authenticate_node(msg[:data]) + else + log(:error, "Unauthenticated client: #{msg}") + send_message(event: :error, data: "You aren't authenticated.") + close_connection_after_writing + end + return + end + + case msg[:event] + when "exit" + log(:info, "Closing this connection...") + @closing = true + EventChannel.unsubscribe(@subscribe_id) + when "heartbeat" + log(:debug, "Heartbeat reply: #{msg[:data][:id]}") + @heartbeats.delete(msg[:data][:id]) + when "register" + log(:debug, "Registered account ##{msg[:data][:id]}") + NodeManager.register_account(msg[:data]) + when "unregister" + log(:debug, "Unregistered account ##{msg[:data][:id]}") + NodeManager.unregister_account(msg[:data]) + when "stats" + log(:debug, "Stats request") + # TODO + end + end + + def authenticate_node(data) + if data.key?(:secret_key) && Settings.secret_key == data[:secret_key] + log(:info, "Connection authenticated.") + send_message(event: :auth, data: nil) + @authenticated = true + @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat)) + @subscribe_id = EventChannel.subscribe {|message| send_message(message) } + else + log(:warn, "Invalid secret_key: #{data[:secret_key].inspect}") + send_message(event: :error, data: "Invalid secret_key.") + @closing = true + close_connection_after_writing + end + end + + def heartbeat + if @heartbeats.size > 2 # 30 sec + log(:warn, "Node is dead.") + @heartbeat_timer.cancel + @heartbeat_timer = nil + @closing = true + close_connection_after_writing + return + end + + id = Time.now.to_i + @heartbeats << id + send_message(event: :heartbeat, data: { id: id, stats: stats }) + end + + def stats + NodeManager.stats + end + + def send_message(data) + send_data(data.to_msgpack) + end + + def log(level, message) + CollectorProxy.logger.__send__(level, "CollectorConnection(##{@@_id})") { message } + end +end diff --git a/collector_proxy/lib/collector_proxy.rb b/collector_proxy/lib/collector_proxy.rb new file mode 100644 index 0000000..07804e9 --- /dev/null +++ b/collector_proxy/lib/collector_proxy.rb @@ -0,0 +1,38 @@ +Bundler.require +require "yaml" +require "logger" +require "event_channel" +require "node_manager" +require "collector_connection" +require "worker_node_connection" + +Settings = OpenStruct.new(YAML.load_file(File.expand_path("../../settings.yml", __FILE__))) + +class CollectorProxy + class << self + def run + EventChannel.setup + NodeManager.setup + + EM.run do + collector_connection = EM.start_server("0.0.0.0", Settings.collector_port, CollectorConnection) + worker_node_connections = EM.start_server("0.0.0.0", Settings.worker_node_port, WorkerNodeConnection) + + stop = proc do + EM.stop_server(worker_node_connections) + sleep 1 + EM.stop_server(collector_connection) + EM.stop + end + + Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) + end + end + + def logger + @logger ||= Logger.new(STDOUT).tap {|l| + l.level = Logger.const_get(Settings.log_level.upcase) } + end + end +end diff --git a/collector_proxy/lib/event_channel.rb b/collector_proxy/lib/event_channel.rb new file mode 100644 index 0000000..5c394c2 --- /dev/null +++ b/collector_proxy/lib/event_channel.rb @@ -0,0 +1,42 @@ +class EventChannel + class << self + def setup + return if @dalli + @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-collector-proxy:") + @queue = [] + @subscribers = {} + end + + def push(data) + raise ScriptError, "Call EventChannel.setup first" unless @dalli + if id = data[:identifier] + if @dalli.get(id) + CollectorProxy.logger.debug("UniqueChannel") { "Duplicate event: #{id}" } + return + else + @dalli.set(id, true) + end + end + if @subscribers.size > 0 + @subscribers.values.each do |blk| + blk.call(data) + end + else + @queue << data + end + end + alias << push + + def subscribe(&blk) + @subscribers[blk.__id__] = blk + while @queue.size > 0 + blk.call(@queue.shift) + end + blk.__id__ + end + + def unsubscribe(id) + @subscribers.delete(id) + end + end +end diff --git a/collector_proxy/lib/node_manager.rb b/collector_proxy/lib/node_manager.rb new file mode 100644 index 0000000..3f14a87 --- /dev/null +++ b/collector_proxy/lib/node_manager.rb @@ -0,0 +1,75 @@ +module NodeManager + class << self + attr_reader :node_connections, :active_connections, :inactive_connections + + def setup + @node_connections = [] + @active_connections = Array.new(Settings.nodes_count) + @inactive_connections = [] + @accounts = Array.new(Settings.nodes_count) { {} } + end + + def register(node_connection) + self.node_connections << node_connection + self.inactive_connections << node_connection + bind + end + + def unregister(node_connection) + self.node_connections.delete(node_connection) + if i = self.active_connections.find_index(node_connection) + self.active_connections[i] = nil + else + self.inactive_connections.delete(node_connection) + end + bind + end + + def register_account(account) + id = account[:id] + @accounts[id % Settings.nodes_count][id] = account + + if con = self.active_connections[id % Settings.nodes_count] + con.register_account(account) + end + end + + def unregister_account(account) + id = account[:id] + @accounts[id % Settings.nodes_count].delete(id) + + if con = self.active_connections[id % Settings.nodes_count] + con.unregister_account(account) + end + end + + def stats + actives = @active_connections.map {|con| + if con + { activated_time: con.activated_time.to_i, + connection_id: con.connection_id } + else + nil + end + } + + { active_node_statuses: actives } + end + + private + def bind + if first_inactive_id = self.active_connections.find_index(nil) + if con = self.inactive_connections.shift + self.active_connections[first_inactive_id] = con + con.activated_time = Time.now + CollectorProxy.logger.info("NodeManager") { "Registered node ##{con.connection_id} as group ##{first_inactive_id}" } + @accounts[first_inactive_id].values.each do |account| + con.register_account(account) + end + else + CollectorProxy.logger.warn("NodeManager") { "Not enough nodes: (#{self.active_connections.compact.size}/#{Settings.nodes_count})" } + end + end + end + end +end diff --git a/collector_proxy/lib/worker_node_connection.rb b/collector_proxy/lib/worker_node_connection.rb new file mode 100644 index 0000000..204b364 --- /dev/null +++ b/collector_proxy/lib/worker_node_connection.rb @@ -0,0 +1,117 @@ +class WorkerNodeConnection < EM::Connection + attr_reader :connection_id + attr_accessor :activated_time + + @@_id = 0 + + def initialize + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @connection_id = (@@_id += 1) + @authenticated = false + @closing = false + @heartbeats = Set.new + @activated_time = nil + end + + def post_init + # do nothing + end + + def unbind + @heartbeat_timer.cancel if @heartbeat_timer + if @closing + log(:info, "Connection was closed.") + else + if @authenticated + log(:warn, "Connection was closed unexpectedly.") + NodeManager.unregister(self) + end + end + end + + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg[:event] + log(:warn, "Unknown message: #{msg}") + send_message(event: :error, data: "Unknown message.") + close_connection_after_writing + return + end + + parse_message(msg) + end + end + + def register_account(account) + send_message(event: "register", data: account) + end + + def unregister_account(account) + send_message(event: "unregister", data: account) + end + + private + def parse_message(msg) + unless @authenticated + if msg[:event] == "auth" + authenticate_node(msg[:data]) + else + log(:error, "Unauthenticated client: #{msg}") + send_message(event: :error, data: "You aren't authenticated.") + close_connection_after_writing + end + return + end + + case msg[:event] + when "exit" + log(:info, "Closing this connection...") + @closing = true + NodeManager.unregister(self) + when "heartbeat" + log(:debug, "Heartbeat reply: #{msg[:data]}") + @heartbeats.delete(msg[:data]) + else + EventChannel << msg + end + end + + def authenticate_node(data) + if data.key?(:secret_key) && Settings.secret_key == data[:secret_key] + log(:info, "Connection authenticated.") + send_message(event: :auth, data: nil) + @authenticated = true + @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat)) + NodeManager.register(self) + else + log(:warn, "Invalid secret_key: #{data[:secret_key].inspect}") + send_message(event: :error, data: "Invalid secret_key.") + @closing = true + close_connection_after_writing + end + end + + def heartbeat + if @heartbeats.size > 2 # 30 sec + log(:warn, "Node is dead.") + NodeManager.unregister(self) + @heartbeat_timer.cancel + @heartbeat_timer = nil + @closing = true + close_connection_after_writing + return + end + + id = Time.now.to_i + @heartbeats << id + send_message(event: :heartbeat, data: id) + end + + def send_message(data) + send_data(data.to_msgpack) + end + + def log(level, message) + CollectorProxy.logger.__send__(level, "WorkerNodeConnection(##{@connection_id})") { message } + end +end diff --git a/collector_proxy/settings.yml.example b/collector_proxy/settings.yml.example new file mode 100644 index 0000000..78cce4d --- /dev/null +++ b/collector_proxy/settings.yml.example @@ -0,0 +1,6 @@ +secret_key: secret_key +collector_port: 42107 +worker_node_port: 42106 +nodes_count: 1 +log_level: debug +memcached: "127.0.0.1:11211" diff --git a/config/settings.yml.example b/config/settings.yml.example index b1eeba2..dc849d9 100644 --- a/config/settings.yml.example +++ b/config/settings.yml.example @@ -5,9 +5,9 @@ default: &default secret: "consumer secret" collector: - server_port: 42106 + proxy_host: localhost + proxy_port: 42107 secret_key: "secret key to authorize workers" - nodes_count: 1 # workers count flush_interval: 3 notification: diff --git a/lib/collector/collector_proxy_connection.rb b/lib/collector/collector_proxy_connection.rb new file mode 100644 index 0000000..7f0f558 --- /dev/null +++ b/lib/collector/collector_proxy_connection.rb @@ -0,0 +1,125 @@ +module Collector + class CollectorProxyConnection < EM::Connection + @@_instance = nil + + def self.instance + @@_instance + end + + attr_reader :connected, :last_stats + + def initialize(queue) + @@_instance = self + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @queue = queue + @closing = false + @connected = false + @last_stats = nil + end + + def post_init + send_message(event: :auth, + data: { secret_key: Settings.collector.secret_key }) + end + + def unbind + if @closing + log(:info, "Connection was closed.") + else + if @connected + log(:info, "Connection was closed unexpectedly.") + end + + EM.add_timer(10) { try_reconnect } + end + end + + def try_reconnect + reconnect(Settings.collector.proxy_host, Settings.collector.proxy_port) + post_init + end + + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg[:event] + log(:warn, "Unknown message: #{msg}") + next + end + + parse_message(msg) + end + end + + def exit + send_message(event: :exit, data: nil) + close_connection_after_writing + end + + def register_account(account) + data = { id: account.id, + consumer_key: Settings.consumer.key, + consumer_secret: Settings.consumer.secret, + oauth_token: account.oauth_token, + oauth_token_secret: account.oauth_token_secret, + user_id: account.user_id } + send_message(event: :register, data: data) + end + + def unregister_account(account) + data = { id: account.id, + user_id: account.user_id } + send_message(event: :unregister, data: data) + end + + private + def parse_message(msg) + case msg[:event] + when "unauthorized" + log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}") + @queue.push_unauthorized(msg) + when "user" + log(:debug, "Received user: #{msg[:identifier]}") + @queue.push_user(msg) + when "tweet" + log(:debug, "Received tweet: #{msg[:identifier]}") + @queue.push_tweet(msg) + when "favorite" + log(:debug, "Receive favorite: #{msg[:identifier]}") + @queue.push_favorite(msg) + when "unfavorite" + log(:debug, "Receive unfavorite: #{msg[:identifier]}") + @queue.push_unfavorite(msg) + when "retweet" + log(:debug, "Receive retweet: #{msg[:identifier]}") + @queue.push_retweet(msg) + when "delete" + log(:debug, "Receive delete: #{msg[:identifier]}") + @queue.push_delete(msg) + when "auth" + log(:info, "Connection authenticated.") + @connected = true + register_accounts + when "heartbeat" + log(:debug, "Heartbeat: #{msg[:data][:id]}") + send_message(event: :heartbeat, data: { id: msg[:data][:id] }) + @last_stats = msg[:data][:stats] + else + log(:warn, "Unknown message: #{msg.inspect}") + end + end + + def register_accounts + Account.active.each do |account| + register_account(account) + end + end + + def send_message(data) + send_data(data.to_msgpack) + end + + def log(level, message) + Rails.logger.__send__(level, "CollectorProxyConnection") { message } + end + end +end diff --git a/lib/collector/control_server.rb b/lib/collector/control_server.rb index 904cd97..b643570 100644 --- a/lib/collector/control_server.rb +++ b/lib/collector/control_server.rb @@ -1,27 +1,20 @@ module Collector class ControlServer def register_account(account_id) - NodeManager.register_account(Account.find(account_id)) + CollectorProxyConnection.instance.register_account(Account.find(account_id)) end def deactivate_account(account_id) - NodeManager.unregister_account(Account.find(account_id)) + CollectorProxyConnection.instance.unregister_account(Account.find(account_id)) end def status - active_node_statuses = Settings.collector.nodes_count.times.map do |number| - node = NodeManager.active_connections[number] - if node - { activated_time: node.activated_time.to_i, - connection_id: node.connection_id } - else - nil - end + con = CollectorProxyConnection.instance + if con.connected + con.last_stats + else + nil end - - { start_time: Daemon.start_time.to_i, - active_node_statuses: active_node_statuses, - inactive_nodes_count: NodeManager.inactive_connections.size } end end end diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb index 2363874..e9146c8 100644 --- a/lib/collector/daemon.rb +++ b/lib/collector/daemon.rb @@ -21,11 +21,12 @@ module Collector event_queue.flush end - nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue) + proxy_connection = EM.connect(Settings.collector.proxy_host, Settings.collector.proxy_port, Collector::CollectorProxyConnection, event_queue) - stop = -> _ do + stop = proc do control.stop - EM.stop_server(nodes) + proxy_connection.exit + EM.stop end diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb deleted file mode 100644 index 1504b1c..0000000 --- a/lib/collector/node_connection.rb +++ /dev/null @@ -1,148 +0,0 @@ -require "set" - -module Collector - class NodeConnection < EM::Connection - attr_reader :connection_id - attr_accessor :activated_time - - @@_id = 0 - - def initialize(queue) - @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) - @connection_id = (@@_id += 1) - @authenticated = false - @closing = false - @activated_time = nil - @queue = queue - @heartbeats = Set.new - end - - def unbind - @heartbeat_timer.cancel if @heartbeat_timer - if @closing - log(:info, "Connection was closed.") - else - log(:warn, "Connection was closed unexpectedly.") - NodeManager.unregister(self) - end - end - - def receive_data(data) - @unpacker.feed_each(data) do |msg| - unless msg.is_a?(Hash) && msg[:event] - log(:warn, "Unknown message: #{msg}") - send_message(:error, text: "Unknown message.") - close_connection_after_writing - next - end - - parse_message(msg) - end - end - - def register_account(account) - send_message(event: :register, - data: { id: account.id, - consumer_key: Settings.consumer.key, - consumer_secret: Settings.consumer.secret, - oauth_token: account.oauth_token, - oauth_token_secret: account.oauth_token_secret, - user_id: account.user_id }) - log(:info, "Registered account ##{account.id}/#{account.user_id}") - end - - def unregister_account(account) - send_message(event: :unregister, - data: { id: account.id, - user_id: account.user_id }) - log(:info, "Unregistered account ##{account.id}/#{account.user_id}") - end - - private - def parse_message(msg) - unless @authenticated - if msg[:event] == "auth" - authenticate_node(msg[:data]) - else - log(:error, "Unauthenticated client: #{msg}") - send_message(event: :error, data: "You aren't authenticated.") - close_connection_after_writing - end - return - end - - case msg[:event] - when "unauthorized" - log(:info, "Received unauthorized: ##{msg[:data][:id]}/#{msg[:data][:user_id]}") - @queue.push_unauthorized(msg) - when "user" - log(:debug, "Received user: #{msg[:identifier]}") - @queue.push_user(msg) - when "tweet" - log(:debug, "Received tweet: #{msg[:identifier]}") - @queue.push_tweet(msg) - when "favorite" - log(:debug, "Receive favorite: #{msg[:identifier]}") - @queue.push_favorite(msg) - when "unfavorite" - log(:debug, "Receive unfavorite: #{msg[:identifier]}") - @queue.push_unfavorite(msg) - when "retweet" - log(:debug, "Receive retweet: #{msg[:identifier]}") - @queue.push_retweet(msg) - when "delete" - log(:debug, "Receive delete: #{msg[:identifier]}") - @queue.push_delete(msg) - when "exit" - log(:info, "Closing this connection...") - @closing = true - NodeManager.unregister(self) - when "heartbeat" - log(:debug, "Heartbeat reply: #{msg[:data]}") - @heartbeats.delete(msg[:data]) - else - log(:warn, "Unknown message: #{msg.inspect}") - send_message(event: :error, data: "Unknown message.") - end - end - - def authenticate_node(data) - if data.key?(:secret_key) && Settings.collector.secret_key == data[:secret_key] - @authenticated = true - log(:info, "Connection authenticated.") - send_message(event: :auth, data: nil) - NodeManager.register(self) - @heartbeat_timer = EM.add_periodic_timer(10, &method(:heartbeat)) - else - log(:warn, "Invalid secret_key: #{secret_key.inspect}") - send_message(event: :error, data: "Invalid secret_key.") - close_connection_after_writing - return - end - end - - def send_message(data) - send_data(data.to_msgpack) - end - - def heartbeat - if @heartbeats.size > 2 # 30 sec - log(:warn, "Node is dead.") - NodeManager.unregister(self) - @heartbeat_timer.cancel - @heartbeat_timer = nil - @closing = true - close_connection_after_writing - return - end - - id = Time.now.to_i - @heartbeats << id - send_message(event: :heartbeat, data: id) - end - - def log(level, message) - Rails.logger.__send__(level, "Node(#{@connection_id})") { message } - end - end -end diff --git a/lib/collector/node_manager.rb b/lib/collector/node_manager.rb deleted file mode 100644 index e58433e..0000000 --- a/lib/collector/node_manager.rb +++ /dev/null @@ -1,55 +0,0 @@ -module Collector - module NodeManager - @node_connections = [] - @active_connections = Array.new(Settings.collector.nodes_count) - @inactive_connections = [] - - class << self - attr_reader :node_connections, :active_connections, :inactive_connections - - def register(node_connection) - self.node_connections << node_connection - self.inactive_connections << node_connection - bind - end - - def unregister(node_connection) - self.node_connections.delete(node_connection) - if i = self.active_connections.find_index(node_connection) - self.active_connections[i] = nil - else - self.inactive_connections.delete(node_connection) - end - bind - end - - def register_account(account) - if con = self.active_connections[account.worker_number] - con.register_account(account) - end - end - - def unregister_account(account) - if con = self.active_connections[account.worker_number] - con.unregister_account(account) - end - end - - private - def bind - if first_inactive_id = self.active_connections.find_index(nil) - if con = self.inactive_connections.shift - self.active_connections[first_inactive_id] = con - con.activated_time = Time.now - Rails.logger.warn("NodeManager") { "Registered node ##{con.connection_id} as group ##{first_inactive_id}" } - Account.for_node(first_inactive_id).each do |a| - con.register_account(a) - end - else - Rails.logger.warn("NodeManager") { "Not enough nodes: (#{self.active_connections.count {|c| c }}/#{Settings.collector.nodes_count})" } - end - end - end - end - end -end |