diff options
author | Rhenium <rhenium@rhe.jp> | 2014-04-04 20:09:40 +0900 |
---|---|---|
committer | Rhenium <rhenium@rhe.jp> | 2014-04-04 20:09:40 +0900 |
commit | 7689f1752345523a0be2837ec5f55f2e0a9c0b37 (patch) | |
tree | 1a2284aea0e7570fc8b6a8259b0e78773856181f /lib | |
parent | b20e7e0641760bb07c22ba457f53661b21cff904 (diff) | |
download | aclog-7689f1752345523a0be2837ec5f55f2e0a9c0b37.tar.gz |
reorganize workers
Diffstat (limited to 'lib')
-rw-r--r-- | lib/collector/collector_connection.rb | 138 | ||||
-rw-r--r-- | lib/collector/control_server.rb | 11 | ||||
-rw-r--r-- | lib/collector/daemon.rb | 48 | ||||
-rw-r--r-- | lib/collector/node_connection.rb | 118 | ||||
-rw-r--r-- | lib/collector/node_manager.rb | 58 | ||||
-rw-r--r-- | lib/collector/register_server.rb | 22 |
6 files changed, 211 insertions, 184 deletions
diff --git a/lib/collector/collector_connection.rb b/lib/collector/collector_connection.rb deleted file mode 100644 index 1af298b..0000000 --- a/lib/collector/collector_connection.rb +++ /dev/null @@ -1,138 +0,0 @@ -module Collector - class CollectorConnection < EM::Connection - def initialize(channel, connections) - @channel = channel - @connections = connections - - @worker_number = nil - @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) - end - - def send_account(account) - send_object(type: "account", - id: account.id, - consumer_key: Settings.consumer.key, - consumer_secret: Settings.consumer.secret, - oauth_token: account.oauth_token, - oauth_token_secret: account.oauth_token_secret, - user_id: account.user_id) - log(:debug, "send: #{account.id}/#{account.user_id}") - end - - def send_stop_account(account) - send_object(type: "stop", - id: account.id) - log(:debug, "send stop: #{account.id}/#{account.user_id}") - end - - def post_init - end - - def unbind - @connections.reject! {|k, v| v == self } - log(:info, "connection closed") - end - - def receive_data(data) - @unpacker.feed_each(data) do |msg| - unless msg.is_a?(Hash) && msg[:type] - log(:error, "unknown data: #{msg}") - send_object(type: "fatal", message: "unknown data") - close_connection_after_writing - return - end - - unless @authorized - if msg[:type] == "auth" - auth(msg) - else - log(:warn, "not authorized client: #{msg}") - send_object(type: "fatal", message: "You aren't authorized") - close_connection_after_writing - end - return - end - - case msg[:type] - when "unauthorized" - @channel << -> { - log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}") - } - when "tweet" - @channel << -> { - log(:debug, "receive tweet: #{msg[:id]}") - Tweet.create_from_json(msg) - } - when "favorite" - @channel << -> { - log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - f = Favorite.create_from_json(msg) - Notification.notify_favorites_count(f.tweet) - } - when "unfavorite" - @channel << -> { - log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - Favorite.destroy_from_json(msg) - } - when "retweet" - @channel << -> { - log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}") - Retweet.create_from_json(msg) - } - when "delete" - @channel << -> { - log(:debug, "receive delete: #{msg[:delete][:status][:id]}") - Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg) - } - when "quit" - log(:info, "receive quit: #{msg[:reason]}") - send_data(type: "quit", message: "Bye") - close_connection_after_writing - else - log(:warn, "unknown message: #{msg[:type]}") - send_object(type: "error", message: "Unknown message type") - end - end - end - - private - def log(level, message) - text = "[RECEIVER" - text << ":#{@worker_number}" if @worker_number - text << "] #{message}" - Rails.logger.__send__(level, text) - end - - def send_object(data) - send_data(data.to_msgpack) - end - - def auth(msg) - secret_key = msg[:secret_key] - unless secret_key == Settings.collector.secret_key - log(:warn, "Invalid secret_key: \"#{secret_key}\"") - send_object(type: "fatal", message: "invalid secret_key") - close_connection_after_writing - return - end - - worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first - if worker_number == nil - log(:warn, "all connection alive") - send_object(type: "error", message: "all connection alive") - close_connection_after_writing - return - end - - @connections[worker_number] = self - @worker_number = worker_number - @authorized = true - log(:info, "connect") - send_object(type: "ok", message: "connected") - - Account.set_of_collector(@worker_number).each do |account| - send_account(account) - end - end - end -end diff --git a/lib/collector/control_server.rb b/lib/collector/control_server.rb new file mode 100644 index 0000000..413345e --- /dev/null +++ b/lib/collector/control_server.rb @@ -0,0 +1,11 @@ +module Collector + class ControlServer + def register_account(account) + NodeManager.register_account(Marshal.load(account)) + end + + def deactivate_account(account) + NodeManager.unregister_account(Marshal.load(account)) + end + end +end diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb index b9a8b3e..8c7194f 100644 --- a/lib/collector/daemon.rb +++ b/lib/collector/daemon.rb @@ -2,35 +2,35 @@ require "msgpack/rpc/transport/unix" module Collector class Daemon - def self.start - _logger = Logger.new(STDOUT) - _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG - ActiveRecord::Base.logger = Rails.logger = _logger + class << self + def start + set_loggers - _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock") + EM.run do + sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock") + File.delete(sock_path) if File.exists?(sock_path) + control = MessagePack::RPC::Server.new + control.listen(MessagePack::RPC::UNIXServerTransport.new(sock_path), Collector::ControlServer.new) + EM.defer { control.run } - Rails.logger.info("Receiver started") - File.delete(_sock_path) if File.exists?(_sock_path) - EM.run do - channel = EM::Channel.new - EM.defer { channel.subscribe(&:call) } + nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection) - connections = {} + stop = -> _ do + control.stop + EM.stop_server(nodes) + EM.stop + end - collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections) - - reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path) - register_server = MessagePack::RPC::Server.new - register_server.listen(reg_svr_listener, RegisterServer.new(connections)) - EM.defer { register_server.run } - - stop = Proc.new do - EM.stop_server(collector_server) - register_server.close - EM.stop + Signal.trap("INT", &stop) + Signal.trap("TERM", &stop) end - Signal.trap(:INT, &stop) - Signal.trap(:TERM, &stop) + end + + private + def set_loggers + _logger = Logger.new(STDOUT) + _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG + ActiveRecord::Base.logger = Rails.logger = _logger end end end diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb new file mode 100644 index 0000000..9e44df7 --- /dev/null +++ b/lib/collector/node_connection.rb @@ -0,0 +1,118 @@ +module Collector + class NodeConnection < EM::Connection + attr_reader :connection_id + + @@_id = 0 + + def initialize + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @connection_id = (@@_id += 1) + @authenticated = false + @closing = false + end + + def unbind + if @closing + log(:info, "Connection was closed.") + else + log(:warn, "Connection was closed unexpectedly.") + NodeManager.unregister(self) + end + end + + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg[:title] + log(:warn, "Unknown message: #{msg}") + send_message(:error, text: "Unknown message.") + close_connection_after_writing + next + end + + parse_message(msg) + end + end + + def register_account(account) + send_message(:register, + id: account.id, + consumer_key: Settings.consumer.key, + consumer_secret: Settings.consumer.secret, + oauth_token: account.oauth_token, + oauth_token_secret: account.oauth_token_secret, + user_id: account.user_id) + log(:info, "Registered account ##{account.id}/#{account.user_id}") + end + + def unregister_account(account) + send_message(:unregister, + id: account.id, + user_id: account.user_id) + log(:info, "Unregistered account ##{account.id}/#{account.user_id}") + end + + private + def parse_message(msg) + unless @authenticated + if msg[:title] == "auth" + authenticate_node(msg) + else + log(:error, "Unauthenticated client: #{msg}") + send_message(:fatal, text: "You aren't authenticate.") + close_connection_after_writing + end + return + end + + case msg[:title] + when "unauthorized" + log(:info, "Received unauthorized: ##{msg[:id]}/#{msg[:user_id]}") + when "tweet" + log(:debug, "Received tweet: #{msg[:id]}") + Tweet.create_from_json(msg) + when "favorite" + log(:debug, "Receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") + f = Favorite.create_from_json(msg) + Notification.notify_favorites_count(f.tweet) + when "unfavorite" + log(:debug, "Receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") + Favorite.destroy_from_json(msg) + when "retweet" + log(:debug, "Receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}") + Retweet.create_from_json(msg) + when "delete" + log(:debug, "Receive delete: #{msg[:delete][:status][:id]}") + Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg) + when "exit" + log(:info, "Closing this connection...") + @closing = true + NodeManager.unregister(self) + else + log(:warn, "Unknown message: #{msg[:title]}") + send_message(:error, text: "Unknown message.") + end + end + + def authenticate_node(msg) + if msg.key?(:secret_key) && Settings.collector.secret_key == msg[:secret_key] + @authenticated = true + log(:info, "Connection authenticated.") + send_message(:authenticated) + NodeManager.register(self) + else + log(:warn, "Invalid secret_key: #{secret_key.inspect}") + send_message(:fatal, text: "Invalid secret_key.") + close_connection_after_writing + return + end + end + + def send_message(title, hash = {}) + send_data(hash.merge(title: title).to_msgpack) + end + + def log(level, message) + Rails.logger.__send__(level, "[Node:#{@connection_id}] #{message}") + end + end +end diff --git a/lib/collector/node_manager.rb b/lib/collector/node_manager.rb new file mode 100644 index 0000000..8136fe8 --- /dev/null +++ b/lib/collector/node_manager.rb @@ -0,0 +1,58 @@ +module Collector + class NodeManager + @@node_connections = [] + @@active_connections = Array.new(Settings.collector.nodes_count) + @@inactive_connections = [] + + class << self + def register(node_connection) + @@node_connections << node_connection + @@inactive_connections << node_connection + bind + end + + def unregister(node_connection) + @@node_connections.delete(node_connection) + i = @@active_connections.find_index(node_connection) + if i + @@active_connections[i] = nil + else + @@inactive_connections.delete(node_connection) + end + bind + end + + def register_account(account) + n = account.id % Settings.collector.nodes_count + if @@active_connections[n] + @@active_connections[n].register_account(account) + end + end + + def unregister_account(account) + n = account.id % Settings.collector.nodes_count + if @@active_connections[n] + @@active_connections[n].unregister_account(account) + end + end + + private + def bind + first_inactive_id = @@active_connections.find_index(nil) + if first_inactive_id + con = @@inactive_connections.shift + if con + @@active_connections[first_inactive_id] = con + Rails.logger.warn("[NodeManager] Registered node ##{con.connection_id} as group ##{first_inactive_id}") + Account.for_node(first_inactive_id).each do |a| + con.register_account(a) + end + else + Rails.logger.warn("[NodeManager] Not enough nodes: (#{@@active_connections.count {|c| c }}/#{Settings.collector.nodes_count})") + end + end + end + + end + end +end diff --git a/lib/collector/register_server.rb b/lib/collector/register_server.rb deleted file mode 100644 index 21114ab..0000000 --- a/lib/collector/register_server.rb +++ /dev/null @@ -1,22 +0,0 @@ -module Collector - class RegisterServer - def initialize(connections) - @connections = connections - end - - def register(account_) - account = Marshal.load(account_) - con_num = account.id % Settings.collector.count - con = @connections[con_num] - if con - if account.active? - con.send_account(account) - else - con.send_stop_account(account) - end - else - Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}") - end - end - end -end |