aboutsummaryrefslogtreecommitdiffstats
path: root/lib/aclog
diff options
context:
space:
mode:
authorrhenium <re4k@re4k.info>2013-05-11 10:06:17 +0900
committerrhenium <re4k@re4k.info>2013-05-11 10:06:17 +0900
commite05734159a0166664f5fc62ddf13c3a7624836f9 (patch)
treed3ba13316854f2ef5a20e24482e81d89416e07b4 /lib/aclog
parent339cfd7db182fe80eef5109fe9d82229a2ab1584 (diff)
downloadaclog-e05734159a0166664f5fc62ddf13c3a7624836f9.tar.gz
split receiver
Diffstat (limited to 'lib/aclog')
-rw-r--r--lib/aclog/receiver/collector_server.rb208
-rw-r--r--lib/aclog/receiver/worker.rb207
2 files changed, 210 insertions, 205 deletions
diff --git a/lib/aclog/receiver/collector_server.rb b/lib/aclog/receiver/collector_server.rb
new file mode 100644
index 0000000..3aee3bc
--- /dev/null
+++ b/lib/aclog/receiver/collector_server.rb
@@ -0,0 +1,208 @@
+# -*- coding: utf-8 -*-
+module Aclog
+ module Receiver
+ class CollectorServer < EM::Connection
+ def send_account_all
+ Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
+ send_account(account)
+ end
+ end
+
+ def send_account(account)
+ out = {:type => "account",
+ :id => account.id,
+ :oauth_token => account.oauth_token,
+ :oauth_token_secret => account.oauth_token_secret,
+ :user_id => account.user_id,
+ :consumer_version => account.consumer_version.to_i}
+ send_object(out)
+ Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
+ end
+
+ def self.send_account(account)
+ if con = @@connections[account.id % Settings.worker_count]
+ con.send_account(account)
+ end
+ end
+
+ def initialize
+ @@connections ||= {}
+
+ @worker_number = nil
+ @pac = MessagePack::Unpacker.new
+
+ @@saved_tweets ||= []
+ unless defined?(@@wq)
+ @@wq = EM::Queue.new # ふぁぼ以外
+ EM.defer do
+ wcb = -> msg{msg.call; @@wq.pop &wcb}
+ @@wq.pop &wcb
+ end
+
+ @@nq = EM::Queue.new # 通知するやつ(ふぁぼ)
+ EM.defer do
+ ncb = -> msg{msg.call; @@nq.pop &ncb}
+ @@nq.pop &ncb
+ end
+ end
+ end
+
+ def post_init
+ # なにもしない。クライアントが
+ end
+
+ def unbind
+ Rails.logger.info("Connection closed(#{@worker_number})")
+ @@connections.delete_if{|k, v| v == self}
+ end
+
+ def receive_data(data)
+ @pac.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg["type"]
+ Rails.logger.warn("Unknown data: #{msg}")
+ send_object({:type => "fatal", :message => "Unknown data"})
+ close_connection_after_writing
+ return
+ end
+
+ if msg["type"] != "init" && !@authorized
+ Rails.logger.warn("Not authorized client: #{msg}")
+ send_object({:type => "fatal", :message => "You aren't authorized"})
+ close_connection_after_writing
+ return
+ end
+
+ case msg["type"]
+ when "init"
+ receive_init(msg)
+ when "unauthorized"
+ receive_unauthorized(msg)
+ when "user"
+ receive_user(msg)
+ when "tweet"
+ receive_tweet(msg)
+ when "favorite"
+ receive_favorite(msg)
+ when "retweet"
+ receive_retweet(msg)
+ when "delete"
+ receive_delete(msg)
+ when "spam"
+ receive_spam(msg)
+ when "quit"
+ # Heroku の cycling など
+ Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
+ send_data({:type => "quit", :message => "Bye"})
+ close_connection_after_writing
+ else
+ Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
+ send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"})
+ end
+ end
+ end
+
+ def receive_init(msg)
+ secret_key = msg["secret_key"]
+ worker_number = msg["worker_number"]
+ unless secret_key == Settings.secret_key
+ Rails.logger.warn("Invalid secret_key(?:#{worker_number}): \"#{secret_key}\"")
+ send_object({:type => "fatal", :message => "Invalid secret_key"})
+ close_connection_after_writing
+ return
+ end
+ if worker_number > Settings.worker_count
+ Rails.logger.warn("Invalid worker_number: #{worker_number}, secret_key: \"#{secret_key}\"")
+ send_object({:type => "fatal", :message => "Invalid worker_number"})
+ close_connection_after_writing
+ return
+ end
+ if @@connections[worker_number]
+ @@connections[worker_number].close_connection
+ end
+ @@connections[worker_number] = self
+ @worker_number = worker_number
+ @authorized = true
+ Rails.logger.info("Connected(#{@worker_number})")
+ send_object({:type => "ok", :message => "Connected"})
+ send_account_all
+ end
+
+ def receive_unauthorized(msg)
+ Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
+ # unregister
+ end
+
+ def receive_user(msg)
+ @@wq.push -> do
+ Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
+ User.from_hash(:id => msg["id"],
+ :screen_name => msg["screen_name"],
+ :name => msg["name"],
+ :profile_image_url => msg["profile_image_url"],
+ :protected => msg["protected"])
+ end
+ end
+
+ def receive_tweet(msg)
+ @@wq.push -> do
+ Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
+ unless @@saved_tweets.include?(msg["id"])
+ @@saved_tweets << msg["id"]
+ if @@saved_tweets.size > 100000
+ Rails.logger.debug("Tweet id dropped from cache: #{@@saved_tweets.shift}")
+ end
+
+ Tweet.from_hash(:id => msg["id"],
+ :text => msg["text"],
+ :source => msg["source"],
+ :tweeted_at => Time.parse(msg["tweeted_at"]),
+ :user_id => msg["user_id"])
+ else
+ Rails.logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
+ end
+ end
+ end
+
+ def receive_favorite(msg)
+ @@nq.push -> do
+ Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ f = Favorite.from_hash(:tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
+ if t = Tweet.find_by(id: msg["tweet_id"])
+ t.notify_favorite
+ end
+ end
+ end
+
+ def receive_retweet(msg)
+ @@wq.push -> do
+ Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Retweet.from_hash(:id => msg["id"],
+ :tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
+ end
+ end
+
+ def receive_delete(msg)
+ @@wq.push -> do
+ if msg["id"]
+ Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
+ Tweet.delete_from_id(msg["id"])
+ elsif msg["tweet_id"]
+ Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Favorite.delete_from_hash(:tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
+ end
+ end
+ end
+
+ def receive_spam(msg)
+ Rails.logger.info("Receive Spam(#{@worker_number}): #{msg["id"]}")
+ # @@wq.push -> do
+ # # TODO
+ # end
+ end
+ end
+ end
+end
+
diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb
index 7ad9e56..7f5e5e9 100644
--- a/lib/aclog/receiver/worker.rb
+++ b/lib/aclog/receiver/worker.rb
@@ -11,209 +11,6 @@ end
module Aclog
module Receiver
class Worker < DaemonSpawn::Base
- class DBProxyServer < EM::Connection
- def send_account_all
- Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
- send_account(account)
- end
- end
-
- def send_account(account)
- out = {:type => "account",
- :id => account.id,
- :oauth_token => account.oauth_token,
- :oauth_token_secret => account.oauth_token_secret,
- :user_id => account.user_id,
- :consumer_version => account.consumer_version.to_i}
- send_object(out)
- Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
- end
-
- def self.send_account(account)
- if con = @@connections[account.id % Settings.worker_count]
- con.send_account(account)
- end
- end
-
- def initialize
- @@connections ||= {}
-
- @worker_number = nil
- @pac = MessagePack::Unpacker.new
-
- @@saved_tweets ||= []
- unless defined?(@@wq)
- @@wq = EM::Queue.new # ふぁぼ以外
- EM.defer do
- wcb = -> msg{msg.call; @@wq.pop &wcb}
- @@wq.pop &wcb
- end
-
- @@nq = EM::Queue.new # 通知するやつ(ふぁぼ)
- EM.defer do
- ncb = -> msg{msg.call; @@nq.pop &ncb}
- @@nq.pop &ncb
- end
- end
- end
-
- def post_init
- # なにもしない。クライアントが
- end
-
- def unbind
- Rails.logger.info("Connection closed(#{@worker_number})")
- @@connections.delete_if{|k, v| v == self}
- end
-
- def receive_data(data)
- @pac.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg["type"]
- Rails.logger.warn("Unknown data: #{msg}")
- send_object({:type => "fatal", :message => "Unknown data"})
- close_connection_after_writing
- return
- end
-
- if msg["type"] != "init" && !@authorized
- Rails.logger.warn("Not authorized client: #{msg}")
- send_object({:type => "fatal", :message => "You aren't authorized"})
- close_connection_after_writing
- return
- end
-
- case msg["type"]
- when "init"
- receive_init(msg)
- when "unauthorized"
- receive_unauthorized(msg)
- when "user"
- receive_user(msg)
- when "tweet"
- receive_tweet(msg)
- when "favorite"
- receive_favorite(msg)
- when "retweet"
- receive_retweet(msg)
- when "delete"
- receive_delete(msg)
- when "spam"
- receive_spam(msg)
- when "quit"
- # Heroku の cycling など
- Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
- send_data({:type => "quit", :message => "Bye"})
- close_connection_after_writing
- else
- Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
- send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"})
- end
- end
- end
-
- def receive_init(msg)
- secret_key = msg["secret_key"]
- worker_number = msg["worker_number"]
- unless secret_key == Settings.secret_key
- Rails.logger.warn("Invalid secret_key(?:#{worker_number}): \"#{secret_key}\"")
- send_object({:type => "fatal", :message => "Invalid secret_key"})
- close_connection_after_writing
- return
- end
- if worker_number > Settings.worker_count
- Rails.logger.warn("Invalid worker_number: #{worker_number}, secret_key: \"#{secret_key}\"")
- send_object({:type => "fatal", :message => "Invalid worker_number"})
- close_connection_after_writing
- return
- end
- if @@connections[worker_number]
- @@connections[worker_number].close_connection
- end
- @@connections[worker_number] = self
- @worker_number = worker_number
- @authorized = true
- Rails.logger.info("Connected(#{@worker_number})")
- send_object({:type => "ok", :message => "Connected"})
- send_account_all
- end
-
- def receive_unauthorized(msg)
- Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
- # unregister
- end
-
- def receive_user(msg)
- @@wq.push -> do
- Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
- User.from_hash(:id => msg["id"],
- :screen_name => msg["screen_name"],
- :name => msg["name"],
- :profile_image_url => msg["profile_image_url"],
- :protected => msg["protected"])
- end
- end
-
- def receive_tweet(msg)
- @@wq.push -> do
- Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- unless @@saved_tweets.include?(msg["id"])
- @@saved_tweets << msg["id"]
- if @@saved_tweets.size > 100000
- Rails.logger.debug("Tweet id dropped from cache: #{@@saved_tweets.shift}")
- end
-
- Tweet.from_hash(:id => msg["id"],
- :text => msg["text"],
- :source => msg["source"],
- :tweeted_at => Time.parse(msg["tweeted_at"]),
- :user_id => msg["user_id"])
- else
- Rails.logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
- end
- end
- end
-
- def receive_favorite(msg)
- @@nq.push -> do
- Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- f = Favorite.from_hash(:tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
- if t = Tweet.find_by(id: msg["tweet_id"])
- t.notify_favorite
- end
- end
- end
-
- def receive_retweet(msg)
- @@wq.push -> do
- Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Retweet.from_hash(:id => msg["id"],
- :tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
- end
- end
-
- def receive_delete(msg)
- @@wq.push -> do
- if msg["id"]
- Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
- Tweet.delete_from_id(msg["id"])
- elsif msg["tweet_id"]
- Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Favorite.delete_from_hash(:tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
- end
- end
- end
-
- def receive_spam(msg)
- Rails.logger.info("Receive Spam(#{@worker_number}): #{msg["id"]}")
- # @@wq.push -> do
- # # TODO
- # end
- end
- end
-
class RegisterServer < EM::Connection
def initialize
@pac = MessagePack::Unpacker.new
@@ -236,7 +33,7 @@ module Aclog
when "register"
account = Account.where(:id => msg["id"]).first
if account
- DBProxyServer.send_account(account)
+ Aclog::Receiver::CollectorServer.send_account(account)
Rails.logger.info("Account registered and sent")
else
Rails.logger.error("Unknown account id")
@@ -260,7 +57,7 @@ module Aclog
def start(args)
Rails.logger.info("Database Proxy Started")
EM.run do
- o = EM.start_server("0.0.0.0", Settings.listen_port, DBProxyServer)
+ o = EM.start_server("0.0.0.0", Settings.listen_port, Aclog::Receiver::CollectorServer)
i = EM.start_unix_domain_server(File.join(Rails.root, "tmp", "sockets", "receiver.sock"), RegisterServer)
stop = Proc.new do