aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorRhenium <rhenium@rhe.jp>2014-04-04 20:09:40 +0900
committerRhenium <rhenium@rhe.jp>2014-04-04 20:09:40 +0900
commit7689f1752345523a0be2837ec5f55f2e0a9c0b37 (patch)
tree1a2284aea0e7570fc8b6a8259b0e78773856181f /lib
parentb20e7e0641760bb07c22ba457f53661b21cff904 (diff)
downloadaclog-7689f1752345523a0be2837ec5f55f2e0a9c0b37.tar.gz
reorganize workers
Diffstat (limited to 'lib')
-rw-r--r--lib/collector/collector_connection.rb138
-rw-r--r--lib/collector/control_server.rb11
-rw-r--r--lib/collector/daemon.rb48
-rw-r--r--lib/collector/node_connection.rb118
-rw-r--r--lib/collector/node_manager.rb58
-rw-r--r--lib/collector/register_server.rb22
6 files changed, 211 insertions, 184 deletions
diff --git a/lib/collector/collector_connection.rb b/lib/collector/collector_connection.rb
deleted file mode 100644
index 1af298b..0000000
--- a/lib/collector/collector_connection.rb
+++ /dev/null
@@ -1,138 +0,0 @@
-module Collector
- class CollectorConnection < EM::Connection
- def initialize(channel, connections)
- @channel = channel
- @connections = connections
-
- @worker_number = nil
- @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
- end
-
- def send_account(account)
- send_object(type: "account",
- id: account.id,
- consumer_key: Settings.consumer.key,
- consumer_secret: Settings.consumer.secret,
- oauth_token: account.oauth_token,
- oauth_token_secret: account.oauth_token_secret,
- user_id: account.user_id)
- log(:debug, "send: #{account.id}/#{account.user_id}")
- end
-
- def send_stop_account(account)
- send_object(type: "stop",
- id: account.id)
- log(:debug, "send stop: #{account.id}/#{account.user_id}")
- end
-
- def post_init
- end
-
- def unbind
- @connections.reject! {|k, v| v == self }
- log(:info, "connection closed")
- end
-
- def receive_data(data)
- @unpacker.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg[:type]
- log(:error, "unknown data: #{msg}")
- send_object(type: "fatal", message: "unknown data")
- close_connection_after_writing
- return
- end
-
- unless @authorized
- if msg[:type] == "auth"
- auth(msg)
- else
- log(:warn, "not authorized client: #{msg}")
- send_object(type: "fatal", message: "You aren't authorized")
- close_connection_after_writing
- end
- return
- end
-
- case msg[:type]
- when "unauthorized"
- @channel << -> {
- log(:warn, "unauthorized: ##{msg[:id]}/#{msg[:user_id]}")
- }
- when "tweet"
- @channel << -> {
- log(:debug, "receive tweet: #{msg[:id]}")
- Tweet.create_from_json(msg)
- }
- when "favorite"
- @channel << -> {
- log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- f = Favorite.create_from_json(msg)
- Notification.notify_favorites_count(f.tweet)
- }
- when "unfavorite"
- @channel << -> {
- log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- Favorite.destroy_from_json(msg)
- }
- when "retweet"
- @channel << -> {
- log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
- Retweet.create_from_json(msg)
- }
- when "delete"
- @channel << -> {
- log(:debug, "receive delete: #{msg[:delete][:status][:id]}")
- Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg)
- }
- when "quit"
- log(:info, "receive quit: #{msg[:reason]}")
- send_data(type: "quit", message: "Bye")
- close_connection_after_writing
- else
- log(:warn, "unknown message: #{msg[:type]}")
- send_object(type: "error", message: "Unknown message type")
- end
- end
- end
-
- private
- def log(level, message)
- text = "[RECEIVER"
- text << ":#{@worker_number}" if @worker_number
- text << "] #{message}"
- Rails.logger.__send__(level, text)
- end
-
- def send_object(data)
- send_data(data.to_msgpack)
- end
-
- def auth(msg)
- secret_key = msg[:secret_key]
- unless secret_key == Settings.collector.secret_key
- log(:warn, "Invalid secret_key: \"#{secret_key}\"")
- send_object(type: "fatal", message: "invalid secret_key")
- close_connection_after_writing
- return
- end
-
- worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first
- if worker_number == nil
- log(:warn, "all connection alive")
- send_object(type: "error", message: "all connection alive")
- close_connection_after_writing
- return
- end
-
- @connections[worker_number] = self
- @worker_number = worker_number
- @authorized = true
- log(:info, "connect")
- send_object(type: "ok", message: "connected")
-
- Account.set_of_collector(@worker_number).each do |account|
- send_account(account)
- end
- end
- end
-end
diff --git a/lib/collector/control_server.rb b/lib/collector/control_server.rb
new file mode 100644
index 0000000..413345e
--- /dev/null
+++ b/lib/collector/control_server.rb
@@ -0,0 +1,11 @@
+module Collector
+ class ControlServer
+ def register_account(account)
+ NodeManager.register_account(Marshal.load(account))
+ end
+
+ def deactivate_account(account)
+ NodeManager.unregister_account(Marshal.load(account))
+ end
+ end
+end
diff --git a/lib/collector/daemon.rb b/lib/collector/daemon.rb
index b9a8b3e..8c7194f 100644
--- a/lib/collector/daemon.rb
+++ b/lib/collector/daemon.rb
@@ -2,35 +2,35 @@ require "msgpack/rpc/transport/unix"
module Collector
class Daemon
- def self.start
- _logger = Logger.new(STDOUT)
- _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG
- ActiveRecord::Base.logger = Rails.logger = _logger
+ class << self
+ def start
+ set_loggers
- _sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock")
+ EM.run do
+ sock_path = File.join(Rails.root, "tmp", "sockets", "receiver.sock")
+ File.delete(sock_path) if File.exists?(sock_path)
+ control = MessagePack::RPC::Server.new
+ control.listen(MessagePack::RPC::UNIXServerTransport.new(sock_path), Collector::ControlServer.new)
+ EM.defer { control.run }
- Rails.logger.info("Receiver started")
- File.delete(_sock_path) if File.exists?(_sock_path)
- EM.run do
- channel = EM::Channel.new
- EM.defer { channel.subscribe(&:call) }
+ nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection)
- connections = {}
+ stop = -> _ do
+ control.stop
+ EM.stop_server(nodes)
+ EM.stop
+ end
- collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections)
-
- reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path)
- register_server = MessagePack::RPC::Server.new
- register_server.listen(reg_svr_listener, RegisterServer.new(connections))
- EM.defer { register_server.run }
-
- stop = Proc.new do
- EM.stop_server(collector_server)
- register_server.close
- EM.stop
+ Signal.trap("INT", &stop)
+ Signal.trap("TERM", &stop)
end
- Signal.trap(:INT, &stop)
- Signal.trap(:TERM, &stop)
+ end
+
+ private
+ def set_loggers
+ _logger = Logger.new(STDOUT)
+ _logger.level = Rails.env.production? ? Logger::INFO : Logger::DEBUG
+ ActiveRecord::Base.logger = Rails.logger = _logger
end
end
end
diff --git a/lib/collector/node_connection.rb b/lib/collector/node_connection.rb
new file mode 100644
index 0000000..9e44df7
--- /dev/null
+++ b/lib/collector/node_connection.rb
@@ -0,0 +1,118 @@
+module Collector
+ class NodeConnection < EM::Connection
+ attr_reader :connection_id
+
+ @@_id = 0
+
+ def initialize
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @connection_id = (@@_id += 1)
+ @authenticated = false
+ @closing = false
+ end
+
+ def unbind
+ if @closing
+ log(:info, "Connection was closed.")
+ else
+ log(:warn, "Connection was closed unexpectedly.")
+ NodeManager.unregister(self)
+ end
+ end
+
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg[:title]
+ log(:warn, "Unknown message: #{msg}")
+ send_message(:error, text: "Unknown message.")
+ close_connection_after_writing
+ next
+ end
+
+ parse_message(msg)
+ end
+ end
+
+ def register_account(account)
+ send_message(:register,
+ id: account.id,
+ consumer_key: Settings.consumer.key,
+ consumer_secret: Settings.consumer.secret,
+ oauth_token: account.oauth_token,
+ oauth_token_secret: account.oauth_token_secret,
+ user_id: account.user_id)
+ log(:info, "Registered account ##{account.id}/#{account.user_id}")
+ end
+
+ def unregister_account(account)
+ send_message(:unregister,
+ id: account.id,
+ user_id: account.user_id)
+ log(:info, "Unregistered account ##{account.id}/#{account.user_id}")
+ end
+
+ private
+ def parse_message(msg)
+ unless @authenticated
+ if msg[:title] == "auth"
+ authenticate_node(msg)
+ else
+ log(:error, "Unauthenticated client: #{msg}")
+ send_message(:fatal, text: "You aren't authenticate.")
+ close_connection_after_writing
+ end
+ return
+ end
+
+ case msg[:title]
+ when "unauthorized"
+ log(:info, "Received unauthorized: ##{msg[:id]}/#{msg[:user_id]}")
+ when "tweet"
+ log(:debug, "Received tweet: #{msg[:id]}")
+ Tweet.create_from_json(msg)
+ when "favorite"
+ log(:debug, "Receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
+ f = Favorite.create_from_json(msg)
+ Notification.notify_favorites_count(f.tweet)
+ when "unfavorite"
+ log(:debug, "Receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
+ Favorite.destroy_from_json(msg)
+ when "retweet"
+ log(:debug, "Receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
+ Retweet.create_from_json(msg)
+ when "delete"
+ log(:debug, "Receive delete: #{msg[:delete][:status][:id]}")
+ Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg)
+ when "exit"
+ log(:info, "Closing this connection...")
+ @closing = true
+ NodeManager.unregister(self)
+ else
+ log(:warn, "Unknown message: #{msg[:title]}")
+ send_message(:error, text: "Unknown message.")
+ end
+ end
+
+ def authenticate_node(msg)
+ if msg.key?(:secret_key) && Settings.collector.secret_key == msg[:secret_key]
+ @authenticated = true
+ log(:info, "Connection authenticated.")
+ send_message(:authenticated)
+ NodeManager.register(self)
+ else
+ log(:warn, "Invalid secret_key: #{secret_key.inspect}")
+ send_message(:fatal, text: "Invalid secret_key.")
+ close_connection_after_writing
+ return
+ end
+ end
+
+ def send_message(title, hash = {})
+ send_data(hash.merge(title: title).to_msgpack)
+ end
+
+ def log(level, message)
+ Rails.logger.__send__(level, "[Node:#{@connection_id}] #{message}")
+ end
+ end
+end
diff --git a/lib/collector/node_manager.rb b/lib/collector/node_manager.rb
new file mode 100644
index 0000000..8136fe8
--- /dev/null
+++ b/lib/collector/node_manager.rb
@@ -0,0 +1,58 @@
+module Collector
+ class NodeManager
+ @@node_connections = []
+ @@active_connections = Array.new(Settings.collector.nodes_count)
+ @@inactive_connections = []
+
+ class << self
+ def register(node_connection)
+ @@node_connections << node_connection
+ @@inactive_connections << node_connection
+ bind
+ end
+
+ def unregister(node_connection)
+ @@node_connections.delete(node_connection)
+ i = @@active_connections.find_index(node_connection)
+ if i
+ @@active_connections[i] = nil
+ else
+ @@inactive_connections.delete(node_connection)
+ end
+ bind
+ end
+
+ def register_account(account)
+ n = account.id % Settings.collector.nodes_count
+ if @@active_connections[n]
+ @@active_connections[n].register_account(account)
+ end
+ end
+
+ def unregister_account(account)
+ n = account.id % Settings.collector.nodes_count
+ if @@active_connections[n]
+ @@active_connections[n].unregister_account(account)
+ end
+ end
+
+ private
+ def bind
+ first_inactive_id = @@active_connections.find_index(nil)
+ if first_inactive_id
+ con = @@inactive_connections.shift
+ if con
+ @@active_connections[first_inactive_id] = con
+ Rails.logger.warn("[NodeManager] Registered node ##{con.connection_id} as group ##{first_inactive_id}")
+ Account.for_node(first_inactive_id).each do |a|
+ con.register_account(a)
+ end
+ else
+ Rails.logger.warn("[NodeManager] Not enough nodes: (#{@@active_connections.count {|c| c }}/#{Settings.collector.nodes_count})")
+ end
+ end
+ end
+
+ end
+ end
+end
diff --git a/lib/collector/register_server.rb b/lib/collector/register_server.rb
deleted file mode 100644
index 21114ab..0000000
--- a/lib/collector/register_server.rb
+++ /dev/null
@@ -1,22 +0,0 @@
-module Collector
- class RegisterServer
- def initialize(connections)
- @connections = connections
- end
-
- def register(account_)
- account = Marshal.load(account_)
- con_num = account.id % Settings.collector.count
- con = @connections[con_num]
- if con
- if account.active?
- con.send_account(account)
- else
- con.send_stop_account(account)
- end
- else
- Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}")
- end
- end
- end
-end