diff options
author | rhenium <re4k@re4k.info> | 2013-05-11 13:36:13 +0900 |
---|---|---|
committer | rhenium <re4k@re4k.info> | 2013-05-11 13:36:13 +0900 |
commit | e5accc8e74bf2a6f66638b1085ee7cc85abd281c (patch) | |
tree | 33399d3b0742dca1f79fa51eda0a48ba10d1b471 /lib | |
parent | e05734159a0166664f5fc62ddf13c3a7624836f9 (diff) | |
download | aclog-e5accc8e74bf2a6f66638b1085ee7cc85abd281c.tar.gz |
use MessagePack-RPC in Login - Daemon
Diffstat (limited to 'lib')
-rw-r--r-- | lib/aclog/receiver/collector_server.rb | 106 | ||||
-rw-r--r-- | lib/aclog/receiver/worker.rb | 71 |
2 files changed, 78 insertions, 99 deletions
diff --git a/lib/aclog/receiver/collector_server.rb b/lib/aclog/receiver/collector_server.rb index 3aee3bc..1130a25 100644 --- a/lib/aclog/receiver/collector_server.rb +++ b/lib/aclog/receiver/collector_server.rb @@ -1,50 +1,34 @@ # -*- coding: utf-8 -*- +require "time" + module Aclog module Receiver class CollectorServer < EM::Connection - def send_account_all - Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account| - send_account(account) - end - end - - def send_account(account) - out = {:type => "account", - :id => account.id, - :oauth_token => account.oauth_token, - :oauth_token_secret => account.oauth_token_secret, - :user_id => account.user_id, - :consumer_version => account.consumer_version.to_i} - send_object(out) - Rails.logger.debug("Sent #{account.id}/#{account.user_id}") - end - - def self.send_account(account) - if con = @@connections[account.id % Settings.worker_count] - con.send_account(account) - end - end - - def initialize - @@connections ||= {} + def initialize(connections) + @connections = connections @worker_number = nil @pac = MessagePack::Unpacker.new - @@saved_tweets ||= [] - unless defined?(@@wq) - @@wq = EM::Queue.new # ふぁぼ以外 - EM.defer do - wcb = -> msg{msg.call; @@wq.pop &wcb} - @@wq.pop &wcb - end + unless defined? @@queue + @@queue = EM::Queue.new - @@nq = EM::Queue.new # 通知するやつ(ふぁぼ) - EM.defer do - ncb = -> msg{msg.call; @@nq.pop &ncb} - @@nq.pop &ncb - end + _cr = -> bl { bl.call; @@queue.pop &_cr } + EM.defer { @@queue.pop &_cr } end + + @@saved_tweets ||= [] + end + + def send_account(account) + send_object( + type: "account", + id: account.id, + oauth_token: account.oauth_token, + oauth_token_secret: account.oauth_token_secret, + user_id: account.user_id, + consumer_version: account.consumer_version) + Rails.logger.debug("Sent #{account.id}/#{account.user_id}") end def post_init @@ -53,21 +37,21 @@ module Aclog def unbind Rails.logger.info("Connection closed(#{@worker_number})") - @@connections.delete_if{|k, v| v == self} + @connections.delete_if{|k, v| v == self} end def receive_data(data) @pac.feed_each(data) do |msg| unless msg.is_a?(Hash) && msg["type"] Rails.logger.warn("Unknown data: #{msg}") - send_object({:type => "fatal", :message => "Unknown data"}) + send_object(type: "fatal", message: "Unknown data") close_connection_after_writing return end - if msg["type"] != "init" && !@authorized + if not @authorized and not msg["type"] == "init" Rails.logger.warn("Not authorized client: #{msg}") - send_object({:type => "fatal", :message => "You aren't authorized"}) + send_object(type: "fatal", message: "You aren't authorized") close_connection_after_writing return end @@ -90,41 +74,49 @@ module Aclog when "spam" receive_spam(msg) when "quit" - # Heroku の cycling など Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}") - send_data({:type => "quit", :message => "Bye"}) + send_data(type: "quit", message: "Bye") close_connection_after_writing else Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}") - send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"}) + send_object(type: "error", message: "Unknown message type: #{msg["type"]}") end end end + private + def send_object(data) + send_data(data.to_msgpack) + end + def receive_init(msg) secret_key = msg["secret_key"] worker_number = msg["worker_number"] unless secret_key == Settings.secret_key Rails.logger.warn("Invalid secret_key(?:#{worker_number}): \"#{secret_key}\"") - send_object({:type => "fatal", :message => "Invalid secret_key"}) + send_object(type: "fatal", message: "Invalid secret_key") close_connection_after_writing return end if worker_number > Settings.worker_count Rails.logger.warn("Invalid worker_number: #{worker_number}, secret_key: \"#{secret_key}\"") - send_object({:type => "fatal", :message => "Invalid worker_number"}) + send_object(type: "fatal", message: "Invalid worker_number") close_connection_after_writing return end - if @@connections[worker_number] - @@connections[worker_number].close_connection + + if @connections[worker_number] + @connections[worker_number].close_connection end - @@connections[worker_number] = self + @connections[worker_number] = self @worker_number = worker_number @authorized = true Rails.logger.info("Connected(#{@worker_number})") - send_object({:type => "ok", :message => "Connected"}) - send_account_all + send_object(type: "ok", message: "Connected") + + Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account| + send_account(account) + end end def receive_unauthorized(msg) @@ -133,7 +125,7 @@ module Aclog end def receive_user(msg) - @@wq.push -> do + @@queue.push -> do Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}") User.from_hash(:id => msg["id"], :screen_name => msg["screen_name"], @@ -144,7 +136,7 @@ module Aclog end def receive_tweet(msg) - @@wq.push -> do + @@queue.push -> do Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") unless @@saved_tweets.include?(msg["id"]) @@saved_tweets << msg["id"] @@ -164,7 +156,7 @@ module Aclog end def receive_favorite(msg) - @@nq.push -> do + @@queue.push -> do Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") f = Favorite.from_hash(:tweet_id => msg["tweet_id"], :user_id => msg["user_id"]) @@ -175,7 +167,7 @@ module Aclog end def receive_retweet(msg) - @@wq.push -> do + @@queue.push -> do Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") Retweet.from_hash(:id => msg["id"], :tweet_id => msg["tweet_id"], @@ -184,7 +176,7 @@ module Aclog end def receive_delete(msg) - @@wq.push -> do + @@queue.push -> do if msg["id"] Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") Tweet.delete_from_id(msg["id"]) @@ -198,7 +190,7 @@ module Aclog def receive_spam(msg) Rails.logger.info("Receive Spam(#{@worker_number}): #{msg["id"]}") - # @@wq.push -> do + # @@queue.push -> do # # TODO # end end diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb index 7f5e5e9..fe383e2 100644 --- a/lib/aclog/receiver/worker.rb +++ b/lib/aclog/receiver/worker.rb @@ -1,49 +1,28 @@ # -*- coding: utf-8 -*- -require "time" +require "msgpack/rpc/transport/unix" -module EM - class Connection - def send_object(data) - send_data(data.to_msgpack) - end - end -end module Aclog module Receiver class Worker < DaemonSpawn::Base - class RegisterServer < EM::Connection - def initialize - @pac = MessagePack::Unpacker.new + class RegisterServer + def initialize(connections) + @connections = connections end - def post_init + def register(account_) + account = Marshal.load(account_) + con_num = account.id % Settings.worker_count + con = @connections[con_num] + if con + con.send_account(account) + Rails.logger.info("Sent account: connection_number: #{con_num} / account_id: #{account.id}") + else + Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}") + end end - def receive_data(data) - @pac.feed_each(data) do |msg| - Rails.logger.debug(msg.to_s) - unless msg["type"] - Rails.logger.error("Unknown message") - send_object({:type => "fatal", :message => "Unknown message"}) - close_connection_after_writing - return - end - - case msg["type"] - when "register" - account = Account.where(:id => msg["id"]).first - if account - Aclog::Receiver::CollectorServer.send_account(account) - Rails.logger.info("Account registered and sent") - else - Rails.logger.error("Unknown account id") - send_object({:type => "error", :message => "Unknown account id"}) - end - close_connection_after_writing - else - Rails.logger.warn("Unknown register command: #{msg["type"]}") - end - end + def unregister(account) + # TODO end end @@ -55,17 +34,25 @@ module Aclog end def start(args) - Rails.logger.info("Database Proxy Started") + Rails.logger.info("Receiver started") EM.run do - o = EM.start_server("0.0.0.0", Settings.listen_port, Aclog::Receiver::CollectorServer) - i = EM.start_unix_domain_server(File.join(Rails.root, "tmp", "sockets", "receiver.sock"), RegisterServer) + connections = {} + + collector_server = EM.start_server("0.0.0.0", Settings.listen_port, Aclog::Receiver::CollectorServer, connections) + + reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(File.join(Rails.root, "tmp", "sockets", "receiver.sock")) + register_server = MessagePack::RPC::Server.new + register_server.listen(reg_svr_listener, RegisterServer.new(connections)) + EM.defer { register_server.run } stop = Proc.new do - EM.stop_server(o) - EM.stop_server(i) + EM.stop_server(collector_server) + register_server.close + File.delete(File.join(Rails.root, "tmp", "sockets", "receiver.sock")) EM.stop end Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) end end |