diff options
author | re4k <re4k@re4k.info> | 2013-04-01 03:59:33 +0900 |
---|---|---|
committer | re4k <re4k@re4k.info> | 2013-04-01 03:59:33 +0900 |
commit | 90d8845257404bb445d7611f9e9c7828ec25dc34 (patch) | |
tree | 6f3d19afa530f4e9d502e55c36d64202c2878ac5 /lib | |
parent | 27bf5349f62ce11763441eb2aa8e69ee10bf9987 (diff) | |
download | aclog-90d8845257404bb445d7611f9e9c7828ec25dc34.tar.gz |
Add importing favs feature(partial: only status_activity).
Refactor models
Diffstat (limited to 'lib')
-rw-r--r-- | lib/receiver/worker.rb | 99 |
1 files changed, 43 insertions, 56 deletions
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb index 87b55da..345ad73 100644 --- a/lib/receiver/worker.rb +++ b/lib/receiver/worker.rb @@ -10,11 +10,6 @@ end class Receiver::Worker < DaemonSpawn::Base class DBProxyServer < EM::Connection - @@wq = EM::WorkQueue::WorkQueue.new do |arg| - arg.call - end - @@wq.start - def send_account_all Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account| puts "Sent #{account.id}/#{account.user_id}" @@ -34,8 +29,17 @@ class Receiver::Worker < DaemonSpawn::Base def initialize $connections ||= {} + @worker_number = nil @pac = MessagePack::Unpacker.new + + @@saved_tweets ||= [] + unless defined?(@@wq) + @@wq = EM::WorkQueue::WorkQueue.new do |arg| + arg.call + end + @@wq.start + end end def post_init @@ -112,32 +116,32 @@ class Receiver::Worker < DaemonSpawn::Base def receive_user(msg) @@wq.push -> do $logger.debug("Received User(#{@worker_number}): #{msg["id"]}") - begin - rec = User.find_or_initialize_by(:id => msg["id"]) - rec.screen_name = msg["screen_name"] - rec.name = msg["name"] - rec.profile_image_url = msg["profile_image_url"] - rec.protected = msg["protected"] - rec.save! if rec.changed? - rescue - $logger.error("Unknown error while inserting user: #{$!}/#{$@}") - end + User.from_hash(:id => msg["id"], + :screen_name => msg["screen_name"], + :name => msg["name"], + :profile_image_url => msg["profile_image_url"], + :protected => msg["protected"]) end end def receive_tweet(msg) - @@wq.push -> do - $logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") - begin - Tweet.create!(:id => msg["id"], - :text => msg["text"], - :source => msg["source"], - :tweeted_at => Time.parse(msg["tweeted_at"]), - :user_id => msg["user_id"]) - rescue ActiveRecord::RecordNotUnique - $logger.debug("Duplicate Tweet(#{@worker_number}): #{msg["id"]}") - rescue - $logger.error("Unknown error while inserting tweet: #{$!}/#{$@}") + Rails.logger.silence do + @@wq.push -> do + $logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") + unless @@saved_tweets.include?(msg["id"]) + @@saved_tweets << msg["id"] + if @@saved_tweets.size > 10000 + $logger.debug("Tweet ids dropped: #{@@saved_tweets.shift}") + end + + Tweet.from_hash(:id => msg["id"], + :text => msg["text"], + :source => msg["source"], + :tweeted_at => Time.parse(msg["tweeted_at"]), + :user_id => msg["user_id"]) + else + $logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}") + end end end end @@ -145,47 +149,30 @@ class Receiver::Worker < DaemonSpawn::Base def receive_favorite(msg) @@wq.push -> do $logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") - begin - Favorite.create!(:tweet_id => msg["tweet_id"], + Favorite.from_hash(:tweet_id => msg["tweet_id"], :user_id => msg["user_id"]) - rescue ActiveRecord::RecordNotUnique - $logger.debug("Duplicate Favorite(#{@worker_number}): #{msg["id"]}") - rescue - $logger.error("Unknown error while inserting favorite: #{$!}/#{$@}") - end end end def receive_retweet(msg) @@wq.push -> do - $logger.debug("Received Retweet") - begin - Retweet.create!(:id => msg["id"], + $logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") + Retweet.from_hash(:id => msg["id"], :tweet_id => msg["tweet_id"], :user_id => msg["user_id"]) - rescue ActiveRecord::RecordNotUnique - $logger.debug("Duplicate Retweet(#{@worker_number}): #{msg["id"]}") - rescue - $logger.error("Unknown error while inserting retweet: #{$!}/#{$@}") - end end end def receive_delete(msg) @@wq.push -> do - begin - if msg["id"] - $logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") - Tweet.where(:id => msg["id"]).destroy_all - Retweet.where(:id => msg["id"]).destroy_all - elsif msg["tweet_id"] - $logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") - Favorite - .where("tweet_id = #{msg["tweet_id"]} AND user_id = #{msg["user_id"]}") - .destroy_all - end - rescue - $logger.error("Unknown error while deleting: #{$!}/#{$@}") + if msg["id"] + $logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") + Tweet.delete_from_id(msg["id"]) + Retweet.delete_from_id(msg["id"]) + elsif msg["tweet_id"] + $logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") + Favorite.delete_from_hash(:tweet_id => msg["tweet_id"], + :user_id => msg["user_id"]) end end end @@ -241,7 +228,7 @@ class Receiver::Worker < DaemonSpawn::Base def initialize(opts = {}) super(opts) - $logger = Receiver::Logger.new(:info) + $logger = Receiver::Logger.new(Rails.env.development? ? :debug : :info) $connections = {} end |