diff options
author | Rhenium <rhenium@rhe.jp> | 2013-12-20 21:42:05 +0900 |
---|---|---|
committer | Rhenium <rhenium@rhe.jp> | 2013-12-20 21:42:05 +0900 |
commit | 4b56cd6b67d890c5e2ed763656a7fd7832b10d8f (patch) | |
tree | eb394e9c187b8bbc60b54d16b72200edbf9380ab /lib | |
parent | 376c50784c2c740c6caacd1e94550e3d9d77fe41 (diff) | |
download | aclog-4b56cd6b67d890c5e2ed763656a7fd7832b10d8f.tar.gz |
update workers
Diffstat (limited to 'lib')
-rw-r--r-- | lib/aclog/receiver/collector_connection.rb | 149 | ||||
-rw-r--r-- | lib/aclog/receiver/register_server.rb | 1 | ||||
-rw-r--r-- | lib/aclog/receiver/worker.rb | 6 |
3 files changed, 66 insertions, 90 deletions
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb index 36f652f..9f43f00 100644 --- a/lib/aclog/receiver/collector_connection.rb +++ b/lib/aclog/receiver/collector_connection.rb @@ -1,166 +1,141 @@ -# -*- coding: utf-8 -*- -require "time" - module Aclog module Receiver class CollectorConnection < EM::Connection - def initialize(connections) + def initialize(channel, connections) + @channel = channel @connections = connections @worker_number = nil - @pac = MessagePack::Unpacker.new - - unless defined? @@queue - @@queue = EM::Queue.new - - _cr = -> bl { bl.call; @@queue.pop(&_cr) } - EM.defer { @@queue.pop(&_cr) } - end + @unpacker = MessagePack::Unpacker.new end def send_account(account) send_object(type: "account", id: account.id, + consumer_key: Settings.consumer.key, + consumer_secret: Settings.consumer.secret, oauth_token: account.oauth_token, oauth_token_secret: account.oauth_token_secret, user_id: account.user_id) - Rails.logger.debug("Sent #{account.id}/#{account.user_id}") + log(:debug, "send: #{account.id}/#{account.user_id}") end def send_stop_account(account) send_object(type: "stop", id: account.id) - Rails.logger.debug("Sent Stop #{account.id}/#{account.user_id}") + log(:debug, "send stop: #{account.id}/#{account.user_id}") end def post_init - # なにもしない。クライアントが end def unbind @connections.reject! {|k, v| v == self } - Rails.logger.info("Connection closed(#{@worker_number})") + log(:info, "connection closed") end def receive_data(data) - @pac.feed_each(data) do |msg| + @unpacker.feed_each(data) do |msg| unless msg.is_a?(Hash) && msg["type"] - Rails.logger.warn("Unknown data: #{msg}") - send_object(type: "fatal", message: "Unknown data") + log(:error, "unknown data: #{msg}") + send_object(type: "fatal", message: "unknown data") close_connection_after_writing return end - if not @authorized and not msg["type"] == "init" - Rails.logger.warn("Not authorized client: #{msg}") - send_object(type: "fatal", message: "You aren't authorized") - close_connection_after_writing + unless @authorized + if msg["type"] == "auth" + auth(msg) + else + log(:warn, "not authorized client: #{msg}") + send_object(type: "fatal", message: "You aren't authorized") + close_connection_after_writing + end return end case msg["type"] - when "init" - receive_init(msg) when "unauthorized" - receive_unauthorized(msg) + @channel << -> { + log(:warn, "unauthorized: #{msg["user_id"]}") + } when "tweet" - receive_tweet(msg) + @channel << -> { + log(:debug, "receive tweet: #{msg["id"]}") + Tweet.from_receiver(msg) + } when "favorite" - receive_favorite(msg) + @channel << -> { + log(:debug, "receive favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") + if f = Favorite.from_receiver(msg) + f.tweet.notify_favorite + end + } when "unfavorite" - receive_unfavorite(msg) + @channel << -> { + log(:debug, "receive unfavorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") + Favorite.delete_from_receiver(msg) + } when "retweet" - receive_retweet(msg) + @channel << -> { + log(:debug, "receive retweet: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}") + Retweet.from_receiver(msg) + } when "delete" - receive_delete(msg) + @channel << -> { + log(:debug, "receive delete: #{msg["id"]}") + Tweet.delete_from_receiver(msg) + } when "quit" - Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}") + log(:info, "receive quit: #{msg["reason"]}") send_data(type: "quit", message: "Bye") close_connection_after_writing else - Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}") - send_object(type: "error", message: "Unknown message type: #{msg["type"]}") + log(:warn, "unknown message: #{msg["type"]}") + send_object(type: "error", message: "Unknown message type") end end end private + def log(level, message) + text = "[RECEIVER" + text << ":#{@worker_number}" if @worker_number + text << "] #{message}" + Rails.logger.__send__(level, text) + end + def send_object(data) send_data(data.to_msgpack) end - def receive_init(msg) + def auth(msg) secret_key = msg["secret_key"] unless secret_key == Settings.collector.secret_key - Rails.logger.warn("Invalid secret_key: \"#{secret_key}\"") - send_object(type: "fatal", message: "Invalid secret_key") + log(:warn, "Invalid secret_key: \"#{secret_key}\"") + send_object(type: "fatal", message: "invalid secret_key") close_connection_after_writing return end - worker_number = Settings.collector.count.times.find {|num| !@connections.key?(num) } + worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first if worker_number == nil - Rails.logger.warn("Invalid worker_number: #{worker_number}") - send_object(type: "fatal", message: "Invalid worker_number") + log(:warn, "all connection alive") + send_object(type: "fatal", message: "all connection alive") close_connection_after_writing return end - if @connections[worker_number] - @connections[worker_number].close_connection - end @connections[worker_number] = self @worker_number = worker_number @authorized = true - Rails.logger.info("Connected(#{@worker_number})") - send_object(type: "ok", message: "Connected") + log(:info, "connect") + send_object(type: "ok", message: "connected") Account.set_of_collector(@worker_number).each do |account| send_account(account) end end - - def receive_unauthorized(msg) - Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}") - # unregister - end - - def receive_tweet(msg) - @@queue.push -> do - Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") - Tweet.from_receiver(msg) - end - end - - def receive_favorite(msg) - @@queue.push -> do - Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - if f = Favorite.from_receiver(msg) - f.tweet.notify_favorite - end - end - end - - def receive_unfavorite(msg) - @@queue.push -> do - Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - Favorite.delete_from_receiver(msg) - end - end - - def receive_retweet(msg) - @@queue.push -> do - Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - Retweet.from_receiver(msg) - end - end - - def receive_delete(msg) - @@queue.push -> do - Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") - Tweet.delete_from_receiver(msg) - end - end end end end diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb index 541efd5..f5b6060 100644 --- a/lib/aclog/receiver/register_server.rb +++ b/lib/aclog/receiver/register_server.rb @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- module Aclog module Receiver class RegisterServer diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb index 9896853..db60951 100644 --- a/lib/aclog/receiver/worker.rb +++ b/lib/aclog/receiver/worker.rb @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- require "msgpack/rpc/transport/unix" module Aclog @@ -17,9 +16,12 @@ module Aclog Rails.logger.info("Receiver started") File.delete(_sock_path) if File.exists?(_sock_path) EM.run do + channel = EM::Channel.new + EM.defer { channel.subscribe(&:call) } + connections = {} - collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, connections) + collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections) reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path) register_server = MessagePack::RPC::Server.new |